diff --git a/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java b/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java index cbf162d2..093dce41 100644 --- a/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java +++ b/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java @@ -16,19 +16,20 @@ */ package org.springframework.cloud.bus.rocketmq.env; +import static org.springframework.cloud.bus.SpringCloudBusClient.INPUT; + +import java.util.HashMap; +import java.util.Map; + import org.springframework.boot.SpringApplication; import org.springframework.boot.env.EnvironmentPostProcessor; import org.springframework.cloud.bus.BusEnvironmentPostProcessor; -import org.springframework.cloud.bus.SpringCloudBusClient; import org.springframework.core.Ordered; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.MapPropertySource; import org.springframework.core.env.MutablePropertySources; import org.springframework.core.env.PropertySource; -import java.util.HashMap; -import java.util.Map; - /** * The lowest precedence {@link EnvironmentPostProcessor} configures default RocketMQ Bus * Properties that will be appended into {@link SpringApplication#defaultProperties} @@ -64,9 +65,16 @@ public class RocketMQBusEnvironmentPostProcessor private void configureDefaultProperties(Map source) { // Required Properties - String groupBindingPropertyName = createBindingPropertyName( - SpringCloudBusClient.INPUT, "group"); + String groupBindingPropertyName = createBindingPropertyName(INPUT, "group"); + String broadcastingPropertyName = createRocketMQPropertyName(INPUT, + "broadcasting"); source.put(groupBindingPropertyName, "rocketmq-bus-group"); + source.put(broadcastingPropertyName, "true"); + } + + private String createRocketMQPropertyName(String channel, String propertyName) { + return "spring.cloud.stream.rocketmq.bindings." + INPUT + ".consumer." + + propertyName; } private String createBindingPropertyName(String channel, String propertyName) { diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index b2ab0b00..03e43a54 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -82,6 +82,12 @@ public class RocketMQMessageChannelBinder extends MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { + // if producerGroup is empty, using destination + String extendedProducerGroup = producerProperties.getExtension().getGroup(); + String producerGroup = StringUtils.isEmpty(extendedProducerGroup) + ? destination.getName() + : extendedProducerGroup; + RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils .mergeProperties(rocketBinderConfigurationProperties, rocketMQProperties); @@ -110,8 +116,7 @@ public class RocketMQMessageChannelBinder extends if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { RPCHook rpcHook = new AclClientRPCHook( new SessionCredentials(ak, sk)); - producer = new DefaultMQProducer( - producerProperties.getExtension().getGroup(), rpcHook, + producer = new DefaultMQProducer(producerGroup, rpcHook, mergedProperties.isEnableMsgTrace(), mergedProperties.getCustomizedTraceTopic()); producer.setVipChannelEnabled(false); @@ -119,8 +124,7 @@ public class RocketMQMessageChannelBinder extends RocketMQUtil.getInstanceName(rpcHook, destination.getName())); } else { - producer = new DefaultMQProducer( - producerProperties.getExtension().getGroup()); + producer = new DefaultMQProducer(producerGroup); producer.setVipChannelEnabled( producerProperties.getExtension().getVipChannelEnabled()); } @@ -141,8 +145,7 @@ public class RocketMQMessageChannelBinder extends } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( - rocketMQTemplate, destination.getName(), - producerProperties.getExtension().getGroup(), + rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), instrumentationManager); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());