mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
issue737 实现rocketmq的header的value的无感知序列化和反序列化
Signed-off-by: caotc <250622148@qq.com>
This commit is contained in:
parent
aa78fcdd0c
commit
818a139696
@ -19,6 +19,8 @@ package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
@ -210,6 +212,9 @@ public class RocketMQMessageChannelBinder extends
|
||||
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
|
||||
listenerContainer
|
||||
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
|
||||
RocketMQHeaderMapper headerMapper=new JacksonRocketMQHeaderMapper(this.getApplicationContext()
|
||||
.getBeansOfType(ObjectMapper.class).values().iterator().next());
|
||||
listenerContainer.setHeaderMapper(headerMapper);
|
||||
|
||||
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
|
||||
listenerContainer, consumerProperties, instrumentationManager);
|
||||
|
@ -19,8 +19,11 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming;
|
||||
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
@ -43,6 +46,8 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.integration.support.MutableMessageHeaders;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
@ -88,6 +93,8 @@ public class RocketMQListenerBindingContainer
|
||||
|
||||
private RocketMQListener rocketMQListener;
|
||||
|
||||
private RocketMQHeaderMapper headerMapper;
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
|
||||
private boolean running;
|
||||
@ -369,6 +376,14 @@ public class RocketMQListenerBindingContainer
|
||||
return messageModel;
|
||||
}
|
||||
|
||||
public RocketMQHeaderMapper getHeaderMapper() {
|
||||
return headerMapper;
|
||||
}
|
||||
|
||||
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
|
||||
this.headerMapper = headerMapper;
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerConcurrently
|
||||
implements MessageListenerConcurrently {
|
||||
|
||||
@ -435,8 +450,10 @@ public class RocketMQListenerBindingContainer
|
||||
int reconsumeTimes = messageExt.getReconsumeTimes();
|
||||
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
||||
String.valueOf(reconsumeTimes));
|
||||
|
||||
return RocketMQUtil.convertToSpringMessage(messageExt);
|
||||
Message message=RocketMQUtil.convertToSpringMessage(messageExt);
|
||||
Map<String,Object> afterMapperHeaders= Maps.newHashMap();
|
||||
headerMapper.toHeaders(messageExt.getProperties(),afterMapperHeaders);
|
||||
return MessageBuilder.fromMessage(message).copyHeaders(afterMapperHeaders).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,8 +17,12 @@
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.integration;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
@ -37,6 +41,7 @@ import org.springframework.integration.handler.AbstractMessageHandler;
|
||||
import org.springframework.integration.support.DefaultErrorMessageStrategy;
|
||||
import org.springframework.integration.support.ErrorMessageStrategy;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.support.ErrorMessage;
|
||||
@ -62,6 +67,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
private final RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
private final RocketMQHeaderMapper headerMapper;
|
||||
|
||||
private final Boolean transactional;
|
||||
|
||||
private final String destination;
|
||||
@ -90,6 +97,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
this.producerProperties = producerProperties;
|
||||
this.partitioningInterceptor = partitioningInterceptor;
|
||||
|
||||
this.headerMapper=new JacksonRocketMQHeaderMapper(rocketMQTemplate.getObjectMapper());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -149,6 +158,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
|
||||
throws Exception {
|
||||
try {
|
||||
//issue 737 fix
|
||||
Map<String,String> jsonHeaders= Maps.newHashMap();
|
||||
headerMapper.fromHeaders(message.getHeaders(),jsonHeaders);
|
||||
message = org.springframework.messaging.support.MessageBuilder
|
||||
.fromMessage(message).copyHeaders(jsonHeaders)
|
||||
.build();
|
||||
|
||||
|
||||
final StringBuilder topicWithTags = new StringBuilder(destination);
|
||||
String tags = Optional
|
||||
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
|
||||
@ -156,6 +173,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
if (!StringUtils.isEmpty(tags)) {
|
||||
topicWithTags.append(":").append(tags);
|
||||
}
|
||||
|
||||
SendResult sendRes = null;
|
||||
if (transactional) {
|
||||
sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
|
||||
@ -195,7 +213,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
|
||||
}
|
||||
else {
|
||||
SendCallback sendCallback = new SendCallback() {
|
||||
Message<?> finalMessage = message;
|
||||
SendCallback sendCallback = new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.debug("async send to topic " + topicWithTags + " "
|
||||
@ -210,7 +229,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
getSendFailureChannel().send(
|
||||
RocketMQMessageHandler.this.errorMessageStrategy
|
||||
.buildErrorMessage(new MessagingException(
|
||||
message, e), null));
|
||||
finalMessage, e), null));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -0,0 +1,47 @@
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* Base for RocketMQ header mappers.
|
||||
*
|
||||
* @author caotc
|
||||
* @date 2019-08-22
|
||||
* @since 2.1.1
|
||||
*/
|
||||
public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper{
|
||||
private static final Charset DEFAULT_CHARSET=StandardCharsets.UTF_8;
|
||||
|
||||
private Charset charset;
|
||||
|
||||
public AbstractRocketMQHeaderMapper() {
|
||||
this(DEFAULT_CHARSET);
|
||||
}
|
||||
|
||||
public AbstractRocketMQHeaderMapper(Charset charset) {
|
||||
Assert.notNull(charset, "'charset' cannot be null");
|
||||
this.charset = charset;
|
||||
}
|
||||
|
||||
protected boolean matches(String headerName) {
|
||||
return !MessageConst.STRING_HASH_SET.contains(headerName) && !MessageHeaders.ID.equals(headerName)
|
||||
&& !MessageHeaders.TIMESTAMP.equals(headerName) && !MessageHeaders.CONTENT_TYPE.equals(headerName)
|
||||
&& !MessageHeaders.REPLY_CHANNEL.equals(headerName) && !MessageHeaders.ERROR_CHANNEL.equals(headerName);
|
||||
}
|
||||
|
||||
public Charset getCharset() {
|
||||
return charset;
|
||||
}
|
||||
|
||||
public void setCharset(Charset charset) {
|
||||
Assert.notNull(charset, "'charset' cannot be null");
|
||||
this.charset = charset;
|
||||
}
|
||||
}
|
@ -0,0 +1,246 @@
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* jackson header mapper for RocketMQ.
|
||||
* Header types are added to a special header {@link #JSON_TYPES}.
|
||||
*
|
||||
* @author caotc
|
||||
* @date 2019-08-22
|
||||
* @since 2.1.1
|
||||
*/
|
||||
public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
|
||||
private final static Logger log = LoggerFactory
|
||||
.getLogger(JacksonRocketMQHeaderMapper.class);
|
||||
|
||||
private static final List<String> DEFAULT_TRUSTED_PACKAGES =
|
||||
Arrays.asList(
|
||||
"java.lang",
|
||||
"java.net",
|
||||
"java.util",
|
||||
"org.springframework.util"
|
||||
);
|
||||
|
||||
/**
|
||||
* Header name for java types of other headers.
|
||||
*/
|
||||
public static final String JSON_TYPES = "spring_json_header_types";
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final Set<String> trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES);
|
||||
|
||||
public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) {
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) {
|
||||
super(charset);
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fromHeaders(Map<String,Object> headers, Map<String, String> target) {
|
||||
final Map<String, String> jsonHeaders = Maps.newHashMap();
|
||||
headers.forEach((key, value) -> {
|
||||
if (matches(key)) {
|
||||
if (value instanceof String) {
|
||||
target.put(key, (String) value);
|
||||
}else {
|
||||
try {
|
||||
String className = value.getClass().getName();
|
||||
target.put(key, objectMapper.writeValueAsString(value));
|
||||
jsonHeaders.put(key, className);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.debug("Could not map " + key + " with type " + value.getClass().getName(),e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if (jsonHeaders.size() > 0) {
|
||||
try {
|
||||
target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders));
|
||||
}
|
||||
catch (IllegalStateException | JsonProcessingException e) {
|
||||
log.error( "Could not add json types header",e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toHeaders(Map<String, String> source, Map<String,Object> target) {
|
||||
final Map<String, String> jsonTypes = decodeJsonTypes(source);
|
||||
source.forEach((key,value) -> {
|
||||
if (!(key.equals(JSON_TYPES))) {
|
||||
if (jsonTypes != null && jsonTypes.containsKey(key)) {
|
||||
Class<?> type = Object.class;
|
||||
String requestedType = jsonTypes.get(key);
|
||||
boolean trusted = trusted(requestedType);
|
||||
if (trusted) {
|
||||
try {
|
||||
type = ClassUtils.forName(requestedType, null);
|
||||
}catch (Exception e) {
|
||||
log.error( "Could not load class for header: " + key,e);
|
||||
}
|
||||
}
|
||||
|
||||
if (trusted) {
|
||||
try {
|
||||
Object val = decodeValue(value, type);
|
||||
target.put(key, val);
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error("Could not decode json type: " + value + " for key: "
|
||||
+ key,e);
|
||||
target.put(key, value);
|
||||
}
|
||||
}else {
|
||||
target.put(key, new NonTrustedHeaderType(value, requestedType));
|
||||
}
|
||||
}
|
||||
else {
|
||||
target.put(key, value);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Add packages to the trusted packages list (default {@code java.util, java.lang}) used
|
||||
* when constructing objects from JSON.
|
||||
* If any of the supplied packages is {@code "*"}, all packages are trusted.
|
||||
* If a class for a non-trusted package is encountered, the header is returned to the
|
||||
* application with value of type {@link NonTrustedHeaderType}.
|
||||
* @param packagesToTrust the packages to trust.
|
||||
*/
|
||||
public void addTrustedPackages(String... packagesToTrust) {
|
||||
if (packagesToTrust != null) {
|
||||
for (String whiteList : packagesToTrust) {
|
||||
if ("*".equals(whiteList)) {
|
||||
this.trustedPackages.clear();
|
||||
break;
|
||||
}
|
||||
else {
|
||||
this.trustedPackages.add(whiteList);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Set<String> getTrustedPackages() {
|
||||
return this.trustedPackages;
|
||||
}
|
||||
|
||||
public ObjectMapper getObjectMapper() {
|
||||
return objectMapper;
|
||||
}
|
||||
|
||||
private Object decodeValue(String jsonString, Class<?> type) throws IOException, LinkageError {
|
||||
Object value = objectMapper.readValue(jsonString, type);
|
||||
if (type.equals(NonTrustedHeaderType.class)) {
|
||||
// Upstream NTHT propagated; may be trusted here...
|
||||
NonTrustedHeaderType nth = (NonTrustedHeaderType) value;
|
||||
if (trusted(nth.getUntrustedType())) {
|
||||
try {
|
||||
value = objectMapper.readValue(nth.getHeaderValue(),
|
||||
ClassUtils.forName(nth.getUntrustedType(), null));
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Could not decode header: " + nth,e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private Map<String, String> decodeJsonTypes(Map<String, String> source) {
|
||||
if(source.containsKey(JSON_TYPES)){
|
||||
String value=source.get(JSON_TYPES);
|
||||
try {
|
||||
return objectMapper.readValue(value,new TypeReference<Map<String,String>>(){});
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error("Could not decode json types: " + value,e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
protected boolean trusted(String requestedType) {
|
||||
if (requestedType.equals(NonTrustedHeaderType.class.getName())) {
|
||||
return true;
|
||||
}
|
||||
if (!this.trustedPackages.isEmpty()) {
|
||||
int lastDot = requestedType.lastIndexOf('.');
|
||||
if (lastDot < 0) {
|
||||
return false;
|
||||
}
|
||||
String packageName = requestedType.substring(0, lastDot);
|
||||
for (String trustedPackage : this.trustedPackages) {
|
||||
if (packageName.equals(trustedPackage) || packageName.startsWith(trustedPackage + ".")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a header that could not be decoded due to an untrusted type.
|
||||
*/
|
||||
public static class NonTrustedHeaderType {
|
||||
|
||||
private String headerValue;
|
||||
|
||||
private String untrustedType;
|
||||
|
||||
public NonTrustedHeaderType() {
|
||||
super();
|
||||
}
|
||||
|
||||
NonTrustedHeaderType(String headerValue, String untrustedType) {
|
||||
this.headerValue = headerValue;
|
||||
this.untrustedType = untrustedType;
|
||||
}
|
||||
|
||||
|
||||
public void setHeaderValue(String headerValue) {
|
||||
this.headerValue = headerValue;
|
||||
}
|
||||
|
||||
public String getHeaderValue() {
|
||||
return this.headerValue;
|
||||
}
|
||||
|
||||
public void setUntrustedType(String untrustedType) {
|
||||
this.untrustedType = untrustedType;
|
||||
}
|
||||
|
||||
public String getUntrustedType() {
|
||||
return this.untrustedType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "NonTrustedHeaderType [headerValue=" + headerValue
|
||||
+ ", untrustedType=" + this.untrustedType + "]";
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* header value mapper for RocketMQ
|
||||
*
|
||||
* @author caotc
|
||||
* @date 2019-08-22
|
||||
* @since 2.1.1
|
||||
*/
|
||||
public interface RocketMQHeaderMapper {
|
||||
|
||||
void fromHeaders(Map<String,Object> headers, Map<String,String> target);
|
||||
|
||||
void toHeaders(Map<String,String> source, Map<String,Object> target);
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user