1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

sync rocketmq-binder, rocketmq-bus in finchley branch

This commit is contained in:
fangjian0423 2019-03-25 15:21:08 +08:00
parent c44cdfe721
commit 9ab99e29e3
8 changed files with 69 additions and 42 deletions

View File

@ -96,7 +96,7 @@
<module>spring-cloud-alibaba-nacos-config</module> <module>spring-cloud-alibaba-nacos-config</module>
<module>spring-cloud-alibaba-nacos-discovery</module> <module>spring-cloud-alibaba-nacos-discovery</module>
<module>spring-cloud-alibaba-fescar</module> <module>spring-cloud-alibaba-fescar</module>
<!--<module>spring-cloud-stream-binder-rocketmq</module>--> <module>spring-cloud-stream-binder-rocketmq</module>
<module>spring-cloud-alibaba-nacos-config-server</module> <module>spring-cloud-alibaba-nacos-config-server</module>
<!--<module>spring-cloud-alibaba-dubbo</module>--> <!--<module>spring-cloud-alibaba-dubbo</module>-->
<module>spring-cloud-alicloud-context</module> <module>spring-cloud-alicloud-context</module>

View File

@ -305,6 +305,11 @@
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId> <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-dubbo</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Own dependencies - Starters --> <!-- Own dependencies - Starters -->
<dependency> <dependency>

View File

@ -33,10 +33,10 @@
<module>fescar-example/storage-service</module> <module>fescar-example/storage-service</module>
<module>fescar-example/account-service</module> <module>fescar-example/account-service</module>
<module>acm-example/acm-local-example</module> <module>acm-example/acm-local-example</module>
<!--<module>rocketmq-example/rocketmq-consume-example</module>--> <module>rocketmq-example/rocketmq-consume-example</module>
<!--<module>rocketmq-example/rocketmq-produce-example</module>--> <module>rocketmq-example/rocketmq-produce-example</module>
<module>sms-example</module> <module>sms-example</module>
<!--<module>spring-cloud-bus-rocketmq-example</module>--> <module>spring-cloud-bus-rocketmq-example</module>
<module>schedulerx-example/schedulerx-simple-task-example</module> <module>schedulerx-example/schedulerx-simple-task-example</module>
<!--<module>spring-cloud-alibaba-dubbo-examples</module>--> <!--<module>spring-cloud-alibaba-dubbo-examples</module>-->
</modules> </modules>

View File

@ -17,8 +17,8 @@
<module>spring-cloud-starter-alibaba-nacos-discovery</module> <module>spring-cloud-starter-alibaba-nacos-discovery</module>
<module>spring-cloud-starter-alibaba-sentinel</module> <module>spring-cloud-starter-alibaba-sentinel</module>
<module>spring-cloud-starter-alibaba-fescar</module> <module>spring-cloud-starter-alibaba-fescar</module>
<!--<module>spring-cloud-starter-stream-rocketmq</module>--> <module>spring-cloud-starter-stream-rocketmq</module>
<!--<module>spring-cloud-starter-bus-rocketmq</module>--> <module>spring-cloud-starter-bus-rocketmq</module>
<!--<module>spring-cloud-starter-dubbo</module>--> <!--<module>spring-cloud-starter-dubbo</module>-->
</modules> </modules>
</project> </project>

View File

