diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties
index 2779db52..e2e77bd0 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties
@@ -3,20 +3,20 @@ spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
-spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
+spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
-spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
-spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
+spring.cloud.stream.rocketmq.bindings.input2.consumer.push.orderly=false
+spring.cloud.stream.rocketmq.bindings.input2.consumer.subscription=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
-spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
+spring.cloud.stream.rocketmq.bindings.input3.consumer.subscription=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20
spring.cloud.stream.bindings.input4.destination=TransactionTopic
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java
index cd9e5093..f7db15b5 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java
@@ -20,8 +20,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.alibaba.cloud.examples.RocketMQProduceApplication.MySource;
+import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst;
import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
@@ -62,7 +62,7 @@ public class SenderService {
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
builder.setHeader("test", String.valueOf(num));
- builder.setHeader(RocketMQHeaders.TAGS, "binder");
+ builder.setHeader(RocketMQConst.USER_TRANSACTIONAL_ARGS, "binder");
Message message = builder.build();
source.output2().send(message);
}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java
index e58beb22..1cc1b329 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java
@@ -16,43 +16,43 @@
package com.alibaba.cloud.examples;
-import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
-import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
-import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
-import org.springframework.messaging.Message;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.stereotype.Component;
/**
* @author Jim
*/
-@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5,
- maximumPoolSize = 10)
-public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+@Component("myTransactionListener")
+public class TransactionListenerImpl implements TransactionListener {
@Override
- public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
- Object arg) {
- Object num = msg.getHeaders().get("test");
+ public LocalTransactionState executeLocalTransaction(Message msg,
+ Object arg) {
+ Object num = msg.getProperty("test");
if ("1".equals(num)) {
System.out.println(
- "executer: " + new String((byte[]) msg.getPayload()) + " unknown");
- return RocketMQLocalTransactionState.UNKNOWN;
+ "executer: " + new String(msg.getBody()) + " unknown");
+ return LocalTransactionState.UNKNOW;
}
else if ("2".equals(num)) {
System.out.println(
- "executer: " + new String((byte[]) msg.getPayload()) + " rollback");
- return RocketMQLocalTransactionState.ROLLBACK;
+ "executer: " + new String(msg.getBody()) + " rollback");
+ return LocalTransactionState.ROLLBACK_MESSAGE;
}
System.out.println(
- "executer: " + new String((byte[]) msg.getPayload()) + " commit");
- return RocketMQLocalTransactionState.COMMIT;
+ "executer: " + new String(msg.getBody()) + " commit");
+ return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
- System.out.println("check: " + new String((byte[]) msg.getPayload()));
- return RocketMQLocalTransactionState.COMMIT;
+ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+ System.out.println("check: " + new String(msg.getBody()));
+ return LocalTransactionState.COMMIT_MESSAGE;
}
}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties
index 772bf456..08ab26ca 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties
@@ -11,6 +11,7 @@ spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup
+spring.cloud.stream.rocketmq.bindings.output2.producer.transactionListener=myTransactionListener
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
index e83a6a17..694c8519 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
@@ -18,18 +18,11 @@ package com.alibaba.cloud.stream.binder.rocketmq.contants;
import org.apache.rocketmq.common.message.MessageConst;
-import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
-
/**
* @author zkzlx
*/
public class RocketMQConst extends MessageConst {
- /**
- * Header key for RocketMQ Transactional Args.
- */
- public static final String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
-
/**
* Default NameServer value.
*/
@@ -38,12 +31,8 @@ public class RocketMQConst extends MessageConst {
/**
* Default group for SCS RocketMQ Binder.
*/
- public static final String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
+ public static final String DEFAULT_GROUP = "binder_default_group_name";
- /**
- * RocketMQ re-consume times.
- */
- public static final String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS";
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
index 2ca91384..a378fc61 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
@@ -17,6 +17,7 @@
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;
import java.lang.reflect.Field;
+import java.util.Iterator;
import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQConsumerFactory;
@@ -62,6 +63,8 @@ public class RocketMQMessageSource extends AbstractMessageSource