mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
sync & commit in edgware
This commit is contained in:
@@ -16,8 +16,11 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
* @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a>
|
||||
*/
|
||||
public interface RocketMQBinderConstants {
|
||||
|
||||
@@ -31,6 +34,11 @@ public interface RocketMQBinderConstants {
|
||||
*/
|
||||
String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
|
||||
|
||||
String DEFAULT_GROUP = "rocketmq_binder_default_group_name";
|
||||
String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
|
||||
|
||||
/**
|
||||
* RocketMQ re-consume times
|
||||
*/
|
||||
String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
|
||||
|
||||
}
|
||||
|
@@ -16,11 +16,12 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
|
@@ -16,27 +16,11 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.common.UtilAll;
|
||||
import org.apache.rocketmq.remoting.RPCHook;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.util.StringUtils;
|
||||
import java.util.Set;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
|
||||
@@ -47,8 +31,29 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.common.UtilAll;
|
||||
import org.apache.rocketmq.remoting.RPCHook;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
||||
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
@@ -149,10 +154,10 @@ public class RocketMQMessageChannelBinder extends
|
||||
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
||||
rocketMQTemplate, destination.getName(), producerGroup,
|
||||
producerProperties.getExtension().getTransactional(),
|
||||
instrumentationManager);
|
||||
instrumentationManager, producerProperties);
|
||||
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
|
||||
messageHandler.setSync(producerProperties.getExtension().getSync());
|
||||
|
||||
messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
|
||||
if (errorChannel != null) {
|
||||
messageHandler.setSendFailureChannel(errorChannel);
|
||||
}
|
||||
@@ -220,4 +225,28 @@ public class RocketMQMessageChannelBinder extends
|
||||
public Map<String, String> getTopicInUse() {
|
||||
return topicInUse;
|
||||
}
|
||||
|
||||
private RocketMQHeaderMapper createHeaderMapper(
|
||||
final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
|
||||
Set<String> trustedPackages = extendedConsumerProperties.getExtension()
|
||||
.getTrustedPackages();
|
||||
return createHeaderMapper(trustedPackages);
|
||||
}
|
||||
|
||||
private RocketMQHeaderMapper createHeaderMapper(
|
||||
final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties) {
|
||||
return createHeaderMapper(new ArrayList<String>());
|
||||
}
|
||||
|
||||
private RocketMQHeaderMapper createHeaderMapper(Collection<String> trustedPackages) {
|
||||
ObjectMapper objectMapper = this.getApplicationContext()
|
||||
.getBeansOfType(ObjectMapper.class).values().iterator().next();
|
||||
JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(
|
||||
objectMapper);
|
||||
if (!StringUtils.isEmpty(trustedPackages)) {
|
||||
headerMapper.addTrustedPackages(trustedPackages);
|
||||
}
|
||||
return headerMapper;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -16,13 +16,13 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.actuator;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
|
@@ -16,20 +16,21 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
|
@@ -16,12 +16,12 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
|
@@ -16,6 +16,9 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
@@ -24,6 +27,7 @@ import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
|
||||
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
|
||||
import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
@@ -32,10 +36,6 @@ import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
|
@@ -18,6 +18,11 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
@@ -42,15 +47,16 @@ import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
|
||||
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
@@ -83,6 +89,8 @@ public class RocketMQListenerBindingContainer
|
||||
|
||||
private RocketMQListener rocketMQListener;
|
||||
|
||||
private RocketMQHeaderMapper headerMapper;
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
|
||||
private boolean running;
|
||||
@@ -90,12 +98,16 @@ public class RocketMQListenerBindingContainer
|
||||
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
|
||||
|
||||
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
// The following properties came from RocketMQConsumerProperties.
|
||||
private ConsumeMode consumeMode;
|
||||
|
||||
private SelectorType selectorType;
|
||||
|
||||
private String selectorExpression;
|
||||
|
||||
private MessageModel messageModel;
|
||||
|
||||
public RocketMQListenerBindingContainer(
|
||||
@@ -364,10 +376,35 @@ public class RocketMQListenerBindingContainer
|
||||
return messageModel;
|
||||
}
|
||||
|
||||
public RocketMQHeaderMapper getHeaderMapper() {
|
||||
return headerMapper;
|
||||
}
|
||||
|
||||
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
|
||||
this.headerMapper = headerMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert rocketmq {@link MessageExt} to Spring {@link Message}.
|
||||
* @param messageExt the rocketmq message
|
||||
* @return the converted Spring {@link Message}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Message convertToSpringMessage(MessageExt messageExt) {
|
||||
|
||||
// add reconsume-times header to messageExt
|
||||
int reconsumeTimes = messageExt.getReconsumeTimes();
|
||||
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
||||
String.valueOf(reconsumeTimes));
|
||||
Message message = RocketMQUtil.convertToSpringMessage(messageExt);
|
||||
return MessageBuilder.fromMessage(message)
|
||||
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerConcurrently
|
||||
implements MessageListenerConcurrently {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@SuppressWarnings({ "unchecked", "Duplicates" })
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeConcurrentlyContext context) {
|
||||
@@ -375,10 +412,10 @@ public class RocketMQListenerBindingContainer
|
||||
log.debug("received msg: {}", messageExt);
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
rocketMQListener
|
||||
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
|
||||
rocketMQListener.onMessage(convertToSpringMessage(messageExt));
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
|
||||
log.debug("consume {} message key:[{}] cost: {} ms",
|
||||
messageExt.getMsgId(), messageExt.getKeys(), costTime);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("consume message failed. messageExt:{}", messageExt, e);
|
||||
@@ -389,11 +426,12 @@ public class RocketMQListenerBindingContainer
|
||||
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@SuppressWarnings({ "unchecked", "Duplicates" })
|
||||
@Override
|
||||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeOrderlyContext context) {
|
||||
@@ -401,10 +439,10 @@ public class RocketMQListenerBindingContainer
|
||||
log.debug("received msg: {}", messageExt);
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
rocketMQListener
|
||||
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
|
||||
rocketMQListener.onMessage(convertToSpringMessage(messageExt));
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
|
||||
log.info("consume {} message key:[{}] cost: {} ms",
|
||||
messageExt.getMsgId(), messageExt.getKeys(), costTime);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("consume message failed. messageExt:{}", messageExt, e);
|
||||
|
@@ -16,9 +16,15 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.integration;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.integration.endpoint.MessageProducerSupport;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
@@ -31,11 +37,6 @@ import org.springframework.retry.RetryListener;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
|
@@ -16,30 +16,39 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.integration;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.client.producer.SendStatus;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.common.message.MessageQueue;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.apache.rocketmq.spring.support.RocketMQHeaders;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.integration.handler.AbstractMessageHandler;
|
||||
import org.springframework.integration.support.DefaultErrorMessageStrategy;
|
||||
import org.springframework.integration.support.ErrorMessageStrategy;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.support.ErrorMessage;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@@ -54,6 +63,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
private final RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
private RocketMQHeaderMapper headerMapper;
|
||||
|
||||
private final Boolean transactional;
|
||||
|
||||
private final String destination;
|
||||
@@ -66,12 +77,15 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
private volatile boolean running = false;
|
||||
|
||||
private ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
|
||||
|
||||
public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
|
||||
String group, Boolean transactional,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
String groupName, Boolean transactional,
|
||||
InstrumentationManager instrumentationManager,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties) {
|
||||
this.rocketMQTemplate = rocketMQTemplate;
|
||||
this.destination = destination;
|
||||
this.groupName = group;
|
||||
this.groupName = groupName;
|
||||
this.transactional = transactional;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
@@ -95,6 +109,22 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
.build(), e);
|
||||
}
|
||||
}
|
||||
if (producerProperties.isPartitioned()) {
|
||||
try {
|
||||
List<MessageQueue> messageQueues = rocketMQTemplate.getProducer()
|
||||
.fetchPublishMessageQueues(destination);
|
||||
if (producerProperties.getPartitionCount() != messageQueues.size()) {
|
||||
logger.info(String.format(
|
||||
"The partition count of topic '%s' will change from '%s' to '%s'",
|
||||
destination, producerProperties.getPartitionCount(),
|
||||
messageQueues.size()));
|
||||
producerProperties.setPartitionCount(messageQueues.size());
|
||||
}
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
logger.error("fetch publish message queues fail", e);
|
||||
}
|
||||
}
|
||||
running = true;
|
||||
}
|
||||
|
||||
@@ -145,36 +175,52 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
boolean needSelectQueue = message.getHeaders()
|
||||
.containsKey(BinderHeaders.PARTITION_HEADER);
|
||||
if (sync) {
|
||||
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
|
||||
rocketMQTemplate.getProducer().getSendMsgTimeout(),
|
||||
delayLevel);
|
||||
if (needSelectQueue) {
|
||||
sendRes = rocketMQTemplate.syncSendOrderly(
|
||||
topicWithTags.toString(), message, "",
|
||||
rocketMQTemplate.getProducer().getSendMsgTimeout());
|
||||
}
|
||||
else {
|
||||
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
|
||||
message,
|
||||
rocketMQTemplate.getProducer().getSendMsgTimeout(),
|
||||
delayLevel);
|
||||
}
|
||||
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
|
||||
}
|
||||
else {
|
||||
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
|
||||
new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.debug("async send to topic " + topicWithTags + " "
|
||||
+ sendResult);
|
||||
}
|
||||
final Message<?> finalMessage = message;
|
||||
SendCallback sendCallback = new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.debug("async send to topic " + topicWithTags + " "
|
||||
+ sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable e) {
|
||||
log.error(
|
||||
"RocketMQ Message hasn't been sent. Caused by "
|
||||
+ e.getMessage());
|
||||
if (getSendFailureChannel() != null) {
|
||||
getSendFailureChannel().send(
|
||||
RocketMQMessageHandler.this.errorMessageStrategy
|
||||
.buildErrorMessage(
|
||||
new MessagingException(
|
||||
message, e),
|
||||
null));
|
||||
}
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onException(Throwable e) {
|
||||
log.error("RocketMQ Message hasn't been sent. Caused by "
|
||||
+ e.getMessage());
|
||||
if (getSendFailureChannel() != null) {
|
||||
getSendFailureChannel().send(
|
||||
RocketMQMessageHandler.this.errorMessageStrategy
|
||||
.buildErrorMessage(new MessagingException(
|
||||
finalMessage, e), null));
|
||||
}
|
||||
}
|
||||
};
|
||||
if (needSelectQueue) {
|
||||
rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
|
||||
message, "", sendCallback,
|
||||
rocketMQTemplate.getProducer().getSendMsgTimeout());
|
||||
}
|
||||
else {
|
||||
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
|
||||
sendCallback);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
|
||||
@@ -229,4 +275,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
public void setSync(boolean sync) {
|
||||
this.sync = sync;
|
||||
}
|
||||
|
||||
public RocketMQHeaderMapper getHeaderMapper() {
|
||||
return headerMapper;
|
||||
}
|
||||
|
||||
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
|
||||
this.headerMapper = headerMapper;
|
||||
}
|
||||
}
|
@@ -16,21 +16,26 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import javax.validation.constraints.Pattern;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
|
||||
@Validated
|
||||
public class RocketMQBinderConfigurationProperties {
|
||||
|
||||
/**
|
||||
* The name server for rocketMQ, formats: `host:port;host:port`.
|
||||
*/
|
||||
@Pattern(regexp = "^[\\d.:;]+$", message = "nameServer needs to match expression \"host:port;host:port\"")
|
||||
private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
|
||||
|
||||
/**
|
||||
@@ -93,4 +98,5 @@ public class RocketMQBinderConfigurationProperties {
|
||||
public void setCustomizedTraceTopic(String customizedTraceTopic) {
|
||||
this.customizedTraceTopic = customizedTraceTopic;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -16,8 +16,13 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.MQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
||||
@@ -51,7 +56,9 @@ public class RocketMQConsumerProperties {
|
||||
private Boolean orderly = false;
|
||||
|
||||
/**
|
||||
* for concurrently listener. message consume retry strategy
|
||||
* for concurrently listener. message consume retry strategy. see
|
||||
* {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or
|
||||
* discard, see {@link this#shouldRequeue}), others means requeue
|
||||
*/
|
||||
private int delayLevelWhenNextConsume = 0;
|
||||
|
||||
@@ -62,6 +69,19 @@ public class RocketMQConsumerProperties {
|
||||
|
||||
private Boolean enabled = true;
|
||||
|
||||
/**
|
||||
* {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)}
|
||||
*/
|
||||
private Set<String> trustedPackages;
|
||||
|
||||
// ------------ For Pull Consumer ------------
|
||||
|
||||
private long pullTimeout = 10 * 1000;
|
||||
|
||||
private boolean fromStore;
|
||||
|
||||
// ------------ For Pull Consumer ------------
|
||||
|
||||
public String getTags() {
|
||||
return tags;
|
||||
}
|
||||
@@ -117,4 +137,32 @@ public class RocketMQConsumerProperties {
|
||||
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
|
||||
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
|
||||
public long getPullTimeout() {
|
||||
return pullTimeout;
|
||||
}
|
||||
|
||||
public void setPullTimeout(long pullTimeout) {
|
||||
this.pullTimeout = pullTimeout;
|
||||
}
|
||||
|
||||
public boolean isFromStore() {
|
||||
return fromStore;
|
||||
}
|
||||
|
||||
public void setFromStore(boolean fromStore) {
|
||||
this.fromStore = fromStore;
|
||||
}
|
||||
|
||||
public boolean shouldRequeue() {
|
||||
return delayLevelWhenNextConsume != -1;
|
||||
}
|
||||
|
||||
public Set<String> getTrustedPackages() {
|
||||
return trustedPackages;
|
||||
}
|
||||
|
||||
public void setTrustedPackages(Set<String> trustedPackages) {
|
||||
this.trustedPackages = trustedPackages;
|
||||
}
|
||||
}
|
||||
|
@@ -16,10 +16,14 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.provisioning;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
|
||||
import org.apache.rocketmq.client.Validators;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
@@ -27,9 +31,6 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningException;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
|
@@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.rocketmq.client.producer.MessageQueueSelector;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.common.message.MessageQueue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
|
||||
/**
|
||||
* @author wangxing
|
||||
*/
|
||||
public class PartitionMessageQueueSelector implements MessageQueueSelector {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory
|
||||
.getLogger(PartitionMessageQueueSelector.class);
|
||||
|
||||
@Override
|
||||
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
|
||||
Integer partition = 0;
|
||||
try {
|
||||
partition = Math.abs(
|
||||
Integer.parseInt(msg.getProperty(BinderHeaders.PARTITION_HEADER)));
|
||||
if (partition >= mqs.size()) {
|
||||
LOGGER.warn(
|
||||
"the partition '{}' is greater than the number of queues '{}'.",
|
||||
partition, mqs.size());
|
||||
partition = partition % mqs.size();
|
||||
}
|
||||
}
|
||||
catch (NumberFormatException ignored) {
|
||||
}
|
||||
return mqs.get(partition);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Base for RocketMQ header mappers.
|
||||
*
|
||||
* @author caotc
|
||||
* @since 1.5.1.RELEASE
|
||||
*/
|
||||
public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper {
|
||||
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
||||
|
||||
private Charset charset;
|
||||
|
||||
public AbstractRocketMQHeaderMapper() {
|
||||
this(DEFAULT_CHARSET);
|
||||
}
|
||||
|
||||
public AbstractRocketMQHeaderMapper(Charset charset) {
|
||||
Assert.notNull(charset, "'charset' cannot be null");
|
||||
this.charset = charset;
|
||||
}
|
||||
|
||||
protected boolean matches(String headerName) {
|
||||
return !MessageConst.STRING_HASH_SET.contains(headerName)
|
||||
&& !MessageHeaders.ID.equals(headerName)
|
||||
&& !MessageHeaders.TIMESTAMP.equals(headerName)
|
||||
&& !MessageHeaders.CONTENT_TYPE.equals(headerName)
|
||||
&& !MessageHeaders.REPLY_CHANNEL.equals(headerName)
|
||||
&& !MessageHeaders.ERROR_CHANNEL.equals(headerName);
|
||||
}
|
||||
|
||||
public Charset getCharset() {
|
||||
return charset;
|
||||
}
|
||||
|
||||
public void setCharset(Charset charset) {
|
||||
Assert.notNull(charset, "'charset' cannot be null");
|
||||
this.charset = charset;
|
||||
}
|
||||
}
|
@@ -0,0 +1,289 @@
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* jackson header mapper for RocketMQ. Header types are added to a special header
|
||||
* {@link #JSON_TYPES}.
|
||||
*
|
||||
* @author caotc
|
||||
* @since 1.5.1.RELEASE
|
||||
*/
|
||||
public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
|
||||
|
||||
private final static Logger log = LoggerFactory
|
||||
.getLogger(JacksonRocketMQHeaderMapper.class);
|
||||
|
||||
private static final List<String> DEFAULT_TRUSTED_PACKAGES = Arrays
|
||||
.asList("java.lang", "java.net", "java.util", "org.springframework.util");
|
||||
|
||||
/**
|
||||
* Header name for java types of other headers.
|
||||
*/
|
||||
public static final String JSON_TYPES = "spring_json_header_types";
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final Set<String> trustedPackages = new LinkedHashSet<>(
|
||||
DEFAULT_TRUSTED_PACKAGES);
|
||||
|
||||
public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) {
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) {
|
||||
super(charset);
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> fromHeaders(MessageHeaders headers) {
|
||||
final Map<String, String> target = new HashMap<>();
|
||||
final Map<String, String> jsonHeaders = new HashMap<>();
|
||||
|
||||
for (String key : headers.keySet()) {
|
||||
Object value = headers.get(key);
|
||||
if (matches(key)) {
|
||||
if (value instanceof String) {
|
||||
target.put(key, (String) value);
|
||||
}
|
||||
else {
|
||||
try {
|
||||
String className = value.getClass().getName();
|
||||
target.put(key, objectMapper.writeValueAsString(value));
|
||||
jsonHeaders.put(key, className);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.debug("Could not map " + key + " with type "
|
||||
+ value.getClass().getName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (jsonHeaders.size() > 0) {
|
||||
try {
|
||||
target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders));
|
||||
}
|
||||
catch (IllegalStateException | JsonProcessingException e) {
|
||||
log.error("Could not add json types header", e);
|
||||
}
|
||||
}
|
||||
return target;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageHeaders toHeaders(Map<String, String> source) {
|
||||
final Map<String, Object> target = new HashMap<>();
|
||||
final Map<String, String> jsonTypes = decodeJsonTypes(source);
|
||||
|
||||
for (String key : source.keySet()) {
|
||||
String value = source.get(key);
|
||||
if (matches(key) && !(key.equals(JSON_TYPES))) {
|
||||
if (jsonTypes != null && jsonTypes.containsKey(key)) {
|
||||
Class<?> type = Object.class;
|
||||
String requestedType = jsonTypes.get(key);
|
||||
boolean trusted = trusted(requestedType);
|
||||
if (trusted) {
|
||||
try {
|
||||
type = ClassUtils.forName(requestedType, null);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Could not load class for header: " + key, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (trusted) {
|
||||
try {
|
||||
Object val = decodeValue(value, type);
|
||||
target.put(key, val);
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error("Could not decode json type: " + value
|
||||
+ " for key: " + key, e);
|
||||
target.put(key, value);
|
||||
}
|
||||
}
|
||||
else {
|
||||
target.put(key, new NonTrustedHeaderType(value, requestedType));
|
||||
}
|
||||
}
|
||||
else {
|
||||
target.put(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new MessageHeaders(target);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param packagesToTrust the packages to trust.
|
||||
* @see #addTrustedPackages(Collection)
|
||||
*/
|
||||
public void addTrustedPackages(String... packagesToTrust) {
|
||||
if (packagesToTrust != null) {
|
||||
addTrustedPackages(Arrays.asList(packagesToTrust));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add packages to the trusted packages list (default {@code java.util, java.lang})
|
||||
* used when constructing objects from JSON. If any of the supplied packages is
|
||||
* {@code "*"}, all packages are trusted. If a class for a non-trusted package is
|
||||
* encountered, the header is returned to the application with value of type
|
||||
* {@link NonTrustedHeaderType}.
|
||||
* @param packagesToTrust the packages to trust.
|
||||
*/
|
||||
public void addTrustedPackages(Collection<String> packagesToTrust) {
|
||||
if (packagesToTrust != null) {
|
||||
for (String whiteList : packagesToTrust) {
|
||||
if ("*".equals(whiteList)) {
|
||||
this.trustedPackages.clear();
|
||||
break;
|
||||
}
|
||||
else {
|
||||
this.trustedPackages.add(whiteList);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Set<String> getTrustedPackages() {
|
||||
return this.trustedPackages;
|
||||
}
|
||||
|
||||
public ObjectMapper getObjectMapper() {
|
||||
return objectMapper;
|
||||
}
|
||||
|
||||
private Object decodeValue(String jsonString, Class<?> type)
|
||||
throws IOException, LinkageError {
|
||||
Object value = objectMapper.readValue(jsonString, type);
|
||||
if (type.equals(NonTrustedHeaderType.class)) {
|
||||
// Upstream NTHT propagated; may be trusted here...
|
||||
NonTrustedHeaderType nth = (NonTrustedHeaderType) value;
|
||||
if (trusted(nth.getUntrustedType())) {
|
||||
try {
|
||||
value = objectMapper.readValue(nth.getHeaderValue(),
|
||||
ClassUtils.forName(nth.getUntrustedType(), null));
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Could not decode header: " + nth, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
private Map<String, String> decodeJsonTypes(Map<String, String> source) {
|
||||
if (source.containsKey(JSON_TYPES)) {
|
||||
String value = source.get(JSON_TYPES);
|
||||
try {
|
||||
return objectMapper.readValue(value,
|
||||
new TypeReference<Map<String, String>>() {
|
||||
});
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error("Could not decode json types: " + value, e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
protected boolean trusted(String requestedType) {
|
||||
if (requestedType.equals(NonTrustedHeaderType.class.getName())) {
|
||||
return true;
|
||||
}
|
||||
if (!this.trustedPackages.isEmpty()) {
|
||||
int lastDot = requestedType.lastIndexOf('.');
|
||||
if (lastDot < 0) {
|
||||
return false;
|
||||
}
|
||||
String packageName = requestedType.substring(0, lastDot);
|
||||
for (String trustedPackage : this.trustedPackages) {
|
||||
if (packageName.equals(trustedPackage)
|
||||
|| packageName.startsWith(trustedPackage + ".")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a header that could not be decoded due to an untrusted type.
|
||||
*/
|
||||
public static class NonTrustedHeaderType {
|
||||
|
||||
private String headerValue;
|
||||
|
||||
private String untrustedType;
|
||||
|
||||
public NonTrustedHeaderType() {
|
||||
super();
|
||||
}
|
||||
|
||||
NonTrustedHeaderType(String headerValue, String untrustedType) {
|
||||
this.headerValue = headerValue;
|
||||
this.untrustedType = untrustedType;
|
||||
}
|
||||
|
||||
public void setHeaderValue(String headerValue) {
|
||||
this.headerValue = headerValue;
|
||||
}
|
||||
|
||||
public String getHeaderValue() {
|
||||
return this.headerValue;
|
||||
}
|
||||
|
||||
public void setUntrustedType(String untrustedType) {
|
||||
this.untrustedType = untrustedType;
|
||||
}
|
||||
|
||||
public String getUntrustedType() {
|
||||
return this.untrustedType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "NonTrustedHeaderType [headerValue=" + headerValue + ", untrustedType="
|
||||
+ this.untrustedType + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,43 @@
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.support;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
/**
|
||||
* header value mapper for RocketMQ
|
||||
*
|
||||
* @author caotc
|
||||
* @since 1.5.1.RELEASE
|
||||
*/
|
||||
public interface RocketMQHeaderMapper {
|
||||
/**
|
||||
* Map from the given {@link MessageHeaders} to the specified target message.
|
||||
* @param headers the abstracted MessageHeaders.
|
||||
* @return the native target message.
|
||||
*/
|
||||
Map<String, String> fromHeaders(MessageHeaders headers);
|
||||
|
||||
/**
|
||||
* Map from the given target message to abstracted {@link MessageHeaders}.
|
||||
* @param source the native target message.
|
||||
* @return the target headers.
|
||||
*/
|
||||
MessageHeaders toHeaders(Map<String, String> source);
|
||||
}
|
@@ -16,17 +16,18 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
|
Reference in New Issue
Block a user