From 7cc3b0291790ba88f9f9e70899daa4c4dce71ba8 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Thu, 28 Feb 2019 15:31:28 +0800 Subject: [PATCH] update rocketmq examples --- spring-cloud-alibaba-examples/pom.xml | 3 +- .../{ => rocketmq-consume-example}/pom.xml | 11 ++-- .../cloud/alibaba/cloud/examples/Foo.java | 0 .../cloud/examples/ReceiveService.java | 8 +-- .../examples/RocketMQConsumerApplication.java | 36 +++++++++++++ .../src/main/resources/application.properties | 15 ++---- .../rocketmq-produce-example/pom.xml | 54 +++++++++++++++++++ .../cloud/alibaba/cloud/examples/Foo.java | 39 ++++++++++++++ .../examples/RocketMQProduceApplication.java} | 34 +++--------- .../alibaba/cloud/examples/SenderService.java | 10 ++-- .../examples/TransactionListenerImpl.java | 15 ++++-- .../src/main/resources/application.properties | 20 +++++++ 12 files changed, 185 insertions(+), 60 deletions(-) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-consume-example}/pom.xml (83%) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-consume-example}/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java (100%) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-consume-example}/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java (80%) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-consume-example}/src/main/resources/application.properties (72%) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java rename spring-cloud-alibaba-examples/rocketmq-example/{src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java => rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java} (69%) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-produce-example}/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java (86%) rename spring-cloud-alibaba-examples/rocketmq-example/{ => rocketmq-produce-example}/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java (77%) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index bedfeff2..96f45557 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -34,7 +34,8 @@ fescar-example/storage-service fescar-example/account-service acm-example/acm-local-example - rocketmq-example + rocketmq-example/rocketmq-consume-example + rocketmq-example/rocketmq-produce-example sms-example spring-cloud-bus-rocketmq-example schedulerx-example/schedulerx-simple-task-example diff --git a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml similarity index 83% rename from spring-cloud-alibaba-examples/rocketmq-example/pom.xml rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml index 84ef5029..cbdf8bca 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml @@ -6,14 +6,14 @@ org.springframework.cloud spring-cloud-alibaba-examples 0.2.2.BUILD-SNAPSHOT - ../pom.xml + ../../pom.xml 4.0.0 - rocketmq-example + rocketmq-consume-example jar - Example demonstrating how to use rocketmq + Example demonstrating how to use rocketmq consume @@ -32,11 +32,6 @@ spring-boot-starter-actuator - - io.dropwizard.metrics - metrics-core - - diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java similarity index 100% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java similarity index 80% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java index b507fb50..35fc25a6 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java @@ -25,9 +25,9 @@ public class ReceiveService { System.out.println("input3 receive: " + foo); } - @StreamListener("input4") - public void receiveTransactionalMsg(String transactionMsg) { - System.out.println("input4 receive transaction msg: " + transactionMsg); - } + @StreamListener("input4") + public void receiveTransactionalMsg(String transactionMsg) { + System.out.println("input4 receive transaction msg: " + transactionMsg); + } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java new file mode 100644 index 00000000..25204919 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java @@ -0,0 +1,36 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQConsumerApplication.MySink; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.messaging.SubscribableChannel; + +/** + * @author Jim + */ +@SpringBootApplication +@EnableBinding({ MySink.class }) +public class RocketMQConsumerApplication { + + public interface MySink { + + @Input("input1") + SubscribableChannel input1(); + + @Input("input2") + SubscribableChannel input2(); + + @Input("input3") + SubscribableChannel input3(); + + @Input("input4") + SubscribableChannel input4(); + } + + public static void main(String[] args) { + SpringApplication.run(RocketMQConsumerApplication.class, args); + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties similarity index 72% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties index 7d9c0c85..ec27539c 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties @@ -1,13 +1,4 @@ -spring.cloud.stream.default-binder=rocketmq - -spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 - -spring.cloud.stream.bindings.output1.destination=test-topic -spring.cloud.stream.bindings.output1.content-type=application/json - -spring.cloud.stream.bindings.output2.destination=TransactionTopic -spring.cloud.stream.bindings.output2.content-type=application/json -spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain @@ -33,9 +24,9 @@ spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.group=transaction-group spring.cloud.stream.bindings.input4.consumer.concurrency=5 -spring.application.name=rocketmq-example +spring.application.name=rocketmq-consume-example -server.port=28081 +server.port=28082 management.endpoints.web.exposure.include=* management.endpoint.health.show-details=always \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml new file mode 100644 index 00000000..426a330c --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml @@ -0,0 +1,54 @@ + + + + + org.springframework.cloud + spring-cloud-alibaba-examples + 0.2.2.BUILD-SNAPSHOT + ../../pom.xml + + 4.0.0 + + + rocketmq-produce-example + jar + Example demonstrating how to use rocketmq produce + + + + + org.springframework.cloud + spring-cloud-starter-stream-rocketmq + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java new file mode 100644 index 00000000..e98b6a10 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java @@ -0,0 +1,39 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +/** + * @author Jim + */ +public class Foo { + + private int id; + private String bar; + + public Foo() { + } + + public Foo(int id, String bar) { + this.id = id; + this.bar = bar; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getBar() { + return bar; + } + + public void setBar(String bar) { + this.bar = bar; + } + + @Override + public String toString() { + return "Foo{" + "id=" + id + ", bar='" + bar + '\'' + '}'; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java similarity index 69% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java index c927d526..bd157be0 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java @@ -4,36 +4,18 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQProduceApplication.MySource; import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.SubscribableChannel; /** * @author Jim */ @SpringBootApplication -@EnableBinding({ MySource.class, MySink.class }) -public class RocketMQApplication { - - public interface MySink { - - @Input("input1") - SubscribableChannel input1(); - - @Input("input2") - SubscribableChannel input2(); - - @Input("input3") - SubscribableChannel input3(); - - @Input("input4") - SubscribableChannel input4(); - } +@EnableBinding({ MySource.class }) +public class RocketMQProduceApplication { public interface MySource { @Output("output1") @@ -44,7 +26,7 @@ public class RocketMQApplication { } public static void main(String[] args) { - SpringApplication.run(RocketMQApplication.class, args); + SpringApplication.run(RocketMQProduceApplication.class, args); } @Bean @@ -86,13 +68,13 @@ public class RocketMQApplication { @Override public void run(String... args) throws Exception { // COMMIT_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg1", false); + senderService.sendTransactionalMsg("transactional-msg1", 1); // ROLLBACK_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg2", true); + senderService.sendTransactionalMsg("transactional-msg2", 2); // ROLLBACK_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg3", true); + senderService.sendTransactionalMsg("transactional-msg3", 3); // COMMIT_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg4", false); + senderService.sendTransactionalMsg("transactional-msg4", 4); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java similarity index 86% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java index e84ada23..b6d49cf8 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java @@ -4,8 +4,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQProduceApplication.MySource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -40,12 +41,11 @@ public class SenderService { source.output1().send(message); } - public void sendTransactionalMsg(T msg, boolean error) throws Exception { + public void sendTransactionalMsg(T msg, int num) throws Exception { MessageBuilder builder = MessageBuilder.withPayload(msg) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); - if (error) { - builder.setHeader("test", "1"); - } + builder.setHeader("test", String.valueOf(num)); + builder.setHeader(RocketMQHeaders.TAGS, "binder"); Message message = builder.build(); source.output2().send(message); } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java similarity index 77% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java index 56812aa6..3e28c826 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java @@ -27,13 +27,20 @@ import org.springframework.messaging.Message; /** * @author Jim */ -@RocketMQTransactionListener(txProducerGroup = "TransactionTopic", corePoolSize = 5, maximumPoolSize = 10) -class TransactionListenerImpl implements RocketMQLocalTransactionListener { +@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10) +public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { - if ("1".equals(((HashMap) msg.getHeaders().get(RocketMQHeaders.PROPERTIES)) - .get("USERS_test"))) { + Object num = ((HashMap) msg.getHeaders().get(RocketMQHeaders.PROPERTIES)) + .get("USERS_test"); + + if ("1".equals(num)) { + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " unknown"); + return RocketMQLocalTransactionState.UNKNOWN; + } + else if ("2".equals(num)) { System.out.println( "executer: " + new String((byte[]) msg.getPayload()) + " rollback"); return RocketMQLocalTransactionState.ROLLBACK; diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties new file mode 100644 index 00000000..beca964a --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties @@ -0,0 +1,20 @@ +logging.level.org.springframework.cloud.stream.binder.rocketmq=DEBUG + +spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 + +spring.cloud.stream.bindings.output1.destination=test-topic +spring.cloud.stream.bindings.output1.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group +spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true + +spring.cloud.stream.bindings.output2.destination=TransactionTopic +spring.cloud.stream.bindings.output2.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup + +spring.application.name=rocketmq-produce-example + +server.port=28081 + +management.endpoints.web.exposure.include=* +management.endpoint.health.show-details=always \ No newline at end of file