diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index c6ee54ca..e580594e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,7 +16,11 @@ package com.alibaba.cloud.stream.binder.rocketmq; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; @@ -26,7 +30,11 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQUtil; -import org.springframework.cloud.stream.binder.*; +import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.binding.MessageConverterConfigurer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; @@ -53,6 +61,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvis import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; + import com.fasterxml.jackson.databind.ObjectMapper; /** diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 6a9e4cd2..8e00a249 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -19,16 +19,18 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming; import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES; import java.util.List; -import java.util.Map; import java.util.Objects; -import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; -import com.google.common.collect.Maps; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; -import org.apache.rocketmq.client.consumer.listener.*; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; @@ -47,7 +49,6 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.context.SmartLifecycle; import org.springframework.integration.support.MessageBuilder; -import org.springframework.integration.support.MutableMessageHeaders; import org.springframework.messaging.Message; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -55,6 +56,7 @@ import org.springframework.util.StringUtils; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; /** * A class that Listen on rocketmq message diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 1f2f7be7..ec5c4c6e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -20,9 +20,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; -import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; -import com.google.common.collect.Maps; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; @@ -52,6 +49,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; /** * @author Jim diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index f8a4b39b..53489355 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -16,7 +16,8 @@ package com.alibaba.cloud.stream.binder.rocketmq.properties; -import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; +import java.util.Set; + import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -24,8 +25,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import java.util.List; -import java.util.Set; +import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; /** * @author Timur Valiev diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java index 41605858..2c582b97 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java @@ -18,7 +18,13 @@ package com.alibaba.cloud.stream.binder.rocketmq.support; import java.io.IOException; import java.nio.charset.Charset; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory;