mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
Merge pull request #880 from liudaomanbu/master
issue737 实现rocketmq的header的value的无感知序列化和反序列化
This commit is contained in:
commit
0b296cb264
@ -16,8 +16,7 @@
|
|||||||
|
|
||||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.*;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||||
@ -27,11 +26,7 @@ import org.apache.rocketmq.remoting.RPCHook;
|
|||||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||||
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
||||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
import org.springframework.cloud.stream.binder.*;
|
||||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
|
||||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
|
||||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
|
||||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
|
||||||
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
|
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
|
||||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||||
@ -56,6 +51,8 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindi
|
|||||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
|
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
|
||||||
|
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||||
|
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -89,8 +86,7 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
@Override
|
@Override
|
||||||
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
|
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
|
||||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||||
MessageChannel channel,
|
MessageChannel channel, MessageChannel errorChannel) throws Exception {
|
||||||
MessageChannel errorChannel) throws Exception {
|
|
||||||
if (producerProperties.getExtension().getEnabled()) {
|
if (producerProperties.getExtension().getEnabled()) {
|
||||||
|
|
||||||
// if producerGroup is empty, using destination
|
// if producerGroup is empty, using destination
|
||||||
@ -169,7 +165,7 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
.findFirst().orElse(null));
|
.findFirst().orElse(null));
|
||||||
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
|
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
|
||||||
messageHandler.setSync(producerProperties.getExtension().getSync());
|
messageHandler.setSync(producerProperties.getExtension().getSync());
|
||||||
|
messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
|
||||||
if (errorChannel != null) {
|
if (errorChannel != null) {
|
||||||
messageHandler.setSendFailureChannel(errorChannel);
|
messageHandler.setSendFailureChannel(errorChannel);
|
||||||
}
|
}
|
||||||
@ -210,6 +206,7 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
|
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
|
||||||
listenerContainer
|
listenerContainer
|
||||||
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
|
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
|
||||||
|
listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));
|
||||||
|
|
||||||
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
|
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
|
||||||
listenerContainer, consumerProperties, instrumentationManager);
|
listenerContainer, consumerProperties, instrumentationManager);
|
||||||
@ -294,4 +291,27 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
this.extendedBindingProperties = extendedBindingProperties;
|
this.extendedBindingProperties = extendedBindingProperties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RocketMQHeaderMapper createHeaderMapper(
|
||||||
|
final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
|
||||||
|
Set<String> trustedPackages = extendedConsumerProperties.getExtension()
|
||||||
|
.getTrustedPackages();
|
||||||
|
return createHeaderMapper(trustedPackages);
|
||||||
|
}
|
||||||
|
|
||||||
|
private RocketMQHeaderMapper createHeaderMapper(
|
||||||
|
final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties) {
|
||||||
|
return createHeaderMapper(Collections.emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private RocketMQHeaderMapper createHeaderMapper(Collection<String> trustedPackages) {
|
||||||
|
ObjectMapper objectMapper = this.getApplicationContext()
|
||||||
|
.getBeansOfType(ObjectMapper.class).values().iterator().next();
|
||||||
|
JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(
|
||||||
|
objectMapper);
|
||||||
|
if (!StringUtils.isEmpty(trustedPackages)) {
|
||||||
|
headerMapper.addTrustedPackages(trustedPackages);
|
||||||
|
}
|
||||||
|
return headerMapper;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -19,8 +19,11 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming;
|
|||||||
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
|
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
@ -43,6 +46,8 @@ import org.slf4j.LoggerFactory;
|
|||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||||
import org.springframework.context.SmartLifecycle;
|
import org.springframework.context.SmartLifecycle;
|
||||||
|
import org.springframework.integration.support.MessageBuilder;
|
||||||
|
import org.springframework.integration.support.MutableMessageHeaders;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
@ -88,6 +93,8 @@ public class RocketMQListenerBindingContainer
|
|||||||
|
|
||||||
private RocketMQListener rocketMQListener;
|
private RocketMQListener rocketMQListener;
|
||||||
|
|
||||||
|
private RocketMQHeaderMapper headerMapper;
|
||||||
|
|
||||||
private DefaultMQPushConsumer consumer;
|
private DefaultMQPushConsumer consumer;
|
||||||
|
|
||||||
private boolean running;
|
private boolean running;
|
||||||
@ -369,6 +376,14 @@ public class RocketMQListenerBindingContainer
|
|||||||
return messageModel;
|
return messageModel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RocketMQHeaderMapper getHeaderMapper() {
|
||||||
|
return headerMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
|
||||||
|
this.headerMapper = headerMapper;
|
||||||
|
}
|
||||||
|
|
||||||
public class DefaultMessageListenerConcurrently
|
public class DefaultMessageListenerConcurrently
|
||||||
implements MessageListenerConcurrently {
|
implements MessageListenerConcurrently {
|
||||||
|
|
||||||
@ -429,14 +444,16 @@ public class RocketMQListenerBindingContainer
|
|||||||
* @param messageExt the rocketmq message
|
* @param messageExt the rocketmq message
|
||||||
* @return the converted Spring {@link Message}
|
* @return the converted Spring {@link Message}
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private Message convertToSpringMessage(MessageExt messageExt) {
|
private Message convertToSpringMessage(MessageExt messageExt) {
|
||||||
|
|
||||||
// add reconsume-times header to messageExt
|
// add reconsume-times header to messageExt
|
||||||
int reconsumeTimes = messageExt.getReconsumeTimes();
|
int reconsumeTimes = messageExt.getReconsumeTimes();
|
||||||
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
||||||
String.valueOf(reconsumeTimes));
|
String.valueOf(reconsumeTimes));
|
||||||
|
Message message = RocketMQUtil.convertToSpringMessage(messageExt);
|
||||||
return RocketMQUtil.convertToSpringMessage(messageExt);
|
return MessageBuilder.fromMessage(message)
|
||||||
|
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -17,8 +17,12 @@
|
|||||||
package com.alibaba.cloud.stream.binder.rocketmq.integration;
|
package com.alibaba.cloud.stream.binder.rocketmq.integration;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||||
|
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
import org.apache.rocketmq.client.exception.MQClientException;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
import org.apache.rocketmq.client.producer.SendCallback;
|
import org.apache.rocketmq.client.producer.SendCallback;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
@ -37,6 +41,7 @@ import org.springframework.integration.handler.AbstractMessageHandler;
|
|||||||
import org.springframework.integration.support.DefaultErrorMessageStrategy;
|
import org.springframework.integration.support.DefaultErrorMessageStrategy;
|
||||||
import org.springframework.integration.support.ErrorMessageStrategy;
|
import org.springframework.integration.support.ErrorMessageStrategy;
|
||||||
import org.springframework.integration.support.MessageBuilder;
|
import org.springframework.integration.support.MessageBuilder;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageChannel;
|
||||||
import org.springframework.messaging.MessagingException;
|
import org.springframework.messaging.MessagingException;
|
||||||
import org.springframework.messaging.support.ErrorMessage;
|
import org.springframework.messaging.support.ErrorMessage;
|
||||||
@ -62,6 +67,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
|
|
||||||
private final RocketMQTemplate rocketMQTemplate;
|
private final RocketMQTemplate rocketMQTemplate;
|
||||||
|
|
||||||
|
private RocketMQHeaderMapper headerMapper;
|
||||||
|
|
||||||
private final Boolean transactional;
|
private final Boolean transactional;
|
||||||
|
|
||||||
private final String destination;
|
private final String destination;
|
||||||
@ -149,6 +156,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
|
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
try {
|
try {
|
||||||
|
// issue 737 fix
|
||||||
|
Map<String, String> jsonHeaders = headerMapper
|
||||||
|
.fromHeaders(message.getHeaders());
|
||||||
|
message = org.springframework.messaging.support.MessageBuilder
|
||||||
|
.fromMessage(message).copyHeaders(jsonHeaders).build();
|
||||||
|
|
||||||
final StringBuilder topicWithTags = new StringBuilder(destination);
|
final StringBuilder topicWithTags = new StringBuilder(destination);
|
||||||
String tags = Optional
|
String tags = Optional
|
||||||
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
|
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
|
||||||
@ -156,6 +169,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
if (!StringUtils.isEmpty(tags)) {
|
if (!StringUtils.isEmpty(tags)) {
|
||||||
topicWithTags.append(":").append(tags);
|
topicWithTags.append(":").append(tags);
|
||||||
}
|
}
|
||||||
|
|
||||||
SendResult sendRes = null;
|
SendResult sendRes = null;
|
||||||
if (transactional) {
|
if (transactional) {
|
||||||
sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
|
sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
|
||||||
@ -195,6 +209,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
|
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
Message<?> finalMessage = message;
|
||||||
SendCallback sendCallback = new SendCallback() {
|
SendCallback sendCallback = new SendCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(SendResult sendResult) {
|
public void onSuccess(SendResult sendResult) {
|
||||||
@ -210,7 +225,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
getSendFailureChannel().send(
|
getSendFailureChannel().send(
|
||||||
RocketMQMessageHandler.this.errorMessageStrategy
|
RocketMQMessageHandler.this.errorMessageStrategy
|
||||||
.buildErrorMessage(new MessagingException(
|
.buildErrorMessage(new MessagingException(
|
||||||
message, e), null));
|
finalMessage, e), null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -277,4 +292,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
public void setSync(boolean sync) {
|
public void setSync(boolean sync) {
|
||||||
this.sync = sync;
|
this.sync = sync;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RocketMQHeaderMapper getHeaderMapper() {
|
||||||
|
return headerMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
|
||||||
|
this.headerMapper = headerMapper;
|
||||||
|
}
|
||||||
}
|
}
|
@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
package com.alibaba.cloud.stream.binder.rocketmq.properties;
|
package com.alibaba.cloud.stream.binder.rocketmq.properties;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||||
import org.apache.rocketmq.client.consumer.MQPushConsumer;
|
import org.apache.rocketmq.client.consumer.MQPushConsumer;
|
||||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||||
@ -23,6 +24,9 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|||||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||||
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Timur Valiev
|
* @author Timur Valiev
|
||||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||||
@ -65,6 +69,11 @@ public class RocketMQConsumerProperties {
|
|||||||
|
|
||||||
private Boolean enabled = true;
|
private Boolean enabled = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)}
|
||||||
|
*/
|
||||||
|
private Set<String> trustedPackages;
|
||||||
|
|
||||||
// ------------ For Pull Consumer ------------
|
// ------------ For Pull Consumer ------------
|
||||||
|
|
||||||
private long pullTimeout = 10 * 1000;
|
private long pullTimeout = 10 * 1000;
|
||||||
@ -148,4 +157,12 @@ public class RocketMQConsumerProperties {
|
|||||||
public boolean shouldRequeue() {
|
public boolean shouldRequeue() {
|
||||||
return delayLevelWhenNextConsume != -1;
|
return delayLevelWhenNextConsume != -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Set<String> getTrustedPackages() {
|
||||||
|
return trustedPackages;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTrustedPackages(Set<String> trustedPackages) {
|
||||||
|
this.trustedPackages = trustedPackages;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,63 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (C) 2019 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||||
|
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.common.message.MessageConst;
|
||||||
|
import org.springframework.messaging.MessageHeaders;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base for RocketMQ header mappers.
|
||||||
|
*
|
||||||
|
* @author caotc
|
||||||
|
* @since 2.1.1.RELEASE
|
||||||
|
*/
|
||||||
|
public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper {
|
||||||
|
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
||||||
|
|
||||||
|
private Charset charset;
|
||||||
|
|
||||||
|
public AbstractRocketMQHeaderMapper() {
|
||||||
|
this(DEFAULT_CHARSET);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbstractRocketMQHeaderMapper(Charset charset) {
|
||||||
|
Assert.notNull(charset, "'charset' cannot be null");
|
||||||
|
this.charset = charset;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean matches(String headerName) {
|
||||||
|
return !MessageConst.STRING_HASH_SET.contains(headerName)
|
||||||
|
&& !MessageHeaders.ID.equals(headerName)
|
||||||
|
&& !MessageHeaders.TIMESTAMP.equals(headerName)
|
||||||
|
&& !MessageHeaders.CONTENT_TYPE.equals(headerName)
|
||||||
|
&& !MessageHeaders.REPLY_CHANNEL.equals(headerName)
|
||||||
|
&& !MessageHeaders.ERROR_CHANNEL.equals(headerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Charset getCharset() {
|
||||||
|
return charset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCharset(Charset charset) {
|
||||||
|
Assert.notNull(charset, "'charset' cannot be null");
|
||||||
|
this.charset = charset;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,278 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (C) 2019 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.lang.Nullable;
|
||||||
|
import org.springframework.messaging.MessageHeaders;
|
||||||
|
import org.springframework.util.ClassUtils;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* jackson header mapper for RocketMQ. Header types are added to a special header
|
||||||
|
* {@link #JSON_TYPES}.
|
||||||
|
*
|
||||||
|
* @author caotc
|
||||||
|
* @since 2.1.1.RELEASE
|
||||||
|
*/
|
||||||
|
public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
|
||||||
|
private final static Logger log = LoggerFactory
|
||||||
|
.getLogger(JacksonRocketMQHeaderMapper.class);
|
||||||
|
|
||||||
|
private static final List<String> DEFAULT_TRUSTED_PACKAGES = Arrays
|
||||||
|
.asList("java.lang", "java.net", "java.util", "org.springframework.util");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Header name for java types of other headers.
|
||||||
|
*/
|
||||||
|
public static final String JSON_TYPES = "spring_json_header_types";
|
||||||
|
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
private final Set<String> trustedPackages = new LinkedHashSet<>(
|
||||||
|
DEFAULT_TRUSTED_PACKAGES);
|
||||||
|
|
||||||
|
public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) {
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) {
|
||||||
|
super(charset);
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> fromHeaders(MessageHeaders headers) {
|
||||||
|
final Map<String, String> target = Maps.newHashMap();
|
||||||
|
final Map<String, String> jsonHeaders = Maps.newHashMap();
|
||||||
|
headers.forEach((key, value) -> {
|
||||||
|
if (matches(key)) {
|
||||||
|
if (value instanceof String) {
|
||||||
|
target.put(key, (String) value);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
try {
|
||||||
|
String className = value.getClass().getName();
|
||||||
|
target.put(key, objectMapper.writeValueAsString(value));
|
||||||
|
jsonHeaders.put(key, className);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.debug("Could not map " + key + " with type "
|
||||||
|
+ value.getClass().getName(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (jsonHeaders.size() > 0) {
|
||||||
|
try {
|
||||||
|
target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders));
|
||||||
|
}
|
||||||
|
catch (IllegalStateException | JsonProcessingException e) {
|
||||||
|
log.error("Could not add json types header", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return target;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageHeaders toHeaders(Map<String, String> source) {
|
||||||
|
final Map<String, Object> target = Maps.newHashMap();
|
||||||
|
final Map<String, String> jsonTypes = decodeJsonTypes(source);
|
||||||
|
source.forEach((key, value) -> {
|
||||||
|
if (matches(key) && !(key.equals(JSON_TYPES))) {
|
||||||
|
if (jsonTypes != null && jsonTypes.containsKey(key)) {
|
||||||
|
Class<?> type = Object.class;
|
||||||
|
String requestedType = jsonTypes.get(key);
|
||||||
|
boolean trusted = trusted(requestedType);
|
||||||
|
if (trusted) {
|
||||||
|
try {
|
||||||
|
type = ClassUtils.forName(requestedType, null);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Could not load class for header: " + key, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (trusted) {
|
||||||
|
try {
|
||||||
|
Object val = decodeValue(value, type);
|
||||||
|
target.put(key, val);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
log.error("Could not decode json type: " + value
|
||||||
|
+ " for key: " + key, e);
|
||||||
|
target.put(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
target.put(key, new NonTrustedHeaderType(value, requestedType));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
target.put(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return new MessageHeaders(target);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param packagesToTrust the packages to trust.
|
||||||
|
* @see #addTrustedPackages(Collection)
|
||||||
|
*/
|
||||||
|
public void addTrustedPackages(String... packagesToTrust) {
|
||||||
|
if (Objects.nonNull(packagesToTrust)) {
|
||||||
|
addTrustedPackages(Arrays.asList(packagesToTrust));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add packages to the trusted packages list (default {@code java.util, java.lang})
|
||||||
|
* used when constructing objects from JSON. If any of the supplied packages is
|
||||||
|
* {@code "*"}, all packages are trusted. If a class for a non-trusted package is
|
||||||
|
* encountered, the header is returned to the application with value of type
|
||||||
|
* {@link NonTrustedHeaderType}.
|
||||||
|
* @param packagesToTrust the packages to trust.
|
||||||
|
*/
|
||||||
|
public void addTrustedPackages(Collection<String> packagesToTrust) {
|
||||||
|
if (packagesToTrust != null) {
|
||||||
|
for (String whiteList : packagesToTrust) {
|
||||||
|
if ("*".equals(whiteList)) {
|
||||||
|
this.trustedPackages.clear();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.trustedPackages.add(whiteList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getTrustedPackages() {
|
||||||
|
return this.trustedPackages;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ObjectMapper getObjectMapper() {
|
||||||
|
return objectMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object decodeValue(String jsonString, Class<?> type)
|
||||||
|
throws IOException, LinkageError {
|
||||||
|
Object value = objectMapper.readValue(jsonString, type);
|
||||||
|
if (type.equals(NonTrustedHeaderType.class)) {
|
||||||
|
// Upstream NTHT propagated; may be trusted here...
|
||||||
|
NonTrustedHeaderType nth = (NonTrustedHeaderType) value;
|
||||||
|
if (trusted(nth.getUntrustedType())) {
|
||||||
|
try {
|
||||||
|
value = objectMapper.readValue(nth.getHeaderValue(),
|
||||||
|
ClassUtils.forName(nth.getUntrustedType(), null));
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Could not decode header: " + nth, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private Map<String, String> decodeJsonTypes(Map<String, String> source) {
|
||||||
|
if (source.containsKey(JSON_TYPES)) {
|
||||||
|
String value = source.get(JSON_TYPES);
|
||||||
|
try {
|
||||||
|
return objectMapper.readValue(value,
|
||||||
|
new TypeReference<Map<String, String>>() {
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
log.error("Could not decode json types: " + value, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean trusted(String requestedType) {
|
||||||
|
if (requestedType.equals(NonTrustedHeaderType.class.getName())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!this.trustedPackages.isEmpty()) {
|
||||||
|
int lastDot = requestedType.lastIndexOf('.');
|
||||||
|
if (lastDot < 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String packageName = requestedType.substring(0, lastDot);
|
||||||
|
for (String trustedPackage : this.trustedPackages) {
|
||||||
|
if (packageName.equals(trustedPackage)
|
||||||
|
|| packageName.startsWith(trustedPackage + ".")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a header that could not be decoded due to an untrusted type.
|
||||||
|
*/
|
||||||
|
public static class NonTrustedHeaderType {
|
||||||
|
|
||||||
|
private String headerValue;
|
||||||
|
|
||||||
|
private String untrustedType;
|
||||||
|
|
||||||
|
public NonTrustedHeaderType() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
NonTrustedHeaderType(String headerValue, String untrustedType) {
|
||||||
|
this.headerValue = headerValue;
|
||||||
|
this.untrustedType = untrustedType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHeaderValue(String headerValue) {
|
||||||
|
this.headerValue = headerValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getHeaderValue() {
|
||||||
|
return this.headerValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUntrustedType(String untrustedType) {
|
||||||
|
this.untrustedType = untrustedType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUntrustedType() {
|
||||||
|
return this.untrustedType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "NonTrustedHeaderType [headerValue=" + headerValue + ", untrustedType="
|
||||||
|
+ this.untrustedType + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,43 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (C) 2019 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.springframework.messaging.MessageHeaders;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* header value mapper for RocketMQ
|
||||||
|
*
|
||||||
|
* @author caotc
|
||||||
|
* @since 2.1.1.RELEASE
|
||||||
|
*/
|
||||||
|
public interface RocketMQHeaderMapper {
|
||||||
|
/**
|
||||||
|
* Map from the given {@link MessageHeaders} to the specified target message.
|
||||||
|
* @param headers the abstracted MessageHeaders.
|
||||||
|
* @return the native target message.
|
||||||
|
*/
|
||||||
|
Map<String, String> fromHeaders(MessageHeaders headers);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map from the given target message to abstracted {@link MessageHeaders}.
|
||||||
|
* @param source the native target message.
|
||||||
|
* @return the target headers.
|
||||||
|
*/
|
||||||
|
MessageHeaders toHeaders(Map<String, String> source);
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user