diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc
index 5443b7dd..3724fd41 100644
--- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc
+++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc
@@ -164,6 +164,9 @@ Provider端支持的配置:
^|配置项 ^|含义 ^| 默认值
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效,RocketMQ 默认值为4M = 1024 * 1024 * 4)
+|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false
+|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`|
+|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`|
|====
Consumer端支持的配置:
@@ -173,8 +176,8 @@ Consumer端支持的配置:
|====
^|配置项 ^|含义| 默认值
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true
-|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割|
-|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息|
+|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤,订阅所有消息)|
+|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false
|====
diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc
index ed1b0d83..c1e699b1 100644
--- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc
+++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc
@@ -142,7 +142,7 @@ class EchoServiceFallback implements EchoService {
}
```
-NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:http://requesturl
+NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:protocol://requesturl
`EchoService` 接口中方法 `echo` 对应的资源名为 `GET:http://service-provider/echo/{str}`。
@@ -150,17 +150,17 @@ NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:http://reque
### RestTemplate 支持
-Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelProtect` 注解。
+Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelRestTemplate` 注解。
```java
@Bean
-@SentinelProtect(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class)
+@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class)
public RestTemplate restTemplate() {
return new RestTemplate();
}
```
-`@SentinelProtect` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。
+`@SentinelRestTemplate` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。
限流的资源规则提供两种粒度:
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java
new file mode 100644
index 00000000..65d72662
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java
@@ -0,0 +1,18 @@
+package org.springframework.cloud.alibaba.cloud.examples;
+
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * @author Jim
+ */
+public class MyTransactionCheckListener implements TransactionCheckListener {
+
+ @Override
+ public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
+ System.out.println("TransactionCheckListener: " + new String(msg.getBody()));
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java
new file mode 100644
index 00000000..752d4e5f
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java
@@ -0,0 +1,20 @@
+package org.springframework.cloud.alibaba.cloud.examples;
+
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.common.message.Message;
+
+/**
+ * @author Jim
+ */
+public class MyTransactionExecuter implements LocalTransactionExecuter {
+ @Override
+ public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
+ if ("1".equals(msg.getUserProperty("test"))) {
+ System.out.println(new String(msg.getBody()) + " rollback");
+ return LocalTransactionState.ROLLBACK_MESSAGE;
+ }
+ System.out.println(new String(msg.getBody()) + " commit");
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java
index f251902d..a486ebc0 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java
@@ -30,4 +30,9 @@ public class ReceiveService {
System.out.println("input1 receive again: " + receiveMsg);
}
+ @StreamListener("input4")
+ public void receiveTransactionalMsg(String transactionMsg) {
+ System.out.println("input4 receive transaction msg: " + transactionMsg);
+ }
+
}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java
index 2d4deda6..f736a0e5 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java
@@ -5,17 +5,19 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink;
+import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
-import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
* @author Jim
*/
@SpringBootApplication
-@EnableBinding({ Source.class, MySink.class })
+@EnableBinding({ MySource.class, MySink.class })
public class RocketMQApplication {
public interface MySink {
@@ -29,6 +31,16 @@ public class RocketMQApplication {
@Input("input3")
SubscribableChannel input3();
+ @Input("input4")
+ SubscribableChannel input4();
+ }
+
+ public interface MySource {
+ @Output("output1")
+ MessageChannel output1();
+
+ @Output("output2")
+ MessageChannel output2();
}
public static void main(String[] args) {
@@ -40,6 +52,11 @@ public class RocketMQApplication {
return new CustomRunner();
}
+ @Bean
+ public CustomRunnerWithTransactional customRunnerWithTransactional() {
+ return new CustomRunnerWithTransactional();
+ }
+
public static class CustomRunner implements CommandLineRunner {
@Autowired
private SenderService senderService;
@@ -62,4 +79,21 @@ public class RocketMQApplication {
}
}
+ public static class CustomRunnerWithTransactional implements CommandLineRunner {
+ @Autowired
+ private SenderService senderService;
+
+ @Override
+ public void run(String... args) throws Exception {
+ // COMMIT_MESSAGE message
+ senderService.sendTransactionalMsg("transactional-msg1", false);
+ // ROLLBACK_MESSAGE message
+ senderService.sendTransactionalMsg("transactional-msg2", true);
+ // ROLLBACK_MESSAGE message
+ senderService.sendTransactionalMsg("transactional-msg3", true);
+ // COMMIT_MESSAGE message
+ senderService.sendTransactionalMsg("transactional-msg4", false);
+ }
+ }
+
}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java
index eaf0001f..e84ada23 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java
@@ -5,7 +5,7 @@ import java.util.stream.Stream;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
@@ -19,17 +19,17 @@ import org.springframework.util.MimeTypeUtils;
public class SenderService {
@Autowired
- private Source source;
+ private MySource source;
public void send(String msg) throws Exception {
- source.output().send(MessageBuilder.withPayload(msg).build());
+ source.output1().send(MessageBuilder.withPayload(msg).build());
}
public void sendWithTags(T msg, String tag) throws Exception {
Message message = MessageBuilder.createMessage(msg,
new MessageHeaders(Stream.of(tag).collect(Collectors
.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));
- source.output().send(message);
+ source.output1().send(message);
}
public void sendObject(T msg, String tag) throws Exception {
@@ -37,7 +37,17 @@ public class SenderService {
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();
- source.output().send(message);
+ source.output1().send(message);
+ }
+
+ public void sendTransactionalMsg(T msg, boolean error) throws Exception {
+ MessageBuilder builder = MessageBuilder.withPayload(msg)
+ .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
+ if (error) {
+ builder.setHeader("test", "1");
+ }
+ Message message = builder.build();
+ source.output2().send(message);
}
}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties
index c92dd24a..3bfe511f 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties
+++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties
@@ -2,8 +2,14 @@ spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
-spring.cloud.stream.bindings.output.destination=test-topic
-spring.cloud.stream.bindings.output.content-type=application/json
+spring.cloud.stream.bindings.output1.destination=test-topic
+spring.cloud.stream.bindings.output1.content-type=application/json
+
+spring.cloud.stream.bindings.output2.destination=TransactionTopic
+spring.cloud.stream.bindings.output2.content-type=application/json
+spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
+spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter
+spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
@@ -26,6 +32,11 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20
spring.cloud.stream.bindings.input3.consumer.maxAttempts=1
+spring.cloud.stream.bindings.input4.destination=TransactionTopic
+spring.cloud.stream.bindings.input4.content-type=text/plain
+spring.cloud.stream.bindings.input4.group=transaction-group
+spring.cloud.stream.bindings.input4.consumer.concurrency=210
+
spring.application.name=rocketmq-example
server.port=28081
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
index 65128be4..66517074 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
@@ -32,6 +32,8 @@ public interface RocketMQBinderConstants {
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
+ String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG";
+
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/**
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index d5109cda..ded2be10 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -16,6 +16,9 @@
package org.springframework.cloud.stream.binder.rocketmq;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
@@ -36,6 +39,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
+import org.springframework.util.ClassUtils;
/**
* @author Timur Valiev
@@ -71,9 +75,24 @@ public class RocketMQMessageChannelBinder extends
ExtendedProducerProperties producerProperties,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
- return new RocketMQMessageHandler(destination.getName(),
- producerProperties.getExtension(),
+ RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
+ destination.getName(), producerProperties,
rocketBinderConfigurationProperties, instrumentationManager);
+ if (producerProperties.getExtension().getTransactional()) {
+ // transaction message check LocalTransactionExecuter
+ messageHandler.setLocalTransactionExecuter(
+ getClassConfiguration(destination.getName(),
+ producerProperties.getExtension().getExecuter(),
+ LocalTransactionExecuter.class));
+ // transaction message check TransactionCheckListener
+ messageHandler.setTransactionCheckListener(
+ getClassConfiguration(destination.getName(),
+ producerProperties.getExtension()
+ .getTransactionCheckListener(),
+ TransactionCheckListener.class));
+ }
+
+ return messageHandler;
}
else {
throw new RuntimeException("Binding for channel " + destination.getName()
@@ -120,4 +139,46 @@ public class RocketMQMessageChannelBinder extends
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName);
}
+
+ private T getClassConfiguration(String destName, String className,
+ Class interfaceClass) {
+ if (StringUtils.isEmpty(className)) {
+ throw new RuntimeException("Binding for channel " + destName
+ + " using transactional message, should set "
+ + interfaceClass.getSimpleName() + " configuration"
+ + interfaceClass.getSimpleName() + " should be set, like "
+ + "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour"
+ + interfaceClass.getSimpleName() + "'");
+ }
+ else if (StringUtils.isNotEmpty(className)) {
+ Class fieldClass;
+ // check class exists
+ try {
+ fieldClass = ClassUtils.forName(className,
+ RocketMQMessageChannelBinder.class.getClassLoader());
+ }
+ catch (ClassNotFoundException e) {
+ throw new RuntimeException("Binding for channel " + destName
+ + " using transactional message, but " + className
+ + " class is not found");
+ }
+ // check interface incompatible
+ if (!interfaceClass.isAssignableFrom(fieldClass)) {
+ throw new RuntimeException("Binding for channel " + destName
+ + " using transactional message, but " + className
+ + " is incompatible with " + interfaceClass.getSimpleName()
+ + " interface");
+ }
+ try {
+ return (T) fieldClass.newInstance();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Binding for channel " + destName
+ + " using transactional message, but " + className
+ + " instance error", e);
+ }
+ }
+ return null;
+ }
+
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
index 4f0ca012..7707c07e 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
@@ -20,6 +20,7 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG;
import java.util.HashMap;
import java.util.Map;
@@ -103,6 +104,15 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
return this;
}
+ public Object getTransactionalArg() {
+ return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG);
+ }
+
+ public Object withTransactionalArg(Object arg) {
+ setHeader(ROCKET_TRANSACTIONAL_ARG, arg);
+ return this;
+ }
+
public SendResult getSendResult() {
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
index ece595e6..ba8054ad 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
@@ -23,10 +23,14 @@ import java.util.Optional;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@@ -49,16 +53,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private InstrumentationManager instrumentationManager;
- private final RocketMQProducerProperties producerProperties;
+ private LocalTransactionExecuter localTransactionExecuter;
+
+ private TransactionCheckListener transactionCheckListener;
+
+ private final ExtendedProducerProperties producerProperties;
private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
- protected volatile boolean running = false;
+ private volatile boolean running = false;
public RocketMQMessageHandler(String destination,
- RocketMQProducerProperties producerProperties,
+ ExtendedProducerProperties producerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
this.destination = destination;
@@ -69,7 +77,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override
public void start() {
- producer = new DefaultMQProducer(destination);
+ if (producerProperties.getExtension().getTransactional()) {
+ producer = new TransactionMQProducer(destination);
+ if (transactionCheckListener != null) {
+ ((TransactionMQProducer) producer)
+ .setTransactionCheckListener(transactionCheckListener);
+ }
+ }
+ else {
+ producer = new DefaultMQProducer(destination);
+ }
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
producerInstrumentation = manager.getProducerInstrumentation(destination);
@@ -78,8 +95,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
- if (producerProperties.getMaxMessageSize() > 0) {
- producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
+ if (producerProperties.getExtension().getMaxMessageSize() > 0) {
+ producer.setMaxMessageSize(
+ producerProperties.getExtension().getMaxMessageSize());
}
try {
@@ -138,7 +156,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
toSend.putUserProperty(entry.getKey(), entry.getValue());
}
- SendResult sendRes = producer.send(toSend);
+ SendResult sendRes;
+ if (producerProperties.getExtension().getTransactional()) {
+ sendRes = producer.sendMessageInTransaction(toSend,
+ localTransactionExecuter, headerAccessor.getTransactionalArg());
+ }
+ else {
+ sendRes = producer.send(toSend);
+ }
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new MQClientException("message hasn't been sent", null);
@@ -164,4 +189,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
}
+ public void setLocalTransactionExecuter(
+ LocalTransactionExecuter localTransactionExecuter) {
+ this.localTransactionExecuter = localTransactionExecuter;
+ }
+
+ public void setTransactionCheckListener(
+ TransactionCheckListener transactionCheckListener) {
+ this.transactionCheckListener = transactionCheckListener;
+ }
}
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
index 6afefd3b..1a05ad50 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
@@ -17,6 +17,8 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
/**
* @author Timur Valiev
@@ -31,6 +33,18 @@ public class RocketMQProducerProperties {
*/
private Integer maxMessageSize = 0;
+ private Boolean transactional = false;
+
+ /**
+ * full class name of {@link LocalTransactionExecuter}
+ */
+ private String executer;
+
+ /**
+ * full class name of {@link TransactionCheckListener}
+ */
+ private String transactionCheckListener;
+
public Boolean getEnabled() {
return enabled;
}
@@ -47,4 +61,27 @@ public class RocketMQProducerProperties {
this.maxMessageSize = maxMessageSize;
}
+ public Boolean getTransactional() {
+ return transactional;
+ }
+
+ public void setTransactional(Boolean transactional) {
+ this.transactional = transactional;
+ }
+
+ public String getExecuter() {
+ return executer;
+ }
+
+ public void setExecuter(String executer) {
+ this.executer = executer;
+ }
+
+ public String getTransactionCheckListener() {
+ return transactionCheckListener;
+ }
+
+ public void setTransactionCheckListener(String transactionCheckListener) {
+ this.transactionCheckListener = transactionCheckListener;
+ }
}