1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Merge pull request #157 from fangjian0423/master

RocketMQ Binder support transactional message
This commit is contained in:
xiaojing 2018-12-12 17:08:48 +08:00 committed by GitHub
commit a2ed75676d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 269 additions and 24 deletions

View File

@ -164,6 +164,9 @@ Provider端支持的配置
^|配置项 ^|含义 ^| 默认值
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效RocketMQ 默认值为4M = 1024 * 1024 * 4)
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`|
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`|
|====
Consumer端支持的配置
@ -173,8 +176,8 @@ Consumer端支持的配置
|====
^|配置项 ^|含义| 默认值
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤订阅所有消息)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容sql的优先级更高)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false
|====

View File

@ -142,7 +142,7 @@ class EchoServiceFallback implements EchoService {
}
```
NOTE: Feign 对应的接口中的资源名策略定义httpmethod:http://requesturl
NOTE: Feign 对应的接口中的资源名策略定义httpmethod:protocol://requesturl
`EchoService` 接口中方法 `echo` 对应的资源名为 `GET:http://service-provider/echo/{str}`。
@ -150,17 +150,17 @@ NOTE: Feign 对应的接口中的资源名策略定义httpmethod:http://reque
### RestTemplate 支持
Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelProtect` 注解。
Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelRestTemplate` 注解。
```java
@Bean
@SentinelProtect(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class)
@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class)
public RestTemplate restTemplate() {
return new RestTemplate();
}
```
`@SentinelProtect` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。
`@SentinelRestTemplate` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。
限流的资源规则提供两种粒度:

View File

@ -0,0 +1,18 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class MyTransactionCheckListener implements TransactionCheckListener {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("TransactionCheckListener: " + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
}

View File

@ -0,0 +1,20 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class MyTransactionExecuter implements LocalTransactionExecuter {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
if ("1".equals(msg.getUserProperty("test"))) {
System.out.println(new String(msg.getBody()) + " rollback");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
System.out.println(new String(msg.getBody()) + " commit");
return LocalTransactionState.COMMIT_MESSAGE;
}
}

View File

@ -30,4 +30,9 @@ public class ReceiveService {
System.out.println("input1 receive again: " + receiveMsg);
}
@StreamListener("input4")
public void receiveTransactionalMsg(String transactionMsg) {
System.out.println("input4 receive transaction msg: " + transactionMsg);
}
}

View File

