diff --git a/pom.xml b/pom.xml index 07b9b74b..d67aaaf9 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ spring-cloud-alibaba-nacos-config spring-cloud-alibaba-nacos-discovery spring-cloud-alibaba-fescar - + spring-cloud-stream-binder-rocketmq spring-cloud-alibaba-nacos-config-server spring-cloud-alicloud-context diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index 8f8cf458..00822ee6 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -305,6 +305,11 @@ spring-cloud-stream-binder-rocketmq ${project.version} + + org.springframework.cloud + spring-cloud-alibaba-dubbo + ${project.version} + diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index a0af5c8e..aa6c79e2 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -33,10 +33,10 @@ fescar-example/storage-service fescar-example/account-service acm-example/acm-local-example - - + rocketmq-example/rocketmq-consume-example + rocketmq-example/rocketmq-produce-example sms-example - + spring-cloud-bus-rocketmq-example schedulerx-example/schedulerx-simple-task-example diff --git a/spring-cloud-starter-alibaba/pom.xml b/spring-cloud-starter-alibaba/pom.xml index 3894da9d..55f2d696 100644 --- a/spring-cloud-starter-alibaba/pom.xml +++ b/spring-cloud-starter-alibaba/pom.xml @@ -17,8 +17,8 @@ spring-cloud-starter-alibaba-nacos-discovery spring-cloud-starter-alibaba-sentinel spring-cloud-starter-alibaba-fescar - - + spring-cloud-starter-stream-rocketmq + spring-cloud-starter-bus-rocketmq \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 38776db5..0305fcfb 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -27,7 +27,6 @@ import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; -import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; @@ -57,8 +56,7 @@ public class RocketMQMessageChannelBinder extends implements ExtendedPropertiesBinder { - private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); - + private final RocketMQExtendedBindingProperties extendedBindingProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQProperties rocketMQProperties; private final InstrumentationManager instrumentationManager; @@ -71,10 +69,10 @@ public class RocketMQMessageChannelBinder extends RocketMQProperties rocketMQProperties, InstrumentationManager instrumentationManager) { super(null, provisioningProvider); - this.extendedBindingProperties = extendedBindingProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketMQProperties = rocketMQProperties; this.instrumentationManager = instrumentationManager; + this.extendedBindingProperties = extendedBindingProperties; } @Override @@ -83,7 +81,7 @@ public class RocketMQMessageChannelBinder extends MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - // if producerGroup is empty, using destination + // if producerGroup is empty, using destination String extendedProducerGroup = producerProperties.getExtension().getGroup(); String producerGroup = StringUtils.isEmpty(extendedProducerGroup) ? destination.getName() @@ -220,19 +218,4 @@ public class RocketMQMessageChannelBinder extends return topicInUse; } - @Override - public String getDefaultsPrefix() { - return extendedBindingProperties.getDefaultsPrefix(); - } - - @Override - public Class getExtendedPropertiesEntryClass() { - return extendedBindingProperties.getExtendedPropertiesEntryClass(); - } - - public void setExtendedBindingProperties( - RocketMQExtendedBindingProperties extendedBindingProperties) { - this.extendedBindingProperties = extendedBindingProperties; - } - } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java index aa7ca366..6dc28442 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -68,7 +68,6 @@ public class RocketMQBinderAutoConfiguration { provisioningProvider, extendedBindingProperties, rocketBinderConfigurationProperties, rocketMQProperties, instrumentationManager); - binder.setExtendedBindingProperties(extendedBindingProperties); return binder; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java index 6d1146e9..094dafd1 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java @@ -16,19 +16,16 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; -import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; - /** * @author Timur Valiev * @author Jim */ -public class RocketMQBindingProperties implements BinderSpecificPropertiesProvider { +public class RocketMQBindingProperties { private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties(); private RocketMQProducerProperties producer = new RocketMQProducerProperties(); - @Override public RocketMQConsumerProperties getConsumer() { return consumer; } @@ -37,7 +34,6 @@ public class RocketMQBindingProperties implements BinderSpecificPropertiesProvid this.consumer = consumer; } - @Override public RocketMQProducerProperties getProducer() { return producer; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java index 659a350e..b016f693 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java @@ -16,27 +16,71 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; +import java.util.HashMap; +import java.util.Map; + import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties; -import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; +import org.springframework.cloud.stream.binder.ExtendedBindingProperties; /** * @author Timur Valiev * @author Jim */ @ConfigurationProperties("spring.cloud.stream.rocketmq") -public class RocketMQExtendedBindingProperties extends - AbstractExtendedBindingProperties { +public class RocketMQExtendedBindingProperties implements + ExtendedBindingProperties { - private static final String DEFAULTS_PREFIX = "spring.cloud.stream.rocketmq.default"; + private Map bindings = new HashMap<>(); - @Override - public String getDefaultsPrefix() { - return DEFAULTS_PREFIX; + public Map getBindings() { + return this.bindings; + } + + public void setBindings(Map bindings) { + this.bindings = bindings; } @Override - public Class getExtendedPropertiesEntryClass() { - return RocketMQBindingProperties.class; + public synchronized RocketMQConsumerProperties getExtendedConsumerProperties( + String channelName) { + if (bindings.containsKey(channelName)) { + if (bindings.get(channelName).getConsumer() != null) { + return bindings.get(channelName).getConsumer(); + } + else { + RocketMQConsumerProperties properties = new RocketMQConsumerProperties(); + this.bindings.get(channelName).setConsumer(properties); + return properties; + } + } + else { + RocketMQConsumerProperties properties = new RocketMQConsumerProperties(); + RocketMQBindingProperties rbp = new RocketMQBindingProperties(); + rbp.setConsumer(properties); + bindings.put(channelName, rbp); + return properties; + } + } + + @Override + public synchronized RocketMQProducerProperties getExtendedProducerProperties( + String channelName) { + if (bindings.containsKey(channelName)) { + if (bindings.get(channelName).getProducer() != null) { + return bindings.get(channelName).getProducer(); + } + else { + RocketMQProducerProperties properties = new RocketMQProducerProperties(); + this.bindings.get(channelName).setProducer(properties); + return properties; + } + } + else { + RocketMQProducerProperties properties = new RocketMQProducerProperties(); + RocketMQBindingProperties rbp = new RocketMQBindingProperties(); + rbp.setProducer(properties); + bindings.put(channelName, rbp); + return properties; + } } }