1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

fix spring-cloud-starter-bus-rocketmq in 1.x

This commit is contained in:
fangjian0423 2019-03-14 20:30:39 +08:00
parent a59011fa93
commit eb72043b9f
2 changed files with 23 additions and 12 deletions

View File

@ -16,19 +16,20 @@
*/ */
package org.springframework.cloud.bus.rocketmq.env; package org.springframework.cloud.bus.rocketmq.env;
import static org.springframework.cloud.bus.SpringCloudBusClient.INPUT;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor; import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.cloud.bus.BusEnvironmentPostProcessor; import org.springframework.cloud.bus.BusEnvironmentPostProcessor;
import org.springframework.cloud.bus.SpringCloudBusClient;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource; import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources; import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertySource; import org.springframework.core.env.PropertySource;
import java.util.HashMap;
import java.util.Map;
/** /**
* The lowest precedence {@link EnvironmentPostProcessor} configures default RocketMQ Bus * The lowest precedence {@link EnvironmentPostProcessor} configures default RocketMQ Bus
* Properties that will be appended into {@link SpringApplication#defaultProperties} * Properties that will be appended into {@link SpringApplication#defaultProperties}
@ -64,9 +65,16 @@ public class RocketMQBusEnvironmentPostProcessor
private void configureDefaultProperties(Map<String, Object> source) { private void configureDefaultProperties(Map<String, Object> source) {
// Required Properties // Required Properties
String groupBindingPropertyName = createBindingPropertyName( String groupBindingPropertyName = createBindingPropertyName(INPUT, "group");
SpringCloudBusClient.INPUT, "group"); String broadcastingPropertyName = createRocketMQPropertyName(INPUT,
"broadcasting");
source.put(groupBindingPropertyName, "rocketmq-bus-group"); source.put(groupBindingPropertyName, "rocketmq-bus-group");
source.put(broadcastingPropertyName, "true");
}
private String createRocketMQPropertyName(String channel, String propertyName) {
return "spring.cloud.stream.rocketmq.bindings." + INPUT + ".consumer."
+ propertyName;
} }
private String createBindingPropertyName(String channel, String propertyName) { private String createBindingPropertyName(String channel, String propertyName) {

View File

@ -82,6 +82,12 @@ public class RocketMQMessageChannelBinder extends
MessageChannel errorChannel) throws Exception { MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) { if (producerProperties.getExtension().getEnabled()) {
// if producerGroup is empty, using destination
String extendedProducerGroup = producerProperties.getExtension().getGroup();
String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
? destination.getName()
: extendedProducerGroup;
RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
.mergeProperties(rocketBinderConfigurationProperties, .mergeProperties(rocketBinderConfigurationProperties,
rocketMQProperties); rocketMQProperties);
@ -110,8 +116,7 @@ public class RocketMQMessageChannelBinder extends
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
RPCHook rpcHook = new AclClientRPCHook( RPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ak, sk)); new SessionCredentials(ak, sk));
producer = new DefaultMQProducer( producer = new DefaultMQProducer(producerGroup, rpcHook,
producerProperties.getExtension().getGroup(), rpcHook,
mergedProperties.isEnableMsgTrace(), mergedProperties.isEnableMsgTrace(),
mergedProperties.getCustomizedTraceTopic()); mergedProperties.getCustomizedTraceTopic());
producer.setVipChannelEnabled(false); producer.setVipChannelEnabled(false);
@ -119,8 +124,7 @@ public class RocketMQMessageChannelBinder extends
RocketMQUtil.getInstanceName(rpcHook, destination.getName())); RocketMQUtil.getInstanceName(rpcHook, destination.getName()));
} }
else { else {
producer = new DefaultMQProducer( producer = new DefaultMQProducer(producerGroup);
producerProperties.getExtension().getGroup());
producer.setVipChannelEnabled( producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled()); producerProperties.getExtension().getVipChannelEnabled());
} }
@ -141,8 +145,7 @@ public class RocketMQMessageChannelBinder extends
} }
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
rocketMQTemplate, destination.getName(), rocketMQTemplate, destination.getName(), producerGroup,
producerProperties.getExtension().getGroup(),
producerProperties.getExtension().getTransactional(), producerProperties.getExtension().getTransactional(),
instrumentationManager); instrumentationManager);
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());