@ -5,17 +5,19 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@SpringBootApplication
@EnableBinding({ Source.class, MySink.class })
@EnableBinding({ MySource.class, MySink.class })
public class RocketMQApplication {
public interface MySink {
@ -29,6 +31,16 @@ public class RocketMQApplication {
@Input("input3")
SubscribableChannel input3();
@Input("input4")
SubscribableChannel input4();
}
public interface MySource {
@Output("output1")
MessageChannel output1();
@Output("output2")
MessageChannel output2();
}
public static void main(String[] args) {
@ -40,6 +52,11 @@ public class RocketMQApplication {
return new CustomRunner();
}
@Bean
public CustomRunnerWithTransactional customRunnerWithTransactional() {
return new CustomRunnerWithTransactional();
}
public static class CustomRunner implements CommandLineRunner {
@Autowired
private SenderService senderService;
@ -62,4 +79,21 @@ public class RocketMQApplication {
}
}
public static class CustomRunnerWithTransactional implements CommandLineRunner {
@Autowired
private SenderService senderService;
@Override
public void run(String... args) throws Exception {
// COMMIT_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg1", false);
// ROLLBACK_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg2", true);
// ROLLBACK_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg3", true);
// COMMIT_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg4", false);
}
}
}

View File

@ -5,7 +5,7 @@ import java.util.stream.Stream;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
@ -19,17 +19,17 @@ import org.springframework.util.MimeTypeUtils;
public class SenderService {
@Autowired
private Source source;
private MySource source;
public void send(String msg) throws Exception {
source.output().send(MessageBuilder.withPayload(msg).build());
source.output1().send(MessageBuilder.withPayload(msg).build());
}
public <T> void sendWithTags(T msg, String tag) throws Exception {
Message message = MessageBuilder.createMessage(msg,
new MessageHeaders(Stream.of(tag).collect(Collectors
.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));
source.output().send(message);
source.output1().send(message);
}
public <T> void sendObject(T msg, String tag) throws Exception {
@ -37,7 +37,17 @@ public class SenderService {
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();
source.output().send(message);
source.output1().send(message);
}
public <T> void sendTransactionalMsg(T msg, boolean error) throws Exception {
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
if (error) {
builder.setHeader("test", "1");
}
Message message = builder.build();
source.output2().send(message);
}
}

View File

@ -2,8 +2,14 @@ spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.output1.destination=test-topic
spring.cloud.stream.bindings.output1.content-type=application/json
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter
spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
@ -26,6 +32,11 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20
spring.cloud.stream.bindings.input3.consumer.maxAttempts=1
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=210
spring.application.name=rocketmq-example
server.port=28081

View File

@ -32,6 +32,8 @@ public interface RocketMQBinderConstants {
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG";
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/**

View File

@ -16,6 +16,9 @@
package org.springframework.cloud.stream.binder.rocketmq;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
@ -36,6 +39,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.ClassUtils;
/**
* @author Timur Valiev
@ -71,9 +75,24 @@ public class RocketMQMessageChannelBinder extends
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
return new RocketMQMessageHandler(destination.getName(),
producerProperties.getExtension(),
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
destination.getName(), producerProperties,
rocketBinderConfigurationProperties, instrumentationManager);
if (producerProperties.getExtension().getTransactional()) {
// transaction message check LocalTransactionExecuter
messageHandler.setLocalTransactionExecuter(
getClassConfiguration(destination.getName(),
producerProperties.getExtension().getExecuter(),
LocalTransactionExecuter.class));
// transaction message check TransactionCheckListener
messageHandler.setTransactionCheckListener(
getClassConfiguration(destination.getName(),
producerProperties.getExtension()
.getTransactionCheckListener(),
TransactionCheckListener.class));
}
return messageHandler;
}
else {
throw new RuntimeException("Binding for channel " + destination.getName()
@ -120,4 +139,46 @@ public class RocketMQMessageChannelBinder extends
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName);
}
private <T> T getClassConfiguration(String destName, String className,
Class<T> interfaceClass) {
if (StringUtils.isEmpty(className)) {
throw new RuntimeException("Binding for channel " + destName
+ " using transactional message, should set "
+ interfaceClass.getSimpleName() + " configuration"
+ interfaceClass.getSimpleName() + " should be set, like "
+ "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour"
+ interfaceClass.getSimpleName() + "'");
}
else if (StringUtils.isNotEmpty(className)) {
Class fieldClass;
// check class exists
try {
fieldClass = ClassUtils.forName(className,
RocketMQMessageChannelBinder.class.getClassLoader());
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Binding for channel " + destName
+ " using transactional message, but " + className
+ " class is not found");
}
// check interface incompatible
if (!interfaceClass.isAssignableFrom(fieldClass)) {
throw new RuntimeException("Binding for channel " + destName
+ " using transactional message, but " + className
+ " is incompatible with " + interfaceClass.getSimpleName()
+ " interface");
}
try {
return (T) fieldClass.newInstance();
}
catch (Exception e) {
throw new RuntimeException("Binding for channel " + destName
+ " using transactional message, but " + className
+ " instance error", e);
}
}
return null;
}
}

View File

@ -20,6 +20,7 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG;
import java.util.HashMap;
import java.util.Map;
@ -103,6 +104,15 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
return this;
}
public Object getTransactionalArg() {
return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG);
}
public Object withTransactionalArg(Object arg) {
setHeader(ROCKET_TRANSACTIONAL_ARG, arg);
return this;
}
public SendResult getSendResult() {
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
}

View File

@ -23,10 +23,14 @@ import java.util.Optional;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@ -49,16 +53,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private InstrumentationManager instrumentationManager;
private final RocketMQProducerProperties producerProperties;
private LocalTransactionExecuter localTransactionExecuter;
private TransactionCheckListener transactionCheckListener;
private final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
protected volatile boolean running = false;
private volatile boolean running = false;
public RocketMQMessageHandler(String destination,
RocketMQProducerProperties producerProperties,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
this.destination = destination;
@ -69,7 +77,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override
public void start() {
producer = new DefaultMQProducer(destination);
if (producerProperties.getExtension().getTransactional()) {
producer = new TransactionMQProducer(destination);
if (transactionCheckListener != null) {
((TransactionMQProducer) producer)
.setTransactionCheckListener(transactionCheckListener);
}
}
else {
producer = new DefaultMQProducer(destination);
}
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
producerInstrumentation = manager.getProducerInstrumentation(destination);
@ -78,8 +95,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
if (producerProperties.getMaxMessageSize() > 0) {
producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
if (producerProperties.getExtension().getMaxMessageSize() > 0) {
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
}
try {
@ -138,7 +156,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
toSend.putUserProperty(entry.getKey(), entry.getValue());
}
SendResult sendRes = producer.send(toSend);
SendResult sendRes;
if (producerProperties.getExtension().getTransactional()) {
sendRes = producer.sendMessageInTransaction(toSend,
localTransactionExecuter, headerAccessor.getTransactionalArg());
}
else {
sendRes = producer.send(toSend);
}
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new MQClientException("message hasn't been sent", null);
@ -164,4 +189,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
}
public void setLocalTransactionExecuter(
LocalTransactionExecuter localTransactionExecuter) {
this.localTransactionExecuter = localTransactionExecuter;
}
public void setTransactionCheckListener(
TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
}

View File

@ -17,6 +17,8 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
/**
* @author Timur Valiev
@ -31,6 +33,18 @@ public class RocketMQProducerProperties {
*/
private Integer maxMessageSize = 0;
private Boolean transactional = false;
/**
* full class name of {@link LocalTransactionExecuter}
*/
private String executer;
/**
* full class name of {@link TransactionCheckListener}
*/
private String transactionCheckListener;
public Boolean getEnabled() {
return enabled;
}
@ -47,4 +61,27 @@ public class RocketMQProducerProperties {
this.maxMessageSize = maxMessageSize;
}
public Boolean getTransactional() {
return transactional;
}
public void setTransactional(Boolean transactional) {
this.transactional = transactional;
}
public String getExecuter() {
return executer;
}
public void setExecuter(String executer) {
this.executer = executer;
}
public String getTransactionCheckListener() {
return transactionCheckListener;
}
public void setTransactionCheckListener(String transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
}