diff --git a/pom.xml b/pom.xml index cdcc2d3c..bffa398f 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ org.springframework.cloud spring-cloud-build - 2.0.4.RELEASE + 2.1.2.RELEASE @@ -66,10 +66,10 @@ - 2.0.2.RELEASE - 2.0.2.RELEASE - 2.0.2.RELEASE - 2.0.0.RELEASE + 2.1.0.RELEASE + 2.1.0.RELEASE + 2.1.0.RELEASE + 2.1.0.RELEASE 4.12 3.0 diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index 2eda46bb..f1fc6bfd 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -6,7 +6,7 @@ spring-cloud-dependencies-parent org.springframework.cloud - 2.0.4.RELEASE + 2.1.2.RELEASE diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 1321c84b..b85df2a0 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -22,6 +22,7 @@ import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; @@ -53,19 +54,18 @@ public class RocketMQMessageChannelBinder extends private static final Logger logger = LoggerFactory .getLogger(RocketMQMessageChannelBinder.class); - private final RocketMQExtendedBindingProperties extendedBindingProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final InstrumentationManager instrumentationManager; private final ConsumersManager consumersManager; + private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); + public RocketMQMessageChannelBinder(ConsumersManager consumersManager, - RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQTopicProvisioner provisioningProvider, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, InstrumentationManager instrumentationManager) { super(null, provisioningProvider); this.consumersManager = consumersManager; - this.extendedBindingProperties = extendedBindingProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.instrumentationManager = instrumentationManager; } @@ -143,6 +143,16 @@ public class RocketMQMessageChannelBinder extends return extendedBindingProperties.getExtendedProducerProperties(channelName); } + @Override + public String getDefaultsPrefix() { + return extendedBindingProperties.getDefaultsPrefix(); + } + + @Override + public Class getExtendedPropertiesEntryClass() { + return extendedBindingProperties.getExtendedPropertiesEntryClass(); + } + private T getClassConfiguration(String destName, String className, Class interfaceClass) { if (StringUtils.isEmpty(className)) { @@ -184,4 +194,9 @@ public class RocketMQMessageChannelBinder extends return null; } + public void setExtendedBindingProperties( + RocketMQExtendedBindingProperties extendedBindingProperties) { + this.extendedBindingProperties = extendedBindingProperties; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java index 95abaeeb..ede8b259 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -64,8 +64,9 @@ public class RocketMQBinderAutoConfiguration { RocketMQTopicProvisioner provisioningProvider, ConsumersManager consumersManager) { RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( - consumersManager, extendedBindingProperties, provisioningProvider, + consumersManager, provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager); + binder.setExtendedBindingProperties(extendedBindingProperties); return binder; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java index 094dafd1..6d1146e9 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java @@ -16,16 +16,19 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; + /** * @author Timur Valiev * @author Jim */ -public class RocketMQBindingProperties { +public class RocketMQBindingProperties implements BinderSpecificPropertiesProvider { private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties(); private RocketMQProducerProperties producer = new RocketMQProducerProperties(); + @Override public RocketMQConsumerProperties getConsumer() { return consumer; } @@ -34,6 +37,7 @@ public class RocketMQBindingProperties { this.consumer = consumer; } + @Override public RocketMQProducerProperties getProducer() { return producer; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java index b016f693..659a350e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java @@ -16,71 +16,27 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; -import java.util.HashMap; -import java.util.Map; - import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.cloud.stream.binder.ExtendedBindingProperties; +import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties; +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; /** * @author Timur Valiev * @author Jim */ @ConfigurationProperties("spring.cloud.stream.rocketmq") -public class RocketMQExtendedBindingProperties implements - ExtendedBindingProperties { +public class RocketMQExtendedBindingProperties extends + AbstractExtendedBindingProperties { - private Map bindings = new HashMap<>(); + private static final String DEFAULTS_PREFIX = "spring.cloud.stream.rocketmq.default"; - public Map getBindings() { - return this.bindings; - } - - public void setBindings(Map bindings) { - this.bindings = bindings; + @Override + public String getDefaultsPrefix() { + return DEFAULTS_PREFIX; } @Override - public synchronized RocketMQConsumerProperties getExtendedConsumerProperties( - String channelName) { - if (bindings.containsKey(channelName)) { - if (bindings.get(channelName).getConsumer() != null) { - return bindings.get(channelName).getConsumer(); - } - else { - RocketMQConsumerProperties properties = new RocketMQConsumerProperties(); - this.bindings.get(channelName).setConsumer(properties); - return properties; - } - } - else { - RocketMQConsumerProperties properties = new RocketMQConsumerProperties(); - RocketMQBindingProperties rbp = new RocketMQBindingProperties(); - rbp.setConsumer(properties); - bindings.put(channelName, rbp); - return properties; - } - } - - @Override - public synchronized RocketMQProducerProperties getExtendedProducerProperties( - String channelName) { - if (bindings.containsKey(channelName)) { - if (bindings.get(channelName).getProducer() != null) { - return bindings.get(channelName).getProducer(); - } - else { - RocketMQProducerProperties properties = new RocketMQProducerProperties(); - this.bindings.get(channelName).setProducer(properties); - return properties; - } - } - else { - RocketMQProducerProperties properties = new RocketMQProducerProperties(); - RocketMQBindingProperties rbp = new RocketMQBindingProperties(); - rbp.setProducer(properties); - bindings.put(channelName, rbp); - return properties; - } + public Class getExtendedPropertiesEntryClass() { + return RocketMQBindingProperties.class; } }