From 6d3a26702aad98192aed923695ca4c845426f1ab Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 12 Dec 2018 14:52:40 +0800 Subject: [PATCH 1/3] Polish #134 --- .../rocketmq/RocketMQBinderConstants.java | 2 + .../RocketMQMessageChannelBinder.java | 65 ++++++++++++++++++- .../RocketMQMessageHeaderAccessor.java | 10 +++ .../integration/RocketMQMessageHandler.java | 48 ++++++++++++-- .../RocketMQProducerProperties.java | 37 +++++++++++ 5 files changed, 153 insertions(+), 9 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index 65128be4..66517074 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -32,6 +32,8 @@ public interface RocketMQBinderConstants { String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; + String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG"; + String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; /** diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index d5109cda..ded2be10 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,6 +16,9 @@ package org.springframework.cloud.stream.binder.rocketmq; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; @@ -36,6 +39,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.util.ClassUtils; /** * @author Timur Valiev @@ -71,9 +75,24 @@ public class RocketMQMessageChannelBinder extends ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - return new RocketMQMessageHandler(destination.getName(), - producerProperties.getExtension(), + RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( + destination.getName(), producerProperties, rocketBinderConfigurationProperties, instrumentationManager); + if (producerProperties.getExtension().getTransactional()) { + // transaction message check LocalTransactionExecuter + messageHandler.setLocalTransactionExecuter( + getClassConfiguration(destination.getName(), + producerProperties.getExtension().getExecuter(), + LocalTransactionExecuter.class)); + // transaction message check TransactionCheckListener + messageHandler.setTransactionCheckListener( + getClassConfiguration(destination.getName(), + producerProperties.getExtension() + .getTransactionCheckListener(), + TransactionCheckListener.class)); + } + + return messageHandler; } else { throw new RuntimeException("Binding for channel " + destination.getName() @@ -120,4 +139,46 @@ public class RocketMQMessageChannelBinder extends public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { return extendedBindingProperties.getExtendedProducerProperties(channelName); } + + private T getClassConfiguration(String destName, String className, + Class interfaceClass) { + if (StringUtils.isEmpty(className)) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, should set " + + interfaceClass.getSimpleName() + " configuration" + + interfaceClass.getSimpleName() + " should be set, like " + + "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour" + + interfaceClass.getSimpleName() + "'"); + } + else if (StringUtils.isNotEmpty(className)) { + Class fieldClass; + // check class exists + try { + fieldClass = ClassUtils.forName(className, + RocketMQMessageChannelBinder.class.getClassLoader()); + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " class is not found"); + } + // check interface incompatible + if (!interfaceClass.isAssignableFrom(fieldClass)) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " is incompatible with " + interfaceClass.getSimpleName() + + " interface"); + } + try { + return (T) fieldClass.newInstance(); + } + catch (Exception e) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " instance error", e); + } + } + return null; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java index 4f0ca012..7707c07e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java @@ -20,6 +20,7 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG; import java.util.HashMap; import java.util.Map; @@ -103,6 +104,15 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor { return this; } + public Object getTransactionalArg() { + return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG); + } + + public Object withTransactionalArg(Object arg) { + setHeader(ROCKET_TRANSACTIONAL_ARG, arg); + return this; + } + public SendResult getSendResult() { return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index ece595e6..ba8054ad 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -23,10 +23,14 @@ import java.util.Optional; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -49,16 +53,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private InstrumentationManager instrumentationManager; - private final RocketMQProducerProperties producerProperties; + private LocalTransactionExecuter localTransactionExecuter; + + private TransactionCheckListener transactionCheckListener; + + private final ExtendedProducerProperties producerProperties; private final String destination; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - protected volatile boolean running = false; + private volatile boolean running = false; public RocketMQMessageHandler(String destination, - RocketMQProducerProperties producerProperties, + ExtendedProducerProperties producerProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, InstrumentationManager instrumentationManager) { this.destination = destination; @@ -69,7 +77,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li @Override public void start() { - producer = new DefaultMQProducer(destination); + if (producerProperties.getExtension().getTransactional()) { + producer = new TransactionMQProducer(destination); + if (transactionCheckListener != null) { + ((TransactionMQProducer) producer) + .setTransactionCheckListener(transactionCheckListener); + } + } + else { + producer = new DefaultMQProducer(destination); + } Optional.ofNullable(instrumentationManager).ifPresent(manager -> { producerInstrumentation = manager.getProducerInstrumentation(destination); @@ -78,8 +95,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - if (producerProperties.getMaxMessageSize() > 0) { - producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); + if (producerProperties.getExtension().getMaxMessageSize() > 0) { + producer.setMaxMessageSize( + producerProperties.getExtension().getMaxMessageSize()); } try { @@ -138,7 +156,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li toSend.putUserProperty(entry.getKey(), entry.getValue()); } - SendResult sendRes = producer.send(toSend); + SendResult sendRes; + if (producerProperties.getExtension().getTransactional()) { + sendRes = producer.sendMessageInTransaction(toSend, + localTransactionExecuter, headerAccessor.getTransactionalArg()); + } + else { + sendRes = producer.send(toSend); + } if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { throw new MQClientException("message hasn't been sent", null); @@ -164,4 +189,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li } + public void setLocalTransactionExecuter( + LocalTransactionExecuter localTransactionExecuter) { + this.localTransactionExecuter = localTransactionExecuter; + } + + public void setTransactionCheckListener( + TransactionCheckListener transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } } \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java index 6afefd3b..1a05ad50 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -17,6 +17,8 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.TransactionCheckListener; /** * @author Timur Valiev @@ -31,6 +33,18 @@ public class RocketMQProducerProperties { */ private Integer maxMessageSize = 0; + private Boolean transactional = false; + + /** + * full class name of {@link LocalTransactionExecuter} + */ + private String executer; + + /** + * full class name of {@link TransactionCheckListener} + */ + private String transactionCheckListener; + public Boolean getEnabled() { return enabled; } @@ -47,4 +61,27 @@ public class RocketMQProducerProperties { this.maxMessageSize = maxMessageSize; } + public Boolean getTransactional() { + return transactional; + } + + public void setTransactional(Boolean transactional) { + this.transactional = transactional; + } + + public String getExecuter() { + return executer; + } + + public void setExecuter(String executer) { + this.executer = executer; + } + + public String getTransactionCheckListener() { + return transactionCheckListener; + } + + public void setTransactionCheckListener(String transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } } From 7d3a1b9adf46e21d49bc23ea6a499e37e1e38082 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 12 Dec 2018 15:00:11 +0800 Subject: [PATCH 2/3] add transaction message in rocketmq example --- .../examples/MyTransactionCheckListener.java | 18 +++++++++ .../cloud/examples/MyTransactionExecuter.java | 20 ++++++++++ .../cloud/examples/ReceiveService.java | 5 +++ .../cloud/examples/RocketMQApplication.java | 38 ++++++++++++++++++- .../alibaba/cloud/examples/SenderService.java | 20 +++++++--- .../src/main/resources/application.properties | 15 +++++++- 6 files changed, 107 insertions(+), 9 deletions(-) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java new file mode 100644 index 00000000..65d72662 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java @@ -0,0 +1,18 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.common.message.MessageExt; + +/** + * @author Jim + */ +public class MyTransactionCheckListener implements TransactionCheckListener { + + @Override + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + System.out.println("TransactionCheckListener: " + new String(msg.getBody())); + return LocalTransactionState.COMMIT_MESSAGE; + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java new file mode 100644 index 00000000..752d4e5f --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java @@ -0,0 +1,20 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.common.message.Message; + +/** + * @author Jim + */ +public class MyTransactionExecuter implements LocalTransactionExecuter { + @Override + public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { + if ("1".equals(msg.getUserProperty("test"))) { + System.out.println(new String(msg.getBody()) + " rollback"); + return LocalTransactionState.ROLLBACK_MESSAGE; + } + System.out.println(new String(msg.getBody()) + " commit"); + return LocalTransactionState.COMMIT_MESSAGE; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java index f251902d..a486ebc0 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java @@ -30,4 +30,9 @@ public class ReceiveService { System.out.println("input1 receive again: " + receiveMsg); } + @StreamListener("input4") + public void receiveTransactionalMsg(String transactionMsg) { + System.out.println("input4 receive transaction msg: " + transactionMsg); + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java index 2d4deda6..f736a0e5 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java @@ -5,17 +5,19 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * @author Jim */ @SpringBootApplication -@EnableBinding({ Source.class, MySink.class }) +@EnableBinding({ MySource.class, MySink.class }) public class RocketMQApplication { public interface MySink { @@ -29,6 +31,16 @@ public class RocketMQApplication { @Input("input3") SubscribableChannel input3(); + @Input("input4") + SubscribableChannel input4(); + } + + public interface MySource { + @Output("output1") + MessageChannel output1(); + + @Output("output2") + MessageChannel output2(); } public static void main(String[] args) { @@ -40,6 +52,11 @@ public class RocketMQApplication { return new CustomRunner(); } + @Bean + public CustomRunnerWithTransactional customRunnerWithTransactional() { + return new CustomRunnerWithTransactional(); + } + public static class CustomRunner implements CommandLineRunner { @Autowired private SenderService senderService; @@ -62,4 +79,21 @@ public class RocketMQApplication { } } + public static class CustomRunnerWithTransactional implements CommandLineRunner { + @Autowired + private SenderService senderService; + + @Override + public void run(String... args) throws Exception { + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg1", false); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg2", true); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg3", true); + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg4", false); + } + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java index eaf0001f..e84ada23 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java @@ -5,7 +5,7 @@ import java.util.stream.Stream; import org.apache.rocketmq.common.message.MessageConst; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -19,17 +19,17 @@ import org.springframework.util.MimeTypeUtils; public class SenderService { @Autowired - private Source source; + private MySource source; public void send(String msg) throws Exception { - source.output().send(MessageBuilder.withPayload(msg).build()); + source.output1().send(MessageBuilder.withPayload(msg).build()); } public void sendWithTags(T msg, String tag) throws Exception { Message message = MessageBuilder.createMessage(msg, new MessageHeaders(Stream.of(tag).collect(Collectors .toMap(str -> MessageConst.PROPERTY_TAGS, String::toString)))); - source.output().send(message); + source.output1().send(message); } public void sendObject(T msg, String tag) throws Exception { @@ -37,7 +37,17 @@ public class SenderService { .setHeader(MessageConst.PROPERTY_TAGS, tag) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); - source.output().send(message); + source.output1().send(message); + } + + public void sendTransactionalMsg(T msg, boolean error) throws Exception { + MessageBuilder builder = MessageBuilder.withPayload(msg) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); + if (error) { + builder.setHeader("test", "1"); + } + Message message = builder.build(); + source.output2().send(message); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties index c92dd24a..3bfe511f 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties @@ -2,8 +2,14 @@ spring.cloud.stream.default-binder=rocketmq spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 -spring.cloud.stream.bindings.output.destination=test-topic -spring.cloud.stream.bindings.output.content-type=application/json +spring.cloud.stream.bindings.output1.destination=test-topic +spring.cloud.stream.bindings.output1.content-type=application/json + +spring.cloud.stream.bindings.output2.destination=TransactionTopic +spring.cloud.stream.bindings.output2.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter +spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain @@ -26,6 +32,11 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj spring.cloud.stream.bindings.input3.consumer.concurrency=20 spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 +spring.cloud.stream.bindings.input4.destination=TransactionTopic +spring.cloud.stream.bindings.input4.content-type=text/plain +spring.cloud.stream.bindings.input4.group=transaction-group +spring.cloud.stream.bindings.input4.consumer.concurrency=210 + spring.application.name=rocketmq-example server.port=28081 From 5643c952a83bbf233d9f20230092ed511f899336 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 12 Dec 2018 15:03:53 +0800 Subject: [PATCH 3/3] update docs --- .../src/main/asciidoc-zh/rocketmq.adoc | 7 +++++-- .../src/main/asciidoc-zh/sentinel.adoc | 8 ++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc index 5443b7dd..3724fd41 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc @@ -164,6 +164,9 @@ Provider端支持的配置: ^|配置项 ^|含义 ^| 默认值 |`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true |`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效,RocketMQ 默认值为4M = 1024 * 1024 * 4) +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`| +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`| |==== Consumer端支持的配置: @@ -173,8 +176,8 @@ Consumer端支持的配置: |==== ^|配置项 ^|含义| 默认值 |`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割| -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息| +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤,订阅所有消息)| +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高)| |`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false |`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false |==== diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc index ed1b0d83..c1e699b1 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc @@ -142,7 +142,7 @@ class EchoServiceFallback implements EchoService { } ``` -NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:http://requesturl +NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:protocol://requesturl `EchoService` 接口中方法 `echo` 对应的资源名为 `GET:http://service-provider/echo/{str}`。 @@ -150,17 +150,17 @@ NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:http://reque ### RestTemplate 支持 -Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelProtect` 注解。 +Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelRestTemplate` 注解。 ```java @Bean -@SentinelProtect(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class) +@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class) public RestTemplate restTemplate() { return new RestTemplate(); } ``` -`@SentinelProtect` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。 +`@SentinelRestTemplate` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。 限流的资源规则提供两种粒度: