diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 91516050..579153e1 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -82,11 +82,10 @@ public class RocketMQMessageChannelBinder extends this.instrumentationManager = instrumentationManager; } - @Override + @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, - MessageChannel channel, - MessageChannel errorChannel) throws Exception { + MessageChannel channel, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { // if producerGroup is empty, using destination @@ -155,8 +154,6 @@ public class RocketMQMessageChannelBinder extends } } - - RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), @@ -167,7 +164,7 @@ public class RocketMQMessageChannelBinder extends .findFirst().orElse(null)); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setSync(producerProperties.getExtension().getSync()); - messageHandler.setHeaderMapper(createHeaderMapper(producerProperties)); + messageHandler.setHeaderMapper(createHeaderMapper(producerProperties)); if (errorChannel != null) { messageHandler.setSendFailureChannel(errorChannel); } @@ -208,7 +205,7 @@ public class RocketMQMessageChannelBinder extends consumerProperties.getExtension().getDelayLevelWhenNextConsume()); listenerContainer .setNameServer(rocketBinderConfigurationProperties.getNameServer()); - listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties)); + listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties)); RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( listenerContainer, consumerProperties, instrumentationManager); @@ -293,26 +290,27 @@ public class RocketMQMessageChannelBinder extends this.extendedBindingProperties = extendedBindingProperties; } - private RocketMQHeaderMapper createHeaderMapper( - final ExtendedConsumerProperties extendedConsumerProperties) { - Set trustedPackages = extendedConsumerProperties.getExtension() - .getTrustedPackages(); - return createHeaderMapper(trustedPackages); - } + private RocketMQHeaderMapper createHeaderMapper( + final ExtendedConsumerProperties extendedConsumerProperties) { + Set trustedPackages = extendedConsumerProperties.getExtension() + .getTrustedPackages(); + return createHeaderMapper(trustedPackages); + } - private RocketMQHeaderMapper createHeaderMapper( - final ExtendedProducerProperties producerProperties) { - return createHeaderMapper(Collections.emptyList()); - } + private RocketMQHeaderMapper createHeaderMapper( + final ExtendedProducerProperties producerProperties) { + return createHeaderMapper(Collections.emptyList()); + } - private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages){ - ObjectMapper objectMapper=this.getApplicationContext() - .getBeansOfType(ObjectMapper.class).values().iterator().next(); - JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(objectMapper); - if (!StringUtils.isEmpty(trustedPackages)) { - headerMapper.addTrustedPackages(trustedPackages); - } - return headerMapper; - } + private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages) { + ObjectMapper objectMapper = this.getApplicationContext() + .getBeansOfType(ObjectMapper.class).values().iterator().next(); + JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper( + objectMapper); + if (!StringUtils.isEmpty(trustedPackages)) { + headerMapper.addTrustedPackages(trustedPackages); + } + return headerMapper; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index d0e16604..6a9e4cd2 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -445,14 +445,15 @@ public class RocketMQListenerBindingContainer * @return the converted Spring {@link Message} */ @SuppressWarnings("unchecked") - private Message convertToSpringMessage(MessageExt messageExt) { + private Message convertToSpringMessage(MessageExt messageExt) { // add reconsume-times header to messageExt int reconsumeTimes = messageExt.getReconsumeTimes(); messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes)); - Message message=RocketMQUtil.convertToSpringMessage(messageExt); - return MessageBuilder.fromMessage(message).copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build(); + Message message = RocketMQUtil.convertToSpringMessage(messageExt); + return MessageBuilder.fromMessage(message) + .copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index f778964b..1f2f7be7 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -67,7 +67,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private final RocketMQTemplate rocketMQTemplate; - private RocketMQHeaderMapper headerMapper; + private RocketMQHeaderMapper headerMapper; private final Boolean transactional; @@ -156,12 +156,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li protected void handleMessageInternal(org.springframework.messaging.Message message) throws Exception { try { - //issue 737 fix - Map jsonHeaders=headerMapper.fromHeaders(message.getHeaders()); - message = org.springframework.messaging.support.MessageBuilder - .fromMessage(message).copyHeaders(jsonHeaders) - .build(); - + // issue 737 fix + Map jsonHeaders = headerMapper + .fromHeaders(message.getHeaders()); + message = org.springframework.messaging.support.MessageBuilder + .fromMessage(message).copyHeaders(jsonHeaders).build(); final StringBuilder topicWithTags = new StringBuilder(destination); String tags = Optional @@ -210,8 +209,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { - Message finalMessage = message; - SendCallback sendCallback = new SendCallback() { + Message finalMessage = message; + SendCallback sendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.debug("async send to topic " + topicWithTags + " " @@ -226,7 +225,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li getSendFailureChannel().send( RocketMQMessageHandler.this.errorMessageStrategy .buildErrorMessage(new MessagingException( - finalMessage, e), null)); + finalMessage, e), null)); } } };