diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index fddd8639..8b392808 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -27,7 +27,7 @@ 4.4.1 1.0.5 2.44.0 - 2.0.2 + 2.0.3 2.1.6 2.7.1 2.7.1 diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java index b50e799d..dfedec53 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java @@ -16,6 +16,7 @@ package org.springframework.cloud.stream.binder.rocketmq; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.util.StringUtils; @@ -66,6 +67,14 @@ public class RocketMQBinderUtils { result.setEnableMsgTrace( rocketBinderConfigurationProperties.isEnableMsgTrace()); } + if (!StringUtils.isEmpty(rocketMQProperties.getAccessChannel())) { + result.setAccessChannel( + AccessChannel.valueOf(rocketMQProperties.getAccessChannel())); + } + else { + result.setAccessChannel( + rocketBinderConfigurationProperties.getAccessChannel()); + } return result; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 30eb3760..4ff34f53 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -134,6 +134,11 @@ public class RocketMQMessageChannelBinder extends producer.setVipChannelEnabled( producerProperties.getExtension().getVipChannelEnabled()); } + + if (mergedProperties.getAccessChannel() != null) { + producer.setAccessChannel(mergedProperties.getAccessChannel()); + } + producer.setNamesrvAddr(mergedProperties.getNameServer()); producer.setSendMsgTimeout( producerProperties.getExtension().getSendMessageTimeout()); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 35f29ae8..0b105b51 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -215,6 +215,11 @@ public class RocketMQListenerBindingContainer rocketBinderConfigurationProperties.getCustomizedTraceTopic()); } + if (rocketBinderConfigurationProperties.getAccessChannel() != null) { + consumer.setAccessChannel( + rocketBinderConfigurationProperties.getAccessChannel()); + } + consumer.setNamesrvAddr(nameServer); consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index af292e74..d88434e5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -16,6 +16,7 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.common.MixAll; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; @@ -53,6 +54,11 @@ public class RocketMQBinderConfigurationProperties { */ private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC; + /** + * Enum type for accessChannel, values: LOCAL, CLOUD. + */ + private AccessChannel accessChannel = AccessChannel.LOCAL; + public String getNameServer() { return nameServer; } @@ -92,4 +98,12 @@ public class RocketMQBinderConfigurationProperties { public void setCustomizedTraceTopic(String customizedTraceTopic) { this.customizedTraceTopic = customizedTraceTopic; } + + public AccessChannel getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(AccessChannel accessChannel) { + this.accessChannel = accessChannel; + } }