mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
resolve conflict
This commit is contained in:
@@ -23,7 +23,10 @@ import java.util.Set;
|
||||
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
@@ -52,16 +55,19 @@ public class RocketMQMessageChannelBinder extends
|
||||
implements
|
||||
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
|
||||
|
||||
private final RocketMQExtendedBindingProperties extendedBindingProperties;
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(RocketMQMessageChannelBinder.class);
|
||||
|
||||
private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties();
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
private Set<String> clientConfigId = new HashSet<>();
|
||||
private Map<String, String> topicInUse = new HashMap<>();
|
||||
|
||||
public RocketMQMessageChannelBinder(
|
||||
public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
|
||||
RocketMQExtendedBindingProperties extendedBindingProperties,
|
||||
RocketMQTopicProvisioner provisioningProvider,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
super(null, provisioningProvider);
|
||||
@@ -145,7 +151,7 @@ public class RocketMQMessageChannelBinder extends
|
||||
}
|
||||
|
||||
RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer(
|
||||
consumerProperties, this);
|
||||
consumerProperties, rocketBinderConfigurationProperties, this);
|
||||
listenerContainer.setConsumerGroup(group);
|
||||
listenerContainer.setTopic(destination.getName());
|
||||
listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
|
||||
@@ -194,4 +200,20 @@ public class RocketMQMessageChannelBinder extends
|
||||
public Map<String, String> getTopicInUse() {
|
||||
return topicInUse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDefaultsPrefix() {
|
||||
return extendedBindingProperties.getDefaultsPrefix();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
||||
return extendedBindingProperties.getExtendedPropertiesEntryClass();
|
||||
}
|
||||
|
||||
public void setExtendedBindingProperties(
|
||||
RocketMQExtendedBindingProperties extendedBindingProperties) {
|
||||
this.extendedBindingProperties = extendedBindingProperties;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -66,8 +66,9 @@ public class RocketMQBinderAutoConfiguration {
|
||||
RocketMQTopicProvisioner provisioningProvider,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
|
||||
extendedBindingProperties, provisioningProvider,
|
||||
provisioningProvider, extendedBindingProperties,
|
||||
rocketBinderConfigurationProperties, instrumentationManager);
|
||||
binder.setExtendedBindingProperties(extendedBindingProperties);
|
||||
return binder;
|
||||
}
|
||||
|
||||
|
@@ -16,16 +16,19 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQBindingProperties {
|
||||
public class RocketMQBindingProperties implements BinderSpecificPropertiesProvider {
|
||||
|
||||
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
|
||||
|
||||
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
|
||||
|
||||
@Override
|
||||
public RocketMQConsumerProperties getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
@@ -34,6 +37,7 @@ public class RocketMQBindingProperties {
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RocketMQProducerProperties getProducer() {
|
||||
return producer;
|
||||
}
|
||||
|
@@ -16,71 +16,27 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@ConfigurationProperties("spring.cloud.stream.rocketmq")
|
||||
public class RocketMQExtendedBindingProperties implements
|
||||
ExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties> {
|
||||
public class RocketMQExtendedBindingProperties extends
|
||||
AbstractExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties, RocketMQBindingProperties> {
|
||||
|
||||
private Map<String, RocketMQBindingProperties> bindings = new HashMap<>();
|
||||
private static final String DEFAULTS_PREFIX = "spring.cloud.stream.rocketmq.default";
|
||||
|
||||
public Map<String, RocketMQBindingProperties> getBindings() {
|
||||
return this.bindings;
|
||||
}
|
||||
|
||||
public void setBindings(Map<String, RocketMQBindingProperties> bindings) {
|
||||
this.bindings = bindings;
|
||||
@Override
|
||||
public String getDefaultsPrefix() {
|
||||
return DEFAULTS_PREFIX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(
|
||||
String channelName) {
|
||||
if (bindings.containsKey(channelName)) {
|
||||
if (bindings.get(channelName).getConsumer() != null) {
|
||||
return bindings.get(channelName).getConsumer();
|
||||
}
|
||||
else {
|
||||
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
|
||||
this.bindings.get(channelName).setConsumer(properties);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
else {
|
||||
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
|
||||
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
|
||||
rbp.setConsumer(properties);
|
||||
bindings.put(channelName, rbp);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized RocketMQProducerProperties getExtendedProducerProperties(
|
||||
String channelName) {
|
||||
if (bindings.containsKey(channelName)) {
|
||||
if (bindings.get(channelName).getProducer() != null) {
|
||||
return bindings.get(channelName).getProducer();
|
||||
}
|
||||
else {
|
||||
RocketMQProducerProperties properties = new RocketMQProducerProperties();
|
||||
this.bindings.get(channelName).setProducer(properties);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
else {
|
||||
RocketMQProducerProperties properties = new RocketMQProducerProperties();
|
||||
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
|
||||
rbp.setProducer(properties);
|
||||
bindings.put(channelName, rbp);
|
||||
return properties;
|
||||
}
|
||||
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
||||
return RocketMQBindingProperties.class;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user