mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
Polish #134
This commit is contained in:
parent
098b103166
commit
6d3a26702a
@ -32,6 +32,8 @@ public interface RocketMQBinderConstants {
|
|||||||
|
|
||||||
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
|
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
|
||||||
|
|
||||||
|
String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG";
|
||||||
|
|
||||||
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
|
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -16,6 +16,9 @@
|
|||||||
|
|
||||||
package org.springframework.cloud.stream.binder.rocketmq;
|
package org.springframework.cloud.stream.binder.rocketmq;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||||
|
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||||
@ -36,6 +39,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
|||||||
import org.springframework.integration.core.MessageProducer;
|
import org.springframework.integration.core.MessageProducer;
|
||||||
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageChannel;
|
||||||
import org.springframework.messaging.MessageHandler;
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
import org.springframework.util.ClassUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Timur Valiev
|
* @author Timur Valiev
|
||||||
@ -71,9 +75,24 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||||
MessageChannel errorChannel) throws Exception {
|
MessageChannel errorChannel) throws Exception {
|
||||||
if (producerProperties.getExtension().getEnabled()) {
|
if (producerProperties.getExtension().getEnabled()) {
|
||||||
return new RocketMQMessageHandler(destination.getName(),
|
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
||||||
producerProperties.getExtension(),
|
destination.getName(), producerProperties,
|
||||||
rocketBinderConfigurationProperties, instrumentationManager);
|
rocketBinderConfigurationProperties, instrumentationManager);
|
||||||
|
if (producerProperties.getExtension().getTransactional()) {
|
||||||
|
// transaction message check LocalTransactionExecuter
|
||||||
|
messageHandler.setLocalTransactionExecuter(
|
||||||
|
getClassConfiguration(destination.getName(),
|
||||||
|
producerProperties.getExtension().getExecuter(),
|
||||||
|
LocalTransactionExecuter.class));
|
||||||
|
// transaction message check TransactionCheckListener
|
||||||
|
messageHandler.setTransactionCheckListener(
|
||||||
|
getClassConfiguration(destination.getName(),
|
||||||
|
producerProperties.getExtension()
|
||||||
|
.getTransactionCheckListener(),
|
||||||
|
TransactionCheckListener.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
return messageHandler;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
throw new RuntimeException("Binding for channel " + destination.getName()
|
throw new RuntimeException("Binding for channel " + destination.getName()
|
||||||
@ -120,4 +139,46 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
|
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
|
||||||
return extendedBindingProperties.getExtendedProducerProperties(channelName);
|
return extendedBindingProperties.getExtendedProducerProperties(channelName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <T> T getClassConfiguration(String destName, String className,
|
||||||
|
Class<T> interfaceClass) {
|
||||||
|
if (StringUtils.isEmpty(className)) {
|
||||||
|
throw new RuntimeException("Binding for channel " + destName
|
||||||
|
+ " using transactional message, should set "
|
||||||
|
+ interfaceClass.getSimpleName() + " configuration"
|
||||||
|
+ interfaceClass.getSimpleName() + " should be set, like "
|
||||||
|
+ "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour"
|
||||||
|
+ interfaceClass.getSimpleName() + "'");
|
||||||
|
}
|
||||||
|
else if (StringUtils.isNotEmpty(className)) {
|
||||||
|
Class fieldClass;
|
||||||
|
// check class exists
|
||||||
|
try {
|
||||||
|
fieldClass = ClassUtils.forName(className,
|
||||||
|
RocketMQMessageChannelBinder.class.getClassLoader());
|
||||||
|
}
|
||||||
|
catch (ClassNotFoundException e) {
|
||||||
|
throw new RuntimeException("Binding for channel " + destName
|
||||||
|
+ " using transactional message, but " + className
|
||||||
|
+ " class is not found");
|
||||||
|
}
|
||||||
|
// check interface incompatible
|
||||||
|
if (!interfaceClass.isAssignableFrom(fieldClass)) {
|
||||||
|
throw new RuntimeException("Binding for channel " + destName
|
||||||
|
+ " using transactional message, but " + className
|
||||||
|
+ " is incompatible with " + interfaceClass.getSimpleName()
|
||||||
|
+ " interface");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return (T) fieldClass.newInstance();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException("Binding for channel " + destName
|
||||||
|
+ " using transactional message, but " + className
|
||||||
|
+ " instance error", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
|
|||||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
|
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
|
||||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
|
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
|
||||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
|
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
|
||||||
|
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -103,6 +104,15 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Object getTransactionalArg() {
|
||||||
|
return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object withTransactionalArg(Object arg) {
|
||||||
|
setHeader(ROCKET_TRANSACTIONAL_ARG, arg);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public SendResult getSendResult() {
|
public SendResult getSendResult() {
|
||||||
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
|
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
|
||||||
}
|
}
|
||||||
|
@ -23,10 +23,14 @@ import java.util.Optional;
|
|||||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||||
import org.apache.rocketmq.client.exception.MQClientException;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
import org.apache.rocketmq.client.producer.SendStatus;
|
import org.apache.rocketmq.client.producer.SendStatus;
|
||||||
|
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||||
|
import org.apache.rocketmq.client.producer.TransactionMQProducer;
|
||||||
import org.apache.rocketmq.common.message.Message;
|
import org.apache.rocketmq.common.message.Message;
|
||||||
import org.apache.rocketmq.remoting.exception.RemotingException;
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||||
|
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||||
@ -49,16 +53,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
|
|
||||||
private InstrumentationManager instrumentationManager;
|
private InstrumentationManager instrumentationManager;
|
||||||
|
|
||||||
private final RocketMQProducerProperties producerProperties;
|
private LocalTransactionExecuter localTransactionExecuter;
|
||||||
|
|
||||||
|
private TransactionCheckListener transactionCheckListener;
|
||||||
|
|
||||||
|
private final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
|
||||||
|
|
||||||
private final String destination;
|
private final String destination;
|
||||||
|
|
||||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||||
|
|
||||||
protected volatile boolean running = false;
|
private volatile boolean running = false;
|
||||||
|
|
||||||
public RocketMQMessageHandler(String destination,
|
public RocketMQMessageHandler(String destination,
|
||||||
RocketMQProducerProperties producerProperties,
|
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||||
InstrumentationManager instrumentationManager) {
|
InstrumentationManager instrumentationManager) {
|
||||||
this.destination = destination;
|
this.destination = destination;
|
||||||
@ -69,7 +77,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
|
if (producerProperties.getExtension().getTransactional()) {
|
||||||
|
producer = new TransactionMQProducer(destination);
|
||||||
|
if (transactionCheckListener != null) {
|
||||||
|
((TransactionMQProducer) producer)
|
||||||
|
.setTransactionCheckListener(transactionCheckListener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
producer = new DefaultMQProducer(destination);
|
producer = new DefaultMQProducer(destination);
|
||||||
|
}
|
||||||
|
|
||||||
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
|
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
|
||||||
producerInstrumentation = manager.getProducerInstrumentation(destination);
|
producerInstrumentation = manager.getProducerInstrumentation(destination);
|
||||||
@ -78,8 +95,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
|
|
||||||
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
|
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
|
||||||
|
|
||||||
if (producerProperties.getMaxMessageSize() > 0) {
|
if (producerProperties.getExtension().getMaxMessageSize() > 0) {
|
||||||
producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
|
producer.setMaxMessageSize(
|
||||||
|
producerProperties.getExtension().getMaxMessageSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -138,7 +156,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
toSend.putUserProperty(entry.getKey(), entry.getValue());
|
toSend.putUserProperty(entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
SendResult sendRes = producer.send(toSend);
|
SendResult sendRes;
|
||||||
|
if (producerProperties.getExtension().getTransactional()) {
|
||||||
|
sendRes = producer.sendMessageInTransaction(toSend,
|
||||||
|
localTransactionExecuter, headerAccessor.getTransactionalArg());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
sendRes = producer.send(toSend);
|
||||||
|
}
|
||||||
|
|
||||||
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
|
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
|
||||||
throw new MQClientException("message hasn't been sent", null);
|
throw new MQClientException("message hasn't been sent", null);
|
||||||
@ -164,4 +189,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setLocalTransactionExecuter(
|
||||||
|
LocalTransactionExecuter localTransactionExecuter) {
|
||||||
|
this.localTransactionExecuter = localTransactionExecuter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTransactionCheckListener(
|
||||||
|
TransactionCheckListener transactionCheckListener) {
|
||||||
|
this.transactionCheckListener = transactionCheckListener;
|
||||||
|
}
|
||||||
}
|
}
|
@ -17,6 +17,8 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||||
|
|
||||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||||
|
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Timur Valiev
|
* @author Timur Valiev
|
||||||
@ -31,6 +33,18 @@ public class RocketMQProducerProperties {
|
|||||||
*/
|
*/
|
||||||
private Integer maxMessageSize = 0;
|
private Integer maxMessageSize = 0;
|
||||||
|
|
||||||
|
private Boolean transactional = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* full class name of {@link LocalTransactionExecuter}
|
||||||
|
*/
|
||||||
|
private String executer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* full class name of {@link TransactionCheckListener}
|
||||||
|
*/
|
||||||
|
private String transactionCheckListener;
|
||||||
|
|
||||||
public Boolean getEnabled() {
|
public Boolean getEnabled() {
|
||||||
return enabled;
|
return enabled;
|
||||||
}
|
}
|
||||||
@ -47,4 +61,27 @@ public class RocketMQProducerProperties {
|
|||||||
this.maxMessageSize = maxMessageSize;
|
this.maxMessageSize = maxMessageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Boolean getTransactional() {
|
||||||
|
return transactional;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTransactional(Boolean transactional) {
|
||||||
|
this.transactional = transactional;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getExecuter() {
|
||||||
|
return executer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setExecuter(String executer) {
|
||||||
|
this.executer = executer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTransactionCheckListener() {
|
||||||
|
return transactionCheckListener;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTransactionCheckListener(String transactionCheckListener) {
|
||||||
|
this.transactionCheckListener = transactionCheckListener;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user