From 6d3a26702aad98192aed923695ca4c845426f1ab Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 12 Dec 2018 14:52:40 +0800 Subject: [PATCH] Polish #134 --- .../rocketmq/RocketMQBinderConstants.java | 2 + .../RocketMQMessageChannelBinder.java | 65 ++++++++++++++++++- .../RocketMQMessageHeaderAccessor.java | 10 +++ .../integration/RocketMQMessageHandler.java | 48 ++++++++++++-- .../RocketMQProducerProperties.java | 37 +++++++++++ 5 files changed, 153 insertions(+), 9 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index 65128be4..66517074 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -32,6 +32,8 @@ public interface RocketMQBinderConstants { String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; + String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG"; + String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; /** diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index d5109cda..ded2be10 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,6 +16,9 @@ package org.springframework.cloud.stream.binder.rocketmq; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; @@ -36,6 +39,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.util.ClassUtils; /** * @author Timur Valiev @@ -71,9 +75,24 @@ public class RocketMQMessageChannelBinder extends ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - return new RocketMQMessageHandler(destination.getName(), - producerProperties.getExtension(), + RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( + destination.getName(), producerProperties, rocketBinderConfigurationProperties, instrumentationManager); + if (producerProperties.getExtension().getTransactional()) { + // transaction message check LocalTransactionExecuter + messageHandler.setLocalTransactionExecuter( + getClassConfiguration(destination.getName(), + producerProperties.getExtension().getExecuter(), + LocalTransactionExecuter.class)); + // transaction message check TransactionCheckListener + messageHandler.setTransactionCheckListener( + getClassConfiguration(destination.getName(), + producerProperties.getExtension() + .getTransactionCheckListener(), + TransactionCheckListener.class)); + } + + return messageHandler; } else { throw new RuntimeException("Binding for channel " + destination.getName() @@ -120,4 +139,46 @@ public class RocketMQMessageChannelBinder extends public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { return extendedBindingProperties.getExtendedProducerProperties(channelName); } + + private T getClassConfiguration(String destName, String className, + Class interfaceClass) { + if (StringUtils.isEmpty(className)) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, should set " + + interfaceClass.getSimpleName() + " configuration" + + interfaceClass.getSimpleName() + " should be set, like " + + "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour" + + interfaceClass.getSimpleName() + "'"); + } + else if (StringUtils.isNotEmpty(className)) { + Class fieldClass; + // check class exists + try { + fieldClass = ClassUtils.forName(className, + RocketMQMessageChannelBinder.class.getClassLoader()); + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " class is not found"); + } + // check interface incompatible + if (!interfaceClass.isAssignableFrom(fieldClass)) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " is incompatible with " + interfaceClass.getSimpleName() + + " interface"); + } + try { + return (T) fieldClass.newInstance(); + } + catch (Exception e) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " instance error", e); + } + } + return null; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java index 4f0ca012..7707c07e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java @@ -20,6 +20,7 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG; import java.util.HashMap; import java.util.Map; @@ -103,6 +104,15 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor { return this; } + public Object getTransactionalArg() { + return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG); + } + + public Object withTransactionalArg(Object arg) { + setHeader(ROCKET_TRANSACTIONAL_ARG, arg); + return this; + } + public SendResult getSendResult() { return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index ece595e6..ba8054ad 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -23,10 +23,14 @@ import java.util.Optional; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -49,16 +53,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private InstrumentationManager instrumentationManager; - private final RocketMQProducerProperties producerProperties; + private LocalTransactionExecuter localTransactionExecuter; + + private TransactionCheckListener transactionCheckListener; + + private final ExtendedProducerProperties producerProperties; private final String destination; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - protected volatile boolean running = false; + private volatile boolean running = false; public RocketMQMessageHandler(String destination, - RocketMQProducerProperties producerProperties, + ExtendedProducerProperties producerProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, InstrumentationManager instrumentationManager) { this.destination = destination; @@ -69,7 +77,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li @Override public void start() { - producer = new DefaultMQProducer(destination); + if (producerProperties.getExtension().getTransactional()) { + producer = new TransactionMQProducer(destination); + if (transactionCheckListener != null) { + ((TransactionMQProducer) producer) + .setTransactionCheckListener(transactionCheckListener); + } + } + else { + producer = new DefaultMQProducer(destination); + } Optional.ofNullable(instrumentationManager).ifPresent(manager -> { producerInstrumentation = manager.getProducerInstrumentation(destination); @@ -78,8 +95,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - if (producerProperties.getMaxMessageSize() > 0) { - producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); + if (producerProperties.getExtension().getMaxMessageSize() > 0) { + producer.setMaxMessageSize( + producerProperties.getExtension().getMaxMessageSize()); } try { @@ -138,7 +156,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li toSend.putUserProperty(entry.getKey(), entry.getValue()); } - SendResult sendRes = producer.send(toSend); + SendResult sendRes; + if (producerProperties.getExtension().getTransactional()) { + sendRes = producer.sendMessageInTransaction(toSend, + localTransactionExecuter, headerAccessor.getTransactionalArg()); + } + else { + sendRes = producer.send(toSend); + } if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { throw new MQClientException("message hasn't been sent", null); @@ -164,4 +189,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li } + public void setLocalTransactionExecuter( + LocalTransactionExecuter localTransactionExecuter) { + this.localTransactionExecuter = localTransactionExecuter; + } + + public void setTransactionCheckListener( + TransactionCheckListener transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } } \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java index 6afefd3b..1a05ad50 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -17,6 +17,8 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.TransactionCheckListener; /** * @author Timur Valiev @@ -31,6 +33,18 @@ public class RocketMQProducerProperties { */ private Integer maxMessageSize = 0; + private Boolean transactional = false; + + /** + * full class name of {@link LocalTransactionExecuter} + */ + private String executer; + + /** + * full class name of {@link TransactionCheckListener} + */ + private String transactionCheckListener; + public Boolean getEnabled() { return enabled; } @@ -47,4 +61,27 @@ public class RocketMQProducerProperties { this.maxMessageSize = maxMessageSize; } + public Boolean getTransactional() { + return transactional; + } + + public void setTransactional(Boolean transactional) { + this.transactional = transactional; + } + + public String getExecuter() { + return executer; + } + + public void setExecuter(String executer) { + this.executer = executer; + } + + public String getTransactionCheckListener() { + return transactionCheckListener; + } + + public void setTransactionCheckListener(String transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } }