diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 03e43a54..7ea7337a 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; @@ -120,8 +121,8 @@ public class RocketMQMessageChannelBinder extends mergedProperties.isEnableMsgTrace(), mergedProperties.getCustomizedTraceTopic()); producer.setVipChannelEnabled(false); - producer.setInstanceName( - RocketMQUtil.getInstanceName(rpcHook, destination.getName())); + producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, + destination.getName() + "|" + UtilAll.getPid())); } else { producer = new DefaultMQProducer(producerGroup); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 714a3e92..9b6854d8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -205,7 +206,8 @@ public class RocketMQListenerBindingContainer new AllocateMessageQueueAveragely(), rocketBinderConfigurationProperties.isEnableMsgTrace(), rocketBinderConfigurationProperties.getCustomizedTraceTopic()); - consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, topic)); + consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, + topic + "|" + UtilAll.getPid())); consumer.setVipChannelEnabled(false); } else {