1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

fix Binder

This commit is contained in:
fangjian0423 2019-10-31 17:42:30 +08:00
parent 3455c90232
commit beb00fbb10
2 changed files with 8 additions and 1 deletions

View File

@ -31,6 +31,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
@ -149,6 +150,10 @@ public class RocketMQMessageChannelBinder extends
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
rocketMQTemplate.setProducer(producer);
if (producerProperties.isPartitioned()) {
rocketMQTemplate
.setMessageQueueSelector(new PartitionMessageQueueSelector());
}
}
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
@ -190,6 +195,7 @@ public class RocketMQMessageChannelBinder extends
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
listenerContainer
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
listenerContainer, consumerProperties, instrumentationManager);
@ -239,7 +245,7 @@ public class RocketMQMessageChannelBinder extends
}
private RocketMQHeaderMapper createHeaderMapper(Collection<String> trustedPackages) {
ObjectMapper objectMapper = this.getApplicationContext()
ObjectMapper objectMapper = this.getApplicationContext().getParent()
.getBeansOfType(ObjectMapper.class).values().iterator().next();
JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(
objectMapper);

View File

@ -88,6 +88,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
this.groupName = groupName;
this.transactional = transactional;
this.instrumentationManager = instrumentationManager;
this.producerProperties = producerProperties;
}
@Override