From 585a6494634af1a38f3b09fce013e4147ac0bc18 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Thu, 14 Mar 2019 20:25:23 +0800 Subject: [PATCH] add default broadcasting in spring-cloud-starter-bus-rocketmq --- .../RocketMQBusEnvironmentPostProcessor.java | 131 ++++++++++-------- 1 file changed, 71 insertions(+), 60 deletions(-) diff --git a/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java b/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java index 384db727..093dce41 100644 --- a/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java +++ b/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java @@ -16,6 +16,11 @@ */ package org.springframework.cloud.bus.rocketmq.env; +import static org.springframework.cloud.bus.SpringCloudBusClient.INPUT; + +import java.util.HashMap; +import java.util.Map; + import org.springframework.boot.SpringApplication; import org.springframework.boot.env.EnvironmentPostProcessor; import org.springframework.cloud.bus.BusEnvironmentPostProcessor; @@ -25,82 +30,88 @@ import org.springframework.core.env.MapPropertySource; import org.springframework.core.env.MutablePropertySources; import org.springframework.core.env.PropertySource; -import java.util.HashMap; -import java.util.Map; - -import static org.springframework.cloud.bus.SpringCloudBusClient.INPUT; - /** - * The lowest precedence {@link EnvironmentPostProcessor} configures default RocketMQ Bus Properties that will be - * appended into {@link SpringApplication#defaultProperties} + * The lowest precedence {@link EnvironmentPostProcessor} configures default RocketMQ Bus + * Properties that will be appended into {@link SpringApplication#defaultProperties} * * @author Mercy * @see BusEnvironmentPostProcessor * @since 0.2.1 */ -public class RocketMQBusEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered { +public class RocketMQBusEnvironmentPostProcessor + implements EnvironmentPostProcessor, Ordered { - /** - * The name of {@link PropertySource} of {@link SpringApplication#defaultProperties} - */ - private static final String PROPERTY_SOURCE_NAME = "defaultProperties"; + /** + * The name of {@link PropertySource} of {@link SpringApplication#defaultProperties} + */ + private static final String PROPERTY_SOURCE_NAME = "defaultProperties"; - @Override - public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { + @Override + public void postProcessEnvironment(ConfigurableEnvironment environment, + SpringApplication application) { - addDefaultPropertySource(environment); + addDefaultPropertySource(environment); - } + } - private void addDefaultPropertySource(ConfigurableEnvironment environment) { + private void addDefaultPropertySource(ConfigurableEnvironment environment) { - Map map = new HashMap(); + Map map = new HashMap(); - configureDefaultProperties(map); + configureDefaultProperties(map); - addOrReplace(environment.getPropertySources(), map); - } + addOrReplace(environment.getPropertySources(), map); + } - private void configureDefaultProperties(Map source) { - // Required Properties - String groupBindingPropertyName = createBindingPropertyName(INPUT, "group"); - source.put(groupBindingPropertyName, "rocketmq-bus-group"); - } + private void configureDefaultProperties(Map source) { + // Required Properties + String groupBindingPropertyName = createBindingPropertyName(INPUT, "group"); + String broadcastingPropertyName = createRocketMQPropertyName(INPUT, + "broadcasting"); + source.put(groupBindingPropertyName, "rocketmq-bus-group"); + source.put(broadcastingPropertyName, "true"); + } - private String createBindingPropertyName(String channel, String propertyName) { - return "spring.cloud.stream.bindings." + channel + "." + propertyName; - } + private String createRocketMQPropertyName(String channel, String propertyName) { + return "spring.cloud.stream.rocketmq.bindings." + INPUT + ".consumer." + + propertyName; + } - /** - * Copy from {@link BusEnvironmentPostProcessor#addOrReplace(MutablePropertySources, Map)} - * - * @param propertySources {@link MutablePropertySources} - * @param map Default RocketMQ Bus Properties - */ - private void addOrReplace(MutablePropertySources propertySources, - Map map) { - MapPropertySource target = null; - if (propertySources.contains(PROPERTY_SOURCE_NAME)) { - PropertySource source = propertySources.get(PROPERTY_SOURCE_NAME); - if (source instanceof MapPropertySource) { - target = (MapPropertySource) source; - for (String key : map.keySet()) { - if (!target.containsProperty(key)) { - target.getSource().put(key, map.get(key)); - } - } - } - } - if (target == null) { - target = new MapPropertySource(PROPERTY_SOURCE_NAME, map); - } - if (!propertySources.contains(PROPERTY_SOURCE_NAME)) { - propertySources.addLast(target); - } - } + private String createBindingPropertyName(String channel, String propertyName) { + return "spring.cloud.stream.bindings." + channel + "." + propertyName; + } - @Override - public int getOrder() { - return LOWEST_PRECEDENCE; - } + /** + * Copy from + * {@link BusEnvironmentPostProcessor#addOrReplace(MutablePropertySources, Map)} + * + * @param propertySources {@link MutablePropertySources} + * @param map Default RocketMQ Bus Properties + */ + private void addOrReplace(MutablePropertySources propertySources, + Map map) { + MapPropertySource target = null; + if (propertySources.contains(PROPERTY_SOURCE_NAME)) { + PropertySource source = propertySources.get(PROPERTY_SOURCE_NAME); + if (source instanceof MapPropertySource) { + target = (MapPropertySource) source; + for (String key : map.keySet()) { + if (!target.containsProperty(key)) { + target.getSource().put(key, map.get(key)); + } + } + } + } + if (target == null) { + target = new MapPropertySource(PROPERTY_SOURCE_NAME, map); + } + if (!propertySources.contains(PROPERTY_SOURCE_NAME)) { + propertySources.addLast(target); + } + } + + @Override + public int getOrder() { + return LOWEST_PRECEDENCE; + } }