@ -27,7 +27,6 @@ import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil; import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
@ -57,8 +56,7 @@ public class RocketMQMessageChannelBinder extends
implements implements
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> { ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final RocketMQProperties rocketMQProperties; private final RocketMQProperties rocketMQProperties;
private final InstrumentationManager instrumentationManager; private final InstrumentationManager instrumentationManager;
@ -71,10 +69,10 @@ public class RocketMQMessageChannelBinder extends
RocketMQProperties rocketMQProperties, RocketMQProperties rocketMQProperties,
InstrumentationManager instrumentationManager) { InstrumentationManager instrumentationManager) {
super(null, provisioningProvider); super(null, provisioningProvider);
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketMQProperties = rocketMQProperties; this.rocketMQProperties = rocketMQProperties;
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
this.extendedBindingProperties = extendedBindingProperties;
} }
@Override @Override
@ -83,7 +81,7 @@ public class RocketMQMessageChannelBinder extends
MessageChannel errorChannel) throws Exception { MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) { if (producerProperties.getExtension().getEnabled()) {
// if producerGroup is empty, using destination // if producerGroup is empty, using destination
String extendedProducerGroup = producerProperties.getExtension().getGroup(); String extendedProducerGroup = producerProperties.getExtension().getGroup();
String producerGroup = StringUtils.isEmpty(extendedProducerGroup) String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
? destination.getName() ? destination.getName()
@ -220,19 +218,4 @@ public class RocketMQMessageChannelBinder extends
return topicInUse; return topicInUse;
} }
@Override
public String getDefaultsPrefix() {
return extendedBindingProperties.getDefaultsPrefix();
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return extendedBindingProperties.getExtendedPropertiesEntryClass();
}
public void setExtendedBindingProperties(
RocketMQExtendedBindingProperties extendedBindingProperties) {
this.extendedBindingProperties = extendedBindingProperties;
}
} }

View File

@ -68,7 +68,6 @@ public class RocketMQBinderAutoConfiguration {
provisioningProvider, extendedBindingProperties, provisioningProvider, extendedBindingProperties,
rocketBinderConfigurationProperties, rocketMQProperties, rocketBinderConfigurationProperties, rocketMQProperties,
instrumentationManager); instrumentationManager);
binder.setExtendedBindingProperties(extendedBindingProperties);
return binder; return binder;
} }

View File

@ -16,19 +16,16 @@
package org.springframework.cloud.stream.binder.rocketmq.properties; package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class RocketMQBindingProperties implements BinderSpecificPropertiesProvider { public class RocketMQBindingProperties {
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties(); private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
private RocketMQProducerProperties producer = new RocketMQProducerProperties(); private RocketMQProducerProperties producer = new RocketMQProducerProperties();
@Override
public RocketMQConsumerProperties getConsumer() { public RocketMQConsumerProperties getConsumer() {
return consumer; return consumer;
} }
@ -37,7 +34,6 @@ public class RocketMQBindingProperties implements BinderSpecificPropertiesProvid
this.consumer = consumer; this.consumer = consumer;
} }
@Override
public RocketMQProducerProperties getProducer() { public RocketMQProducerProperties getProducer() {
return producer; return producer;
} }

View File

@ -16,27 +16,71 @@
package org.springframework.cloud.stream.binder.rocketmq.properties; package org.springframework.cloud.stream.binder.rocketmq.properties;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties; import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@ConfigurationProperties("spring.cloud.stream.rocketmq") @ConfigurationProperties("spring.cloud.stream.rocketmq")
public class RocketMQExtendedBindingProperties extends public class RocketMQExtendedBindingProperties implements
AbstractExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties, RocketMQBindingProperties> { ExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties> {
private static final String DEFAULTS_PREFIX = "spring.cloud.stream.rocketmq.default"; private Map<String, RocketMQBindingProperties> bindings = new HashMap<>();
@Override public Map<String, RocketMQBindingProperties> getBindings() {
public String getDefaultsPrefix() { return this.bindings;
return DEFAULTS_PREFIX; }
public void setBindings(Map<String, RocketMQBindingProperties> bindings) {
this.bindings = bindings;
} }
@Override @Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() { public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(
return RocketMQBindingProperties.class; String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getConsumer() != null) {
return bindings.get(channelName).getConsumer();
}
else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
this.bindings.get(channelName).setConsumer(properties);
return properties;
}
}
else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setConsumer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
@Override
public synchronized RocketMQProducerProperties getExtendedProducerProperties(
String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getProducer() != null) {
return bindings.get(channelName).getProducer();
}
else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
this.bindings.get(channelName).setProducer(properties);
return properties;
}
}
else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setProducer(properties);
bindings.put(channelName, rbp);
return properties;
}
} }
} }