From 97ddae22fc17e82a637ec0a3fc3860b05425d175 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Thu, 28 Feb 2019 16:39:22 +0800 Subject: [PATCH] sync rocketmq binder to 1.x --- spring-cloud-alibaba-examples/pom.xml | 3 +- .../{ => rocketmq-consume-example}/pom.xml | 9 ++- .../cloud/alibaba/cloud/examples/Foo.java | 14 ++-- .../cloud/examples/ReceiveService.java | 0 .../examples/RocketMQConsumerApplication.java | 36 ++++++++++ .../src/main/resources/application.properties | 27 +++---- .../rocketmq-produce-example/pom.xml | 54 ++++++++++++++ .../cloud/alibaba/cloud/examples/Foo.java | 39 ++++++++++ .../examples/RocketMQProduceApplication.java} | 35 +++------ .../alibaba/cloud/examples/SenderService.java | 20 +++--- .../examples/TransactionListenerImpl.java | 15 ++-- .../src/main/resources/application.properties | 20 ++++++ .../binder/rocketmq/RocketMQBinderUtils.java | 72 +++++++++++++++++++ .../RocketMQMessageChannelBinder.java | 44 ++++++++++-- .../RocketMQBinderAutoConfiguration.java | 11 ++- ...etMQComponent4BinderAutoConfiguration.java | 55 ++++++-------- .../RocketMQListenerBindingContainer.java | 35 ++++++++- .../RocketMQInboundChannelAdapter.java | 6 +- .../integration/RocketMQMessageHandler.java | 31 +++++--- ...RocketMQBinderConfigurationProperties.java | 66 ++++++++++++++--- .../RocketMQProducerProperties.java | 17 ++++- .../RocketMQAutoConfigurationTests.java | 2 +- 22 files changed, 473 insertions(+), 138 deletions(-) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-consume-example}/pom.xml (89%) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-consume-example}/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java (77%) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-consume-example}/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java (100%) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-consume-example}/src/main/resources/application.properties (56%) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java rename spring-cloud-alibaba-examples/rocketmq-example/{src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java => rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java} (68%) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-produce-example}/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java (71%) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-produce-example}/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java (77%) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index f0b76037..553e75df 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -31,7 +31,8 @@ fescar-example/storage-service fescar-example/account-service acm-example/acm-local-example - rocketmq-example + rocketmq-example/rocketmq-consume-example + rocketmq-example/rocketmq-produce-example env-extension sms-example spring-cloud-bus-rocketmq-example diff --git a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml similarity index 89% rename from spring-cloud-alibaba-examples/rocketmq-example/pom.xml rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml index 647c2eac..2c730e24 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml @@ -6,13 +6,14 @@ org.springframework.cloud spring-cloud-alibaba-examples 0.1.2.BUILD-SNAPSHOT + ../../pom.xml 4.0.0 - rocketmq-example + rocketmq-consume-example jar - Example demonstrating how to use rocketmq + Example demonstrating how to use rocketmq consume @@ -20,14 +21,17 @@ org.springframework.cloud spring-cloud-starter-stream-rocketmq + org.springframework.boot spring-boot-starter-web + org.springframework.boot spring-boot-starter-actuator + @@ -48,4 +52,3 @@ - diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java similarity index 77% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java index e98b6a10..6ef37023 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java @@ -24,15 +24,15 @@ public class Foo { this.id = id; } - public String getBar() { - return bar; - } + public String getBar() { + return bar; + } - public void setBar(String bar) { - this.bar = bar; - } + public void setBar(String bar) { + this.bar = bar; + } - @Override + @Override public String toString() { return "Foo{" + "id=" + id + ", bar='" + bar + '\'' + '}'; } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java similarity index 100% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java new file mode 100644 index 00000000..25204919 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java @@ -0,0 +1,36 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQConsumerApplication.MySink; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.messaging.SubscribableChannel; + +/** + * @author Jim + */ +@SpringBootApplication +@EnableBinding({ MySink.class }) +public class RocketMQConsumerApplication { + + public interface MySink { + + @Input("input1") + SubscribableChannel input1(); + + @Input("input2") + SubscribableChannel input2(); + + @Input("input3") + SubscribableChannel input3(); + + @Input("input4") + SubscribableChannel input4(); + } + + public static void main(String[] args) { + SpringApplication.run(RocketMQConsumerApplication.class, args); + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties similarity index 56% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties index 753e168d..ec27539c 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties @@ -1,39 +1,32 @@ -spring.cloud.stream.default-binder=rocketmq - -spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 - -spring.cloud.stream.bindings.output1.destination=test-topic -spring.cloud.stream.bindings.output1.content-type=application/json - -spring.cloud.stream.bindings.output2.destination=TransactionTopic -spring.cloud.stream.bindings.output2.content-type=application/json -spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain spring.cloud.stream.bindings.input1.group=test-group1 spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true -spring.cloud.stream.bindings.input1.consumer.maxAttempts=1 spring.cloud.stream.bindings.input2.destination=test-topic spring.cloud.stream.bindings.input2.content-type=text/plain spring.cloud.stream.bindings.input2.group=test-group2 spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr -spring.cloud.stream.bindings.input2.consumer.concurrency=5 +spring.cloud.stream.bindings.input2.consumer.concurrency=20 +spring.cloud.stream.bindings.input2.consumer.maxAttempts=1 spring.cloud.stream.bindings.input3.destination=test-topic spring.cloud.stream.bindings.input3.content-type=application/json spring.cloud.stream.bindings.input3.group=test-group3 spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj -spring.cloud.stream.bindings.input3.consumer.concurrency=5 +spring.cloud.stream.bindings.input3.consumer.concurrency=20 spring.cloud.stream.bindings.input4.destination=TransactionTopic spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.group=transaction-group -spring.cloud.stream.bindings.input4.consumer.concurrency=10 +spring.cloud.stream.bindings.input4.consumer.concurrency=5 -spring.application.name=rocketmq-example +spring.application.name=rocketmq-consume-example -server.port=28081 -management.security.enabled=false \ No newline at end of file +server.port=28082 + +management.endpoints.web.exposure.include=* +management.endpoint.health.show-details=always \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml new file mode 100644 index 00000000..232f68cc --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml @@ -0,0 +1,54 @@ + + + + + org.springframework.cloud + spring-cloud-alibaba-examples + 0.1.2.BUILD-SNAPSHOT + ../../pom.xml + + 4.0.0 + + + rocketmq-produce-example + jar + Example demonstrating how to use rocketmq produce + + + + + org.springframework.cloud + spring-cloud-starter-stream-rocketmq + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java new file mode 100644 index 00000000..6ef37023 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java @@ -0,0 +1,39 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +/** + * @author Jim + */ +public class Foo { + + private int id; + private String bar; + + public Foo() { + } + + public Foo(int id, String bar) { + this.id = id; + this.bar = bar; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getBar() { + return bar; + } + + public void setBar(String bar) { + this.bar = bar; + } + + @Override + public String toString() { + return "Foo{" + "id=" + id + ", bar='" + bar + '\'' + '}'; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java similarity index 68% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java index 87da880a..bd157be0 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java @@ -4,37 +4,18 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQProduceApplication.MySource; import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.SubscribableChannel; /** * @author Jim */ @SpringBootApplication -@EnableBinding({ MySource.class, MySink.class }) -public class RocketMQApplication { - - public interface MySink { - - @Input("input1") - SubscribableChannel input1(); - - @Input("input2") - SubscribableChannel input2(); - - @Input("input3") - SubscribableChannel input3(); - - @Input("input4") - SubscribableChannel input4(); - - } +@EnableBinding({ MySource.class }) +public class RocketMQProduceApplication { public interface MySource { @Output("output1") @@ -45,7 +26,7 @@ public class RocketMQApplication { } public static void main(String[] args) { - SpringApplication.run(RocketMQApplication.class, args); + SpringApplication.run(RocketMQProduceApplication.class, args); } @Bean @@ -87,13 +68,13 @@ public class RocketMQApplication { @Override public void run(String... args) throws Exception { // COMMIT_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg1", false); + senderService.sendTransactionalMsg("transactional-msg1", 1); // ROLLBACK_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg2", true); + senderService.sendTransactionalMsg("transactional-msg2", 2); // ROLLBACK_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg3", true); + senderService.sendTransactionalMsg("transactional-msg3", 3); // COMMIT_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg4", false); + senderService.sendTransactionalMsg("transactional-msg4", 4); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java similarity index 71% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java index 9614cc5d..789bc56d 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java @@ -2,10 +2,13 @@ package org.springframework.cloud.alibaba.cloud.examples; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQProduceApplication.MySource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -26,10 +29,10 @@ public class SenderService { } public void sendWithTags(T msg, String tag) throws Exception { - Map map = new HashMap<>(); - map.put(MessageConst.PROPERTY_TAGS, tag); - Message message = MessageBuilder.createMessage(msg, new MessageHeaders(map)); - source.output1().send(message); + Map map = new HashMap<>(); + map.put(MessageConst.PROPERTY_TAGS, tag); + Message message = MessageBuilder.createMessage(msg, new MessageHeaders(map)); + source.output1().send(message); } public void sendObject(T msg, String tag) throws Exception { @@ -40,12 +43,11 @@ public class SenderService { source.output1().send(message); } - public void sendTransactionalMsg(T msg, boolean error) throws Exception { + public void sendTransactionalMsg(T msg, int num) throws Exception { MessageBuilder builder = MessageBuilder.withPayload(msg) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); - if (error) { - builder.setHeader("test", "1"); - } + builder.setHeader("test", String.valueOf(num)); + builder.setHeader(RocketMQHeaders.TAGS, "binder"); Message message = builder.build(); source.output2().send(message); } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java similarity index 77% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java index 56812aa6..3e28c826 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java @@ -27,13 +27,20 @@ import org.springframework.messaging.Message; /** * @author Jim */ -@RocketMQTransactionListener(txProducerGroup = "TransactionTopic", corePoolSize = 5, maximumPoolSize = 10) -class TransactionListenerImpl implements RocketMQLocalTransactionListener { +@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10) +public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { - if ("1".equals(((HashMap) msg.getHeaders().get(RocketMQHeaders.PROPERTIES)) - .get("USERS_test"))) { + Object num = ((HashMap) msg.getHeaders().get(RocketMQHeaders.PROPERTIES)) + .get("USERS_test"); + + if ("1".equals(num)) { + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " unknown"); + return RocketMQLocalTransactionState.UNKNOWN; + } + else if ("2".equals(num)) { System.out.println( "executer: " + new String((byte[]) msg.getPayload()) + " rollback"); return RocketMQLocalTransactionState.ROLLBACK; diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties new file mode 100644 index 00000000..beca964a --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties @@ -0,0 +1,20 @@ +logging.level.org.springframework.cloud.stream.binder.rocketmq=DEBUG + +spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 + +spring.cloud.stream.bindings.output1.destination=test-topic +spring.cloud.stream.bindings.output1.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group +spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true + +spring.cloud.stream.bindings.output2.destination=TransactionTopic +spring.cloud.stream.bindings.output2.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup + +spring.application.name=rocketmq-produce-example + +server.port=28081 + +management.endpoints.web.exposure.include=* +management.endpoint.health.show-details=always \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java new file mode 100644 index 00000000..7c24012a --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq; + +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQBinderUtils { + + public static RocketMQBinderConfigurationProperties mergeProducerProperties( + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + RocketMQProperties rocketMQProperties) { + RocketMQBinderConfigurationProperties result = new RocketMQBinderConfigurationProperties(); + if (StringUtils.isEmpty(rocketMQProperties.getNameServer())) { + result.setNameServer(rocketBinderConfigurationProperties.getNameServer()); + } + else { + result.setNameServer(rocketMQProperties.getNameServer()); + } + if (rocketMQProperties.getProducer() == null + || StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) { + result.setAccessKey(rocketBinderConfigurationProperties.getAccessKey()); + } + else { + result.setAccessKey(rocketMQProperties.getProducer().getAccessKey()); + } + if (rocketMQProperties.getProducer() == null + || StringUtils.isEmpty(rocketMQProperties.getProducer().getSecretKey())) { + result.setSecretKey(rocketBinderConfigurationProperties.getSecretKey()); + } + else { + result.setSecretKey(rocketMQProperties.getProducer().getSecretKey()); + } + if (rocketMQProperties.getProducer() == null || StringUtils + .isEmpty(rocketMQProperties.getProducer().getCustomizedTraceTopic())) { + result.setCustomizedTraceTopic( + rocketBinderConfigurationProperties.getCustomizedTraceTopic()); + } + else { + result.setCustomizedTraceTopic( + rocketMQProperties.getProducer().getCustomizedTraceTopic()); + } + if (rocketMQProperties.getProducer() != null + && rocketMQProperties.getProducer().isEnableMsgTrace()) { + result.setEnableMsgTrace(Boolean.TRUE); + } + else { + result.setEnableMsgTrace( + rocketBinderConfigurationProperties.isEnableMsgTrace()); + } + return result; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 98164c64..04f318ed 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -21,8 +21,13 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; @@ -41,6 +46,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.util.StringUtils; import com.fasterxml.jackson.databind.ObjectMapper; @@ -54,6 +60,7 @@ public class RocketMQMessageChannelBinder extends private final RocketMQExtendedBindingProperties extendedBindingProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final RocketMQProperties rocketMQProperties; private final InstrumentationManager instrumentationManager; private Set clientConfigId = new HashSet<>(); @@ -63,9 +70,11 @@ public class RocketMQMessageChannelBinder extends RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQTopicProvisioner provisioningProvider, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + RocketMQProperties rocketMQProperties, InstrumentationManager instrumentationManager) { super(true, null, provisioningProvider); this.extendedBindingProperties = extendedBindingProperties; + this.rocketMQProperties = rocketMQProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.instrumentationManager = instrumentationManager; } @@ -76,6 +85,10 @@ public class RocketMQMessageChannelBinder extends MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { + RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils + .mergeProducerProperties(rocketBinderConfigurationProperties, + rocketMQProperties); + RocketMQTemplate rocketMQTemplate; if (producerProperties.getExtension().getTransactional()) { Map rocketMQTemplates = getApplicationContext() @@ -95,9 +108,27 @@ public class RocketMQMessageChannelBinder extends rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate.setObjectMapper(getApplicationContext().getParent() .getBeansOfType(ObjectMapper.class).values().iterator().next()); - DefaultMQProducer producer = new DefaultMQProducer(destination.getName()); - producer.setNamesrvAddr( - rocketBinderConfigurationProperties.getNamesrvAddr()); + DefaultMQProducer producer; + String ak = mergedProperties.getAccessKey(); + String sk = mergedProperties.getSecretKey(); + if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { + RPCHook rpcHook = new AclClientRPCHook( + new SessionCredentials(ak, sk)); + producer = new DefaultMQProducer( + producerProperties.getExtension().getGroup(), rpcHook, + mergedProperties.isEnableMsgTrace(), + mergedProperties.getCustomizedTraceTopic()); + producer.setVipChannelEnabled(false); + producer.setInstanceName( + RocketMQUtil.getInstanceName(rpcHook, destination.getName())); + } + else { + producer = new DefaultMQProducer( + producerProperties.getExtension().getGroup()); + producer.setVipChannelEnabled( + producerProperties.getExtension().getVipChannelEnabled()); + } + producer.setNamesrvAddr(mergedProperties.getNameServer()); producer.setSendMsgTimeout( producerProperties.getExtension().getSendMessageTimeout()); producer.setRetryTimesWhenSendFailed( @@ -110,14 +141,13 @@ public class RocketMQMessageChannelBinder extends producerProperties.getExtension().isRetryNextServer()); producer.setMaxMessageSize( producerProperties.getExtension().getMaxMessageSize()); - producer.setVipChannelEnabled( - producerProperties.getExtension().getVipChannelEnabled()); rocketMQTemplate.setProducer(producer); clientConfigId.add(producer.buildMQClientId()); } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( rocketMQTemplate, destination.getName(), + producerProperties.getExtension().getGroup(), producerProperties.getExtension().getTransactional(), instrumentationManager); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); @@ -145,7 +175,7 @@ public class RocketMQMessageChannelBinder extends } RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer( - consumerProperties, this); + consumerProperties, rocketBinderConfigurationProperties, this); listenerContainer.setConsumerGroup(group); listenerContainer.setTopic(destination.getName()); listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency()); @@ -154,7 +184,7 @@ public class RocketMQMessageChannelBinder extends listenerContainer.setDelayLevelWhenNextConsume( consumerProperties.getExtension().getDelayLevelWhenNextConsume()); listenerContainer - .setNameServer(rocketBinderConfigurationProperties.getNamesrvAddr()); + .setNameServer(rocketBinderConfigurationProperties.getNameServer()); RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( listenerContainer, consumerProperties, instrumentationManager); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java index d2888839..6874e512 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -16,6 +16,8 @@ package org.springframework.cloud.stream.binder.rocketmq.config; +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; @@ -32,7 +34,8 @@ import org.springframework.context.annotation.Import; * @author Jim */ @Configuration -@Import(RocketMQBinderHealthIndicatorAutoConfiguration.class) +@Import({ RocketMQAutoConfiguration.class, + RocketMQBinderHealthIndicatorAutoConfiguration.class }) @EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class }) public class RocketMQBinderAutoConfiguration { @@ -41,6 +44,9 @@ public class RocketMQBinderAutoConfiguration { private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + @Autowired(required = false) + private RocketMQProperties rocketMQProperties = new RocketMQProperties(); + @Autowired public RocketMQBinderAutoConfiguration( RocketMQExtendedBindingProperties extendedBindingProperties, @@ -60,7 +66,8 @@ public class RocketMQBinderAutoConfiguration { InstrumentationManager instrumentationManager) { RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( extendedBindingProperties, provisioningProvider, - rocketBinderConfigurationProperties, instrumentationManager); + rocketBinderConfigurationProperties, rocketMQProperties, + instrumentationManager); return binder; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java index 4b34a104..407ef1d7 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java @@ -16,11 +16,10 @@ package org.springframework.cloud.stream.binder.rocketmq.config; -import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; -import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; -import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer; import org.apache.rocketmq.spring.config.RocketMQConfigUtils; import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor; import org.apache.rocketmq.spring.config.TransactionHandlerRegistry; @@ -32,7 +31,6 @@ import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; -import org.springframework.util.Assert; import com.fasterxml.jackson.databind.ObjectMapper; @@ -51,48 +49,35 @@ public class RocketMQComponent4BinderAutoConfiguration { } @Bean + @ConditionalOnMissingBean(DefaultMQProducer.class) public DefaultMQProducer defaultMQProducer() { - RocketMQProperties rocketMQProperties = new RocketMQProperties(); - String configNameServer = environment - .getProperty("spring.cloud.stream.rocketmq.binder.namesrv-addr"); - if (StringUtils.isEmpty(configNameServer)) { - rocketMQProperties.setNameServer(RocketMQBinderConstants.DEFAULT_NAME_SERVER); + DefaultMQProducer producer; + String configNameServer = environment.resolveRequiredPlaceholders( + "${spring.cloud.stream.rocketmq.binder.name-server:${rocketmq.producer.name-server:}}"); + String ak = environment.resolveRequiredPlaceholders( + "${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}"); + String sk = environment.resolveRequiredPlaceholders( + "${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}"); + if (!org.springframework.util.StringUtils.isEmpty(ak) + && !org.springframework.util.StringUtils.isEmpty(sk)) { + producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP, + new AclClientRPCHook(new SessionCredentials(ak, sk))); + producer.setVipChannelEnabled(false); } else { - rocketMQProperties.setNameServer(configNameServer); + producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP); } - RocketMQProperties.Producer producerConfig = new Producer(); - rocketMQProperties.setProducer(producerConfig); - producerConfig.setGroup(RocketMQBinderConstants.DEFAULT_GROUP); - - String nameServer = rocketMQProperties.getNameServer(); - String groupName = producerConfig.getGroup(); - Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); - Assert.hasText(groupName, "[rocketmq.producer.group] must not be null"); - - DefaultMQProducer producer = new DefaultMQProducer(groupName); - producer.setNamesrvAddr(nameServer); - producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout()); - producer.setRetryTimesWhenSendFailed( - producerConfig.getRetryTimesWhenSendFailed()); - producer.setRetryTimesWhenSendAsyncFailed( - producerConfig.getRetryTimesWhenSendAsyncFailed()); - producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); - producer.setCompressMsgBodyOverHowmuch( - producerConfig.getCompressMessageBodyThreshold()); - producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); - + producer.setNamesrvAddr(configNameServer); return producer; } @Bean(destroyMethod = "destroy") - @ConditionalOnBean(DefaultMQProducer.class) - @ConditionalOnMissingBean(RocketMQTemplate.class) + @ConditionalOnMissingBean public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, - ObjectMapper rocketMQMessageObjectMapper) { + ObjectMapper objectMapper) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate.setProducer(mqProducer); - rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper); + rocketMQTemplate.setObjectMapper(objectMapper); return rocketMQTemplate; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 5b6abad9..7b824600 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -18,6 +18,8 @@ package org.springframework.cloud.stream.binder.rocketmq.consuming; import java.util.List; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -26,8 +28,10 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.SelectorType; @@ -40,6 +44,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import org.springframework.context.SmartLifecycle; import org.springframework.util.Assert; @@ -83,6 +88,7 @@ public class RocketMQListenerBindingContainer private final ExtendedConsumerProperties rocketMQConsumerProperties; private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; // The following properties came from RocketMQConsumerProperties. private ConsumeMode consumeMode; @@ -92,8 +98,10 @@ public class RocketMQListenerBindingContainer public RocketMQListenerBindingContainer( ExtendedConsumerProperties rocketMQConsumerProperties, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, RocketMQMessageChannelBinder rocketMQMessageChannelBinder) { this.rocketMQConsumerProperties = rocketMQConsumerProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder; this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly() ? ConsumeMode.ORDERLY @@ -189,7 +197,23 @@ public class RocketMQListenerBindingContainer Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); - consumer = new DefaultMQPushConsumer(consumerGroup); + String ak = rocketBinderConfigurationProperties.getAccessKey(); + String sk = rocketBinderConfigurationProperties.getSecretKey(); + if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { + RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk)); + consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, + new AllocateMessageQueueAveragely(), + rocketBinderConfigurationProperties.isEnableMsgTrace(), + rocketBinderConfigurationProperties.getCustomizedTraceTopic()); + consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, topic)); + consumer.setVipChannelEnabled(false); + } + else { + consumer = new DefaultMQPushConsumer(consumerGroup, + rocketBinderConfigurationProperties.isEnableMsgTrace(), + rocketBinderConfigurationProperties.getCustomizedTraceTopic()); + } + consumer.setNamesrvAddr(nameServer); consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); @@ -238,6 +262,15 @@ public class RocketMQListenerBindingContainer } + @Override + public String toString() { + return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup + + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" + + messageModel + '}'; + } + public long getSuspendCurrentQueueTimeMillis() { return suspendCurrentQueueTimeMillis; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 56250e16..6bc0de51 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -40,7 +40,7 @@ import org.springframework.util.Assert; */ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { - private static final Logger logger = LoggerFactory + private static final Logger log = LoggerFactory .getLogger(RocketMQInboundChannelAdapter.class); private RetryTemplate retryTemplate; @@ -88,7 +88,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } catch (Exception e) { - logger.error("rocketMQListenerContainer init error: " + e.getMessage(), e); + log.error("rocketMQListenerContainer init error: " + e.getMessage(), e); throw new IllegalArgumentException( "rocketMQListenerContainer init error: " + e.getMessage(), e); } @@ -116,7 +116,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { .getHealthInstrumentation(rocketMQListenerContainer.getTopic() + rocketMQListenerContainer.getConsumerGroup()) .markStartFailed(e); - logger.error("RocketMQTemplate startup failed, Caused by " + e.getMessage()); + log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage()); throw new MessagingException(MessageBuilder.withPayload( "RocketMQTemplate startup failed, Caused by " + e.getMessage()) .build(), e); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 578e746f..a23bcadc 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -23,6 +23,8 @@ import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -42,6 +44,9 @@ import org.springframework.util.StringUtils; */ public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle { + private final static Logger log = LoggerFactory + .getLogger(RocketMQMessageHandler.class); + private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); private MessageChannel sendFailureChannel; @@ -52,6 +57,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private final String destination; + private final String groupName; + private final InstrumentationManager instrumentationManager; private boolean sync = false; @@ -59,9 +66,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private volatile boolean running = false; public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination, - Boolean transactional, InstrumentationManager instrumentationManager) { + String group, Boolean transactional, + InstrumentationManager instrumentationManager) { this.rocketMQTemplate = rocketMQTemplate; this.destination = destination; + this.groupName = group; this.transactional = transactional; this.instrumentationManager = instrumentationManager; } @@ -79,8 +88,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li catch (Exception e) { instrumentationManager.getHealthInstrumentation(destination) .markStartFailed(e); - logger.error( - "RocketMQTemplate startup failed, Caused by " + e.getMessage()); + log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage()); throw new MessagingException(MessageBuilder.withPayload( "RocketMQTemplate startup failed, Caused by " + e.getMessage()) .build(), e); @@ -106,19 +114,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li protected void handleMessageInternal( final org.springframework.messaging.Message message) throws Exception { try { - StringBuilder topicWithTags = new StringBuilder(destination); + final StringBuilder topicWithTags = new StringBuilder(destination); String tags = null; if (message.getHeaders().get(RocketMQHeaders.TAGS) != null) { tags = message.getHeaders().get(RocketMQHeaders.TAGS).toString(); } if (!StringUtils.isEmpty(tags)) { - topicWithTags = topicWithTags.append(":").append(tags); + topicWithTags.append(":").append(tags); } SendResult sendRes = null; if (transactional) { - sendRes = rocketMQTemplate.sendMessageInTransaction(destination, + sendRes = rocketMQTemplate.sendMessageInTransaction(groupName, topicWithTags.toString(), message, message.getHeaders() .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG)); + log.debug("transactional send to topic " + topicWithTags + " " + sendRes); } else { int delayLevel = 0; @@ -139,17 +148,22 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, rocketMQTemplate.getProducer().getSendMsgTimeout(), delayLevel); + log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { rocketMQTemplate.asyncSend(topicWithTags.toString(), message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - + log.debug("async send to topic " + topicWithTags + " " + + sendResult); } @Override public void onException(Throwable e) { + log.error( + "RocketMQ Message hasn't been sent. Caused by " + + e.getMessage()); if (getSendFailureChannel() != null) { getSendFailureChannel().send( RocketMQMessageHandler.this.errorMessageStrategy @@ -173,8 +187,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li } } catch (Exception e) { - logger.error( - "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); + log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); if (getSendFailureChannel() != null) { getSendFailureChannel().send(this.errorMessageStrategy .buildErrorMessage(new MessagingException(message, e), null)); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index b6d0e9f8..af292e74 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -16,6 +16,7 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; +import org.apache.rocketmq.common.MixAll; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; @@ -26,24 +27,69 @@ import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") public class RocketMQBinderConfigurationProperties { - private String namesrvAddr = RocketMQBinderConstants.DEFAULT_NAME_SERVER; + /** + * The name server for rocketMQ, formats: `host:port;host:port`. + */ + private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER; - private String logLevel = "ERROR"; + /** + * The property of "access-key". + */ + private String accessKey; - public String getNamesrvAddr() { - return namesrvAddr; + /** + * The property of "secret-key". + */ + private String secretKey; + + /** + * Switch flag instance for message trace. + */ + private boolean enableMsgTrace = true; + + /** + * The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC; + + public String getNameServer() { + return nameServer; } - public void setNamesrvAddr(String namesrvAddr) { - this.namesrvAddr = namesrvAddr; + public void setNameServer(String nameServer) { + this.nameServer = nameServer; } - public String getLogLevel() { - return logLevel; + public String getAccessKey() { + return accessKey; } - public void setLogLevel(String logLevel) { - this.logLevel = logLevel; + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; } + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public boolean isEnableMsgTrace() { + return enableMsgTrace; + } + + public void setEnableMsgTrace(boolean enableMsgTrace) { + this.enableMsgTrace = enableMsgTrace; + } + + public String getCustomizedTraceTopic() { + return customizedTraceTopic; + } + + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.customizedTraceTopic = customizedTraceTopic; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java index 60af4129..a8d784e7 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -27,7 +27,12 @@ public class RocketMQProducerProperties { private Boolean enabled = true; /** - * Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize} + * Name of producer. + */ + private String group; + + /** + * Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}. */ private Integer maxMessageSize = 1024 * 1024 * 4; @@ -70,6 +75,14 @@ public class RocketMQProducerProperties { */ private boolean retryNextServer = false; + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + public Boolean getEnabled() { return enabled; } @@ -150,4 +163,4 @@ public class RocketMQProducerProperties { this.retryNextServer = retryNextServer; } -} +} \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java index 155902c0..a45855c7 100644 --- a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java +++ b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -56,7 +56,7 @@ public class RocketMQAutoConfigurationTests { public void testProperties() { RocketMQBinderConfigurationProperties binderConfigurationProperties = context .getBean(RocketMQBinderConfigurationProperties.class); - assertThat(binderConfigurationProperties.getNamesrvAddr()) + assertThat(binderConfigurationProperties.getNameServer()) .isEqualTo("127.0.0.1:9876"); RocketMQExtendedBindingProperties bindingProperties = context .getBean(RocketMQExtendedBindingProperties.class);