From 265400ff5a4b7452f0ebce9ead92fc9b859084c5 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 19 Jun 2019 15:20:44 +0800 Subject: [PATCH] fix #709 --- .../stream/binder/rocketmq/RocketMQMessageChannelBinder.java | 5 +++-- .../rocketmq/consuming/RocketMQListenerBindingContainer.java | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 30eb3760..6df97fca 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; @@ -126,8 +127,8 @@ public class RocketMQMessageChannelBinder extends mergedProperties.isEnableMsgTrace(), mergedProperties.getCustomizedTraceTopic()); producer.setVipChannelEnabled(false); - producer.setInstanceName( - RocketMQUtil.getInstanceName(rpcHook, destination.getName())); + producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, + destination.getName() + "|" + UtilAll.getPid())); } else { producer = new DefaultMQProducer(producerGroup); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 35f29ae8..de19d262 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -206,7 +207,8 @@ public class RocketMQListenerBindingContainer new AllocateMessageQueueAveragely(), rocketBinderConfigurationProperties.isEnableMsgTrace(), rocketBinderConfigurationProperties.getCustomizedTraceTopic()); - consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, topic)); + consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, + topic + "|" + UtilAll.getPid())); consumer.setVipChannelEnabled(false); } else {