From beb00fbb1024c576b86e12ca131201041ab6ff94 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Thu, 31 Oct 2019 17:42:30 +0800 Subject: [PATCH] fix Binder --- .../binder/rocketmq/RocketMQMessageChannelBinder.java | 8 +++++++- .../rocketmq/integration/RocketMQMessageHandler.java | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 34a6f925..23931a62 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -31,6 +31,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; @@ -149,6 +150,10 @@ public class RocketMQMessageChannelBinder extends producer.setMaxMessageSize( producerProperties.getExtension().getMaxMessageSize()); rocketMQTemplate.setProducer(producer); + if (producerProperties.isPartitioned()) { + rocketMQTemplate + .setMessageQueueSelector(new PartitionMessageQueueSelector()); + } } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( @@ -190,6 +195,7 @@ public class RocketMQMessageChannelBinder extends consumerProperties.getExtension().getDelayLevelWhenNextConsume()); listenerContainer .setNameServer(rocketBinderConfigurationProperties.getNameServer()); + listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties)); RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( listenerContainer, consumerProperties, instrumentationManager); @@ -239,7 +245,7 @@ public class RocketMQMessageChannelBinder extends } private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages) { - ObjectMapper objectMapper = this.getApplicationContext() + ObjectMapper objectMapper = this.getApplicationContext().getParent() .getBeansOfType(ObjectMapper.class).values().iterator().next(); JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper( objectMapper); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 5ccfdfb6..ebcab55c 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -88,6 +88,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li this.groupName = groupName; this.transactional = transactional; this.instrumentationManager = instrumentationManager; + this.producerProperties = producerProperties; } @Override