From 9f797f586e4b60028c8b8089acee1c5339d53aec Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Sat, 2 Feb 2019 13:44:33 +0800 Subject: [PATCH] update rocketmq binder demo --- .../examples/MyTransactionCheckListener.java | 18 ------- .../cloud/examples/MyTransactionExecuter.java | 20 -------- .../cloud/examples/ReceiveService.java | 5 -- .../cloud/examples/RocketMQApplication.java | 4 +- .../examples/TransactionListenerImpl.java | 51 +++++++++++++++++++ .../src/main/resources/application.properties | 9 ++-- 6 files changed, 56 insertions(+), 51 deletions(-) delete mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java delete mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java deleted file mode 100644 index 65d72662..00000000 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.springframework.cloud.alibaba.cloud.examples; - -import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.client.producer.TransactionCheckListener; -import org.apache.rocketmq.common.message.MessageExt; - -/** - * @author Jim - */ -public class MyTransactionCheckListener implements TransactionCheckListener { - - @Override - public LocalTransactionState checkLocalTransactionState(MessageExt msg) { - System.out.println("TransactionCheckListener: " + new String(msg.getBody())); - return LocalTransactionState.COMMIT_MESSAGE; - } - -} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java deleted file mode 100644 index 752d4e5f..00000000 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.springframework.cloud.alibaba.cloud.examples; - -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; -import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.common.message.Message; - -/** - * @author Jim - */ -public class MyTransactionExecuter implements LocalTransactionExecuter { - @Override - public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { - if ("1".equals(msg.getUserProperty("test"))) { - System.out.println(new String(msg.getBody()) + " rollback"); - return LocalTransactionState.ROLLBACK_MESSAGE; - } - System.out.println(new String(msg.getBody()) + " commit"); - return LocalTransactionState.COMMIT_MESSAGE; - } -} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java index a486ebc0..b507fb50 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java @@ -25,11 +25,6 @@ public class ReceiveService { System.out.println("input3 receive: " + foo); } - @StreamListener("input1") - public void receiveInput1Again(String receiveMsg) { - System.out.println("input1 receive again: " + receiveMsg); - } - @StreamListener("input4") public void receiveTransactionalMsg(String transactionMsg) { System.out.println("input4 receive transaction msg: " + transactionMsg); diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java index f736a0e5..c927d526 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java @@ -89,9 +89,9 @@ public class RocketMQApplication { senderService.sendTransactionalMsg("transactional-msg1", false); // ROLLBACK_MESSAGE message senderService.sendTransactionalMsg("transactional-msg2", true); - // ROLLBACK_MESSAGE message + // ROLLBACK_MESSAGE message senderService.sendTransactionalMsg("transactional-msg3", true); - // COMMIT_MESSAGE message + // COMMIT_MESSAGE message senderService.sendTransactionalMsg("transactional-msg4", false); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java new file mode 100644 index 00000000..56812aa6 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alibaba.cloud.examples; + +import java.util.HashMap; + +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.messaging.Message; + +/** + * @author Jim + */ +@RocketMQTransactionListener(txProducerGroup = "TransactionTopic", corePoolSize = 5, maximumPoolSize = 10) +class TransactionListenerImpl implements RocketMQLocalTransactionListener { + @Override + public RocketMQLocalTransactionState executeLocalTransaction(Message msg, + Object arg) { + if ("1".equals(((HashMap) msg.getHeaders().get(RocketMQHeaders.PROPERTIES)) + .get("USERS_test"))) { + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " rollback"); + return RocketMQLocalTransactionState.ROLLBACK; + } + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " commit"); + return RocketMQLocalTransactionState.COMMIT; + } + + @Override + public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { + System.out.println("check: " + new String((byte[]) msg.getPayload())); + return RocketMQLocalTransactionState.COMMIT; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties index 3bfe511f..7d9c0c85 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties @@ -8,14 +8,11 @@ spring.cloud.stream.bindings.output1.content-type=application/json spring.cloud.stream.bindings.output2.destination=TransactionTopic spring.cloud.stream.bindings.output2.content-type=application/json spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true -spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter -spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain spring.cloud.stream.bindings.input1.group=test-group1 spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true -spring.cloud.stream.bindings.input1.consumer.maxAttempts=1 spring.cloud.stream.bindings.input2.destination=test-topic spring.cloud.stream.bindings.input2.content-type=text/plain @@ -30,15 +27,15 @@ spring.cloud.stream.bindings.input3.content-type=application/json spring.cloud.stream.bindings.input3.group=test-group3 spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj spring.cloud.stream.bindings.input3.consumer.concurrency=20 -spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 spring.cloud.stream.bindings.input4.destination=TransactionTopic spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.group=transaction-group -spring.cloud.stream.bindings.input4.consumer.concurrency=210 +spring.cloud.stream.bindings.input4.consumer.concurrency=5 spring.application.name=rocketmq-example server.port=28081 -management.endpoints.web.exposure.include=* \ No newline at end of file +management.endpoints.web.exposure.include=* +management.endpoint.health.show-details=always \ No newline at end of file