diff --git a/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/rocketmq/RocketMQBusApplication.java b/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/rocketmq/RocketMQBusApplication.java index c4a05a8b..60299a22 100644 --- a/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/rocketmq/RocketMQBusApplication.java +++ b/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/rocketmq/RocketMQBusApplication.java @@ -16,8 +16,6 @@ */ package org.springframework.cloud.alibaba.cloud.examples.rocketmq; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -30,6 +28,9 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * RocketMQ Bus Spring Application * @@ -41,54 +42,61 @@ import org.springframework.web.bind.annotation.RestController; @RemoteApplicationEventScan(basePackages = "org.springframework.cloud.alibaba.cloud.examples.rocketmq") public class RocketMQBusApplication { - public static void main(String[] args) { - new SpringApplicationBuilder(RocketMQBusApplication.class) - .properties("server.port=0") // Random server port - .properties("management.endpoints.web.exposure.include=*") // exposure includes all - .properties("spring.cloud.bus.trace.enabled=true") // Enable trace - .run(args); - } + public static void main(String[] args) { + new SpringApplicationBuilder(RocketMQBusApplication.class) + .properties("server.port=0") // Random server port + .properties("management.endpoints.web.exposure.include=*") // exposure + // includes + // all + .properties("spring.cloud.bus.trace.enabled=true") // Enable trace + .run(args); + } - @Autowired - private ApplicationEventPublisher publisher; + @Autowired + private ApplicationEventPublisher publisher; - @Value("${spring.cloud.bus.id}") - private String originService; + @Value("${spring.cloud.bus.id}") + private String originService; - @Value("${server.port}") - private int localServerPort; + @Value("${server.port}") + private int localServerPort; - @Autowired - private ObjectMapper objectMapper; + @Autowired + private ObjectMapper objectMapper; - /** - * Publish the {@link UserRemoteApplicationEvent} - * - * @param name the user name - * @param destination the destination - * @return If published - */ - @GetMapping("/bus/event/publish/user") - public boolean publish(@RequestParam String name, @RequestParam(required = false) String destination) { - User user = new User(); - user.setId(System.currentTimeMillis()); - user.setName(name); - publisher.publishEvent(new UserRemoteApplicationEvent(user, originService, destination)); - return true; - } + /** + * Publish the {@link UserRemoteApplicationEvent} + * + * @param name the user name + * @param destination the destination + * @return If published + */ + @GetMapping("/bus/event/publish/user") + public boolean publish(@RequestParam String name, + @RequestParam(required = false) String destination) { + User user = new User(); + user.setId(System.currentTimeMillis()); + user.setName(name); + publisher.publishEvent( + new UserRemoteApplicationEvent(this, user, originService, destination)); + return true; + } - /** - * Listener on the {@link UserRemoteApplicationEvent} - * - * @param event {@link UserRemoteApplicationEvent} - */ - @EventListener - public void onEvent(UserRemoteApplicationEvent event) { - System.out.printf("Server [port : %d] listeners on %s\n", localServerPort, event.getUser()); - } + /** + * Listener on the {@link UserRemoteApplicationEvent} + * + * @param event {@link UserRemoteApplicationEvent} + */ + @EventListener + public void onEvent(UserRemoteApplicationEvent event) { + System.out.printf("Server [port : %d] listeners on %s\n", localServerPort, + event.getUser()); + } - @EventListener - public void onAckEvent(AckRemoteApplicationEvent event) throws JsonProcessingException { - System.out.printf("Server [port : %d] listeners on %s\n", localServerPort, objectMapper.writeValueAsString(event)); - } + @EventListener + public void onAckEvent(AckRemoteApplicationEvent event) + throws JsonProcessingException { + System.out.printf("Server [port : %d] listeners on %s\n", localServerPort, + objectMapper.writeValueAsString(event)); + } } diff --git a/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/rocketmq/UserRemoteApplicationEvent.java b/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/rocketmq/UserRemoteApplicationEvent.java index 919e00ea..fa404e89 100644 --- a/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/rocketmq/UserRemoteApplicationEvent.java +++ b/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/rocketmq/UserRemoteApplicationEvent.java @@ -26,12 +26,22 @@ import org.springframework.cloud.bus.event.RemoteApplicationEvent; */ public class UserRemoteApplicationEvent extends RemoteApplicationEvent { - public UserRemoteApplicationEvent(User user, String originService, + private User user; + + public UserRemoteApplicationEvent() { + } + + public UserRemoteApplicationEvent(Object source, User user, String originService, String destinationService) { - super(user, originService, destinationService); + super(source, originService, destinationService); + this.user = user; + } + + public void setUser(User user) { + this.user = user; } public User getUser() { - return (User) getSource(); + return user; } } diff --git a/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/resources/bootstrap.properties b/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/resources/bootstrap.properties index 1b944de9..9e36d301 100644 --- a/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/resources/bootstrap.properties +++ b/spring-cloud-alibaba-examples/spring-cloud-bus-rocketmq-example/src/main/resources/bootstrap.properties @@ -1,4 +1,4 @@ spring.application.name=spring-cloud-bus-rocketmq-example -spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 +spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 server.port=8080 spring.cloud.bus.id=${spring.application.name}:${server.port} \ No newline at end of file diff --git a/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java b/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java index 384db727..093dce41 100644 --- a/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java +++ b/spring-cloud-starter-alibaba/spring-cloud-starter-bus-rocketmq/src/main/java/org/springframework/cloud/bus/rocketmq/env/RocketMQBusEnvironmentPostProcessor.java @@ -16,6 +16,11 @@ */ package org.springframework.cloud.bus.rocketmq.env; +import static org.springframework.cloud.bus.SpringCloudBusClient.INPUT; + +import java.util.HashMap; +import java.util.Map; + import org.springframework.boot.SpringApplication; import org.springframework.boot.env.EnvironmentPostProcessor; import org.springframework.cloud.bus.BusEnvironmentPostProcessor; @@ -25,82 +30,88 @@ import org.springframework.core.env.MapPropertySource; import org.springframework.core.env.MutablePropertySources; import org.springframework.core.env.PropertySource; -import java.util.HashMap; -import java.util.Map; - -import static org.springframework.cloud.bus.SpringCloudBusClient.INPUT; - /** - * The lowest precedence {@link EnvironmentPostProcessor} configures default RocketMQ Bus Properties that will be - * appended into {@link SpringApplication#defaultProperties} + * The lowest precedence {@link EnvironmentPostProcessor} configures default RocketMQ Bus + * Properties that will be appended into {@link SpringApplication#defaultProperties} * * @author Mercy * @see BusEnvironmentPostProcessor * @since 0.2.1 */ -public class RocketMQBusEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered { +public class RocketMQBusEnvironmentPostProcessor + implements EnvironmentPostProcessor, Ordered { - /** - * The name of {@link PropertySource} of {@link SpringApplication#defaultProperties} - */ - private static final String PROPERTY_SOURCE_NAME = "defaultProperties"; + /** + * The name of {@link PropertySource} of {@link SpringApplication#defaultProperties} + */ + private static final String PROPERTY_SOURCE_NAME = "defaultProperties"; - @Override - public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { + @Override + public void postProcessEnvironment(ConfigurableEnvironment environment, + SpringApplication application) { - addDefaultPropertySource(environment); + addDefaultPropertySource(environment); - } + } - private void addDefaultPropertySource(ConfigurableEnvironment environment) { + private void addDefaultPropertySource(ConfigurableEnvironment environment) { - Map map = new HashMap(); + Map map = new HashMap(); - configureDefaultProperties(map); + configureDefaultProperties(map); - addOrReplace(environment.getPropertySources(), map); - } + addOrReplace(environment.getPropertySources(), map); + } - private void configureDefaultProperties(Map source) { - // Required Properties - String groupBindingPropertyName = createBindingPropertyName(INPUT, "group"); - source.put(groupBindingPropertyName, "rocketmq-bus-group"); - } + private void configureDefaultProperties(Map source) { + // Required Properties + String groupBindingPropertyName = createBindingPropertyName(INPUT, "group"); + String broadcastingPropertyName = createRocketMQPropertyName(INPUT, + "broadcasting"); + source.put(groupBindingPropertyName, "rocketmq-bus-group"); + source.put(broadcastingPropertyName, "true"); + } - private String createBindingPropertyName(String channel, String propertyName) { - return "spring.cloud.stream.bindings." + channel + "." + propertyName; - } + private String createRocketMQPropertyName(String channel, String propertyName) { + return "spring.cloud.stream.rocketmq.bindings." + INPUT + ".consumer." + + propertyName; + } - /** - * Copy from {@link BusEnvironmentPostProcessor#addOrReplace(MutablePropertySources, Map)} - * - * @param propertySources {@link MutablePropertySources} - * @param map Default RocketMQ Bus Properties - */ - private void addOrReplace(MutablePropertySources propertySources, - Map map) { - MapPropertySource target = null; - if (propertySources.contains(PROPERTY_SOURCE_NAME)) { - PropertySource source = propertySources.get(PROPERTY_SOURCE_NAME); - if (source instanceof MapPropertySource) { - target = (MapPropertySource) source; - for (String key : map.keySet()) { - if (!target.containsProperty(key)) { - target.getSource().put(key, map.get(key)); - } - } - } - } - if (target == null) { - target = new MapPropertySource(PROPERTY_SOURCE_NAME, map); - } - if (!propertySources.contains(PROPERTY_SOURCE_NAME)) { - propertySources.addLast(target); - } - } + private String createBindingPropertyName(String channel, String propertyName) { + return "spring.cloud.stream.bindings." + channel + "." + propertyName; + } - @Override - public int getOrder() { - return LOWEST_PRECEDENCE; - } + /** + * Copy from + * {@link BusEnvironmentPostProcessor#addOrReplace(MutablePropertySources, Map)} + * + * @param propertySources {@link MutablePropertySources} + * @param map Default RocketMQ Bus Properties + */ + private void addOrReplace(MutablePropertySources propertySources, + Map map) { + MapPropertySource target = null; + if (propertySources.contains(PROPERTY_SOURCE_NAME)) { + PropertySource source = propertySources.get(PROPERTY_SOURCE_NAME); + if (source instanceof MapPropertySource) { + target = (MapPropertySource) source; + for (String key : map.keySet()) { + if (!target.containsProperty(key)) { + target.getSource().put(key, map.get(key)); + } + } + } + } + if (target == null) { + target = new MapPropertySource(PROPERTY_SOURCE_NAME, map); + } + if (!propertySources.contains(PROPERTY_SOURCE_NAME)) { + propertySources.addLast(target); + } + } + + @Override + public int getOrder() { + return LOWEST_PRECEDENCE; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 152ccd31..38776db5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -83,6 +83,12 @@ public class RocketMQMessageChannelBinder extends MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { + // if producerGroup is empty, using destination + String extendedProducerGroup = producerProperties.getExtension().getGroup(); + String producerGroup = StringUtils.isEmpty(extendedProducerGroup) + ? destination.getName() + : extendedProducerGroup; + RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils .mergeProperties(rocketBinderConfigurationProperties, rocketMQProperties); @@ -111,8 +117,7 @@ public class RocketMQMessageChannelBinder extends if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { RPCHook rpcHook = new AclClientRPCHook( new SessionCredentials(ak, sk)); - producer = new DefaultMQProducer( - producerProperties.getExtension().getGroup(), rpcHook, + producer = new DefaultMQProducer(producerGroup, rpcHook, mergedProperties.isEnableMsgTrace(), mergedProperties.getCustomizedTraceTopic()); producer.setVipChannelEnabled(false); @@ -120,8 +125,7 @@ public class RocketMQMessageChannelBinder extends RocketMQUtil.getInstanceName(rpcHook, destination.getName())); } else { - producer = new DefaultMQProducer( - producerProperties.getExtension().getGroup()); + producer = new DefaultMQProducer(producerGroup); producer.setVipChannelEnabled( producerProperties.getExtension().getVipChannelEnabled()); } @@ -142,8 +146,7 @@ public class RocketMQMessageChannelBinder extends } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( - rocketMQTemplate, destination.getName(), - producerProperties.getExtension().getGroup(), + rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), instrumentationManager); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());