From cf88032925b550e5a8c02333ed468d656c5f9b0b Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Tue, 25 Jun 2019 18:52:57 +0800 Subject: [PATCH] edgware sync rocketmq-binder: fix bug about instanceName of Aliware MQ --- .../stream/binder/rocketmq/RocketMQMessageChannelBinder.java | 5 +++-- .../rocketmq/consuming/RocketMQListenerBindingContainer.java | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 03e43a54..7ea7337a 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; @@ -120,8 +121,8 @@ public class RocketMQMessageChannelBinder extends mergedProperties.isEnableMsgTrace(), mergedProperties.getCustomizedTraceTopic()); producer.setVipChannelEnabled(false); - producer.setInstanceName( - RocketMQUtil.getInstanceName(rpcHook, destination.getName())); + producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, + destination.getName() + "|" + UtilAll.getPid())); } else { producer = new DefaultMQProducer(producerGroup); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 714a3e92..9b6854d8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -205,7 +206,8 @@ public class RocketMQListenerBindingContainer new AllocateMessageQueueAveragely(), rocketBinderConfigurationProperties.isEnableMsgTrace(), rocketBinderConfigurationProperties.getCustomizedTraceTopic()); - consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, topic)); + consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, + topic + "|" + UtilAll.getPid())); consumer.setVipChannelEnabled(false); } else {