diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 152ccd31..38776db5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -83,6 +83,12 @@ public class RocketMQMessageChannelBinder extends MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { + // if producerGroup is empty, using destination + String extendedProducerGroup = producerProperties.getExtension().getGroup(); + String producerGroup = StringUtils.isEmpty(extendedProducerGroup) + ? destination.getName() + : extendedProducerGroup; + RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils .mergeProperties(rocketBinderConfigurationProperties, rocketMQProperties); @@ -111,8 +117,7 @@ public class RocketMQMessageChannelBinder extends if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { RPCHook rpcHook = new AclClientRPCHook( new SessionCredentials(ak, sk)); - producer = new DefaultMQProducer( - producerProperties.getExtension().getGroup(), rpcHook, + producer = new DefaultMQProducer(producerGroup, rpcHook, mergedProperties.isEnableMsgTrace(), mergedProperties.getCustomizedTraceTopic()); producer.setVipChannelEnabled(false); @@ -120,8 +125,7 @@ public class RocketMQMessageChannelBinder extends RocketMQUtil.getInstanceName(rpcHook, destination.getName())); } else { - producer = new DefaultMQProducer( - producerProperties.getExtension().getGroup()); + producer = new DefaultMQProducer(producerGroup); producer.setVipChannelEnabled( producerProperties.getExtension().getVipChannelEnabled()); } @@ -142,8 +146,7 @@ public class RocketMQMessageChannelBinder extends } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( - rocketMQTemplate, destination.getName(), - producerProperties.getExtension().getGroup(), + rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), instrumentationManager); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());