mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
Change the class type of nameServer
This commit is contained in:
parent
aba89ed5ba
commit
1368a1aa84
@ -16,9 +16,13 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
@ -38,7 +42,7 @@ public final class RocketMQBinderUtils {
|
||||
result.setNameServer(rocketBinderConfigurationProperties.getNameServer());
|
||||
}
|
||||
else {
|
||||
result.setNameServer(rocketMQProperties.getNameServer());
|
||||
result.setNameServer(Arrays.asList(rocketMQProperties.getNameServer()));
|
||||
}
|
||||
if (rocketMQProperties.getProducer() == null
|
||||
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) {
|
||||
@ -74,4 +78,11 @@ public final class RocketMQBinderUtils {
|
||||
return result;
|
||||
}
|
||||
|
||||
public static String getNameServerStr(List<String> nameServerList) {
|
||||
if (CollectionUtils.isEmpty(nameServerList)) {
|
||||
return null;
|
||||
}
|
||||
return String.join(";", nameServerList);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -144,7 +144,8 @@ public class RocketMQMessageChannelBinder extends
|
||||
producer.setVipChannelEnabled(
|
||||
producerProperties.getExtension().getVipChannelEnabled());
|
||||
}
|
||||
producer.setNamesrvAddr(mergedProperties.getNameServer());
|
||||
producer.setNamesrvAddr(RocketMQBinderUtils
|
||||
.getNameServerStr(mergedProperties.getNameServer()));
|
||||
producer.setSendMsgTimeout(
|
||||
producerProperties.getExtension().getSendMessageTimeout());
|
||||
producer.setRetryTimesWhenSendFailed(
|
||||
|
@ -19,6 +19,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
@ -83,7 +84,7 @@ public class RocketMQListenerBindingContainer
|
||||
*/
|
||||
private int delayLevelWhenNextConsume = 0;
|
||||
|
||||
private String nameServer;
|
||||
private List<String> nameServer;
|
||||
|
||||
private String consumerGroup;
|
||||
|
||||
@ -233,7 +234,7 @@ public class RocketMQListenerBindingContainer
|
||||
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
|
||||
}
|
||||
|
||||
consumer.setNamesrvAddr(nameServer);
|
||||
consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
|
||||
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
|
||||
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
|
||||
|
||||
@ -304,11 +305,11 @@ public class RocketMQListenerBindingContainer
|
||||
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
|
||||
}
|
||||
|
||||
public String getNameServer() {
|
||||
public List<String> getNameServer() {
|
||||
return nameServer;
|
||||
}
|
||||
|
||||
public void setNameServer(String nameServer) {
|
||||
public void setNameServer(List<String> nameServer) {
|
||||
this.nameServer = nameServer;
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
@ -103,8 +104,8 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
|
||||
}
|
||||
try {
|
||||
consumer = new DefaultMQPullConsumer(group);
|
||||
consumer.setNamesrvAddr(
|
||||
rocketMQBinderConfigurationProperties.getNameServer());
|
||||
consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(
|
||||
rocketMQBinderConfigurationProperties.getNameServer()));
|
||||
consumer.setConsumerPullTimeoutMillis(
|
||||
rocketMQConsumerProperties.getExtension().getPullTimeout());
|
||||
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||
|
@ -16,28 +16,26 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import javax.validation.constraints.Pattern;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
|
||||
@Validated
|
||||
public class RocketMQBinderConfigurationProperties {
|
||||
|
||||
/**
|
||||
* The name server for rocketMQ, formats: `host:port;host:port`.
|
||||
* The name server list for rocketMQ.
|
||||
*/
|
||||
@Pattern(regexp = "^[\\d.:;]+$",
|
||||
message = "nameServer needs to match expression \"host:port;host:port\"")
|
||||
private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
|
||||
private List<String> nameServer = Arrays
|
||||
.asList(RocketMQBinderConstants.DEFAULT_NAME_SERVER);
|
||||
|
||||
/**
|
||||
* The property of "access-key".
|
||||
@ -60,11 +58,11 @@ public class RocketMQBinderConfigurationProperties {
|
||||
*/
|
||||
private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
|
||||
|
||||
public String getNameServer() {
|
||||
public List<String> getNameServer() {
|
||||
return nameServer;
|
||||
}
|
||||
|
||||
public void setNameServer(String nameServer) {
|
||||
public void setNameServer(List<String> nameServer) {
|
||||
this.nameServer = nameServer;
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,8 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
@ -35,7 +37,8 @@ public class RocketMQAutoConfigurationTests {
|
||||
.withConfiguration(
|
||||
AutoConfigurations.of(RocketMQBinderAutoConfiguration.class))
|
||||
.withPropertyValues(
|
||||
"spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876",
|
||||
"spring.cloud.stream.rocketmq.binder.name-server[0]=127.0.0.1:9876",
|
||||
"spring.cloud.stream.rocketmq.binder.name-server[1]=127.0.0.1:9877",
|
||||
"spring.cloud.stream.bindings.output.destination=TopicOrderTest",
|
||||
"spring.cloud.stream.bindings.output.content-type=application/json",
|
||||
"spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
|
||||
@ -55,7 +58,7 @@ public class RocketMQAutoConfigurationTests {
|
||||
RocketMQBinderConfigurationProperties binderConfigurationProperties = context
|
||||
.getBean(RocketMQBinderConfigurationProperties.class);
|
||||
assertThat(binderConfigurationProperties.getNameServer())
|
||||
.isEqualTo("127.0.0.1:9876");
|
||||
.isEqualTo(Arrays.asList("127.0.0.1:9876", "127.0.0.1:9877"));
|
||||
RocketMQExtendedBindingProperties bindingProperties = context
|
||||
.getBean(RocketMQExtendedBindingProperties.class);
|
||||
assertThat(
|
||||
|
Loading…
x
Reference in New Issue
Block a user