diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java new file mode 100644 index 00000000..65d72662 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java @@ -0,0 +1,18 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.common.message.MessageExt; + +/** + * @author Jim + */ +public class MyTransactionCheckListener implements TransactionCheckListener { + + @Override + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + System.out.println("TransactionCheckListener: " + new String(msg.getBody())); + return LocalTransactionState.COMMIT_MESSAGE; + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java new file mode 100644 index 00000000..752d4e5f --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java @@ -0,0 +1,20 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.common.message.Message; + +/** + * @author Jim + */ +public class MyTransactionExecuter implements LocalTransactionExecuter { + @Override + public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { + if ("1".equals(msg.getUserProperty("test"))) { + System.out.println(new String(msg.getBody()) + " rollback"); + return LocalTransactionState.ROLLBACK_MESSAGE; + } + System.out.println(new String(msg.getBody()) + " commit"); + return LocalTransactionState.COMMIT_MESSAGE; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java index f251902d..d590aa28 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java @@ -30,4 +30,9 @@ public class ReceiveService { System.out.println("input1 receive again: " + receiveMsg); } + @StreamListener("input4") + public void receiveTransactionalMsg(String transactionMsg) { + System.out.println("input4 receive transaction msg: " + transactionMsg); + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java index e4a8b1ab..87da880a 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java @@ -5,17 +5,19 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * @author Jim */ @SpringBootApplication -@EnableBinding({ Source.class, MySink.class }) +@EnableBinding({ MySource.class, MySink.class }) public class RocketMQApplication { public interface MySink { @@ -29,17 +31,33 @@ public class RocketMQApplication { @Input("input3") SubscribableChannel input3(); + @Input("input4") + SubscribableChannel input4(); + + } + + public interface MySource { + @Output("output1") + MessageChannel output1(); + + @Output("output2") + MessageChannel output2(); } public static void main(String[] args) { SpringApplication.run(RocketMQApplication.class, args); - } + } @Bean public CustomRunner customRunner() { return new CustomRunner(); } + @Bean + public CustomRunnerWithTransactional customRunnerWithTransactional() { + return new CustomRunnerWithTransactional(); + } + public static class CustomRunner implements CommandLineRunner { @Autowired private SenderService senderService; @@ -62,4 +80,21 @@ public class RocketMQApplication { } } + public static class CustomRunnerWithTransactional implements CommandLineRunner { + @Autowired + private SenderService senderService; + + @Override + public void run(String... args) throws Exception { + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg1", false); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg2", true); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg3", true); + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg4", false); + } + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java index 1c886d01..9614cc5d 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java @@ -5,7 +5,7 @@ import java.util.Map; import org.apache.rocketmq.common.message.MessageConst; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -19,17 +19,17 @@ import org.springframework.util.MimeTypeUtils; public class SenderService { @Autowired - private Source source; + private MySource source; public void send(String msg) throws Exception { - source.output().send(MessageBuilder.withPayload(msg).build()); + source.output1().send(MessageBuilder.withPayload(msg).build()); } public void sendWithTags(T msg, String tag) throws Exception { Map map = new HashMap<>(); map.put(MessageConst.PROPERTY_TAGS, tag); Message message = MessageBuilder.createMessage(msg, new MessageHeaders(map)); - source.output().send(message); + source.output1().send(message); } public void sendObject(T msg, String tag) throws Exception { @@ -37,7 +37,17 @@ public class SenderService { .setHeader(MessageConst.PROPERTY_TAGS, tag) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); - source.output().send(message); + source.output1().send(message); + } + + public void sendTransactionalMsg(T msg, boolean error) throws Exception { + MessageBuilder builder = MessageBuilder.withPayload(msg) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); + if (error) { + builder.setHeader("test", "1"); + } + Message message = builder.build(); + source.output2().send(message); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties index 0ba6a54e..54bf902b 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties @@ -2,8 +2,14 @@ spring.cloud.stream.default-binder=rocketmq spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 -spring.cloud.stream.bindings.output.destination=test-topic -spring.cloud.stream.bindings.output.content-type=application/json +spring.cloud.stream.bindings.output1.destination=test-topic +spring.cloud.stream.bindings.output1.content-type=application/json + +spring.cloud.stream.bindings.output2.destination=TransactionTopic +spring.cloud.stream.bindings.output2.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter +spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain @@ -26,5 +32,12 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj spring.cloud.stream.bindings.input3.consumer.concurrency=20 spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 +spring.cloud.stream.bindings.input4.destination=TransactionTopic +spring.cloud.stream.bindings.input4.content-type=text/plain +spring.cloud.stream.bindings.input4.group=transaction-group +spring.cloud.stream.bindings.input4.consumer.concurrency=210 + +spring.application.name=rocketmq-example + server.port=28081 management.security.enabled=false \ No newline at end of file