diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index ded2be10..1321c84b 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -92,6 +92,9 @@ public class RocketMQMessageChannelBinder extends
TransactionCheckListener.class));
}
+ if (errorChannel != null) {
+ messageHandler.setSendFailureChannel(errorChannel);
+ }
return messageHandler;
}
else {
@@ -107,7 +110,7 @@ public class RocketMQMessageChannelBinder extends
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException(
- "'group must be configured for channel + " + destination.getName());
+ "'group must be configured for channel " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java
new file mode 100644
index 00000000..d8d7fc5d
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.rocketmq.exception;
+
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessagingException;
+
+/**
+ * An exception that is the payload of an {@code ErrorMessage} when occurs send failure.
+ *
+ * @author Jim
+ * @since 0.2.2
+ */
+public class RocketMQSendFailureException extends MessagingException {
+
+ private final org.apache.rocketmq.common.message.Message rocketmqMsg;
+
+ public RocketMQSendFailureException(Message> message,
+ org.apache.rocketmq.common.message.Message rocketmqMsg, Throwable cause) {
+ super(message, cause);
+ this.rocketmqMsg = rocketmqMsg;
+ }
+
+ public org.apache.rocketmq.common.message.Message getRocketmqMsg() {
+ return rocketmqMsg;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " [rocketmqMsg=" + this.rocketmqMsg + "]";
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
index 7b70c067..55861e37 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
@@ -53,6 +53,7 @@ import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
+import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
@@ -71,6 +72,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private RecoveryCallback extends Object> recoveryCallback;
+ private DefaultMQPushConsumer consumer;
+
+ private CloudStreamMessageListener listener;
+
private final ExtendedConsumerProperties consumerProperties;
private final String destination;
@@ -90,6 +95,31 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
this.instrumentationManager = instrumentationManager;
}
+ @Override
+ protected void onInit() {
+ if (consumerProperties == null
+ || !consumerProperties.getExtension().getEnabled()) {
+ return;
+ }
+ super.onInit();
+ if (this.retryTemplate != null) {
+ Assert.state(getErrorChannel() == null,
+ "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
+ + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
+ + "send an error message when retries are exhausted");
+ }
+ this.consumer = consumersManager.getOrCreateConsumer(group, destination,
+ consumerProperties);
+
+ Boolean isOrderly = consumerProperties.getExtension().getOrderly();
+ this.listener = isOrderly ? new CloudStreamMessageListenerOrderly()
+ : new CloudStreamMessageListenerConcurrently();
+
+ if (retryTemplate != null) {
+ this.retryTemplate.registerListener(this.listener);
+ }
+ }
+
@Override
protected void doStart() {
if (consumerProperties == null
@@ -98,18 +128,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
String tags = consumerProperties.getExtension().getTags();
- Boolean isOrderly = consumerProperties.getExtension().getOrderly();
-
- DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
- destination, consumerProperties);
-
- final CloudStreamMessageListener listener = isOrderly
- ? new CloudStreamMessageListenerOrderly()
- : new CloudStreamMessageListenerConcurrently();
-
- if (retryTemplate != null) {
- retryTemplate.registerListener(listener);
- }
Set tagsSet = tags == null ? new HashSet<>()
: Arrays.stream(tags.split("\\|\\|")).map(String::trim)
@@ -122,11 +140,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
- consumer.subscribe(destination, MessageSelector
+ this.consumer.subscribe(destination, MessageSelector
.bySql(consumerProperties.getExtension().getSql()));
}
else {
- consumer.subscribe(destination, String.join(" || ", tagsSet));
+ this.consumer.subscribe(destination, String.join(" || ", tagsSet));
}
Optional.ofNullable(consumerInstrumentation)
.ifPresent(c -> c.markStartedSuccessfully());
@@ -139,7 +157,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
}
- consumer.registerMessageListener(listener);
+ this.consumer.registerMessageListener(this.listener);
try {
consumersManager.startConsumer(group);
@@ -214,10 +232,8 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
});
- throw new RuntimeException(
- "RocketMQ Message hasn't been processed successfully. Caused by ",
- e);
}
+ return null;
}
private Acknowledgement doSendMsgs(final List msgs,
@@ -229,18 +245,24 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
logger.debug(retryInfo + "consuming msg:\n" + msg);
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
Acknowledgement acknowledgement = new Acknowledgement();
- Message toChannel = MessageBuilder.withPayload(msg.getBody())
- .setHeaders(new RocketMQMessageHeaderAccessor()
- .withAcknowledgment(acknowledgement)
- .withTags(msg.getTags()).withKeys(msg.getKeys())
- .withFlag(msg.getFlag()).withRocketMessage(msg))
- .build();
+ Message toChannel = convertMessagingFromRocketMQMsg(msg,
+ acknowledgement);
acknowledgements.add(acknowledgement);
RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
});
return acknowledgements.get(0);
}
+ private Message convertMessagingFromRocketMQMsg(MessageExt msg,
+ Acknowledgement acknowledgement) {
+ return MessageBuilder.withPayload(msg.getBody())
+ .setHeaders(new RocketMQMessageHeaderAccessor()
+ .withAcknowledgment(acknowledgement).withTags(msg.getTags())
+ .withKeys(msg.getKeys()).withFlag(msg.getFlag())
+ .withRocketMessage(msg))
+ .build();
+ }
+
@Override
public boolean open(RetryContext context,
RetryCallback callback) {
@@ -283,9 +305,16 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public ConsumeConcurrentlyStatus consumeMessage(final List msgs,
ConsumeConcurrentlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
- context.setDelayLevelWhenNextConsume(
- acknowledgement.getConsumeConcurrentlyDelayLevel());
- return acknowledgement.getConsumeConcurrentlyStatus();
+ if (acknowledgement != null) {
+ context.setDelayLevelWhenNextConsume(
+ acknowledgement.getConsumeConcurrentlyDelayLevel());
+ return acknowledgement.getConsumeConcurrentlyStatus();
+ }
+ else {
+ context.setDelayLevelWhenNextConsume(consumerProperties.getExtension()
+ .getError().getDelayLevelWhenNextConsume());
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
}
}
@@ -296,11 +325,17 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public ConsumeOrderlyStatus consumeMessage(List msgs,
ConsumeOrderlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
- context.setSuspendCurrentQueueTimeMillis(
- (acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
- return acknowledgement.getConsumeOrderlyStatus();
+ if (acknowledgement != null) {
+ context.setSuspendCurrentQueueTimeMillis(
+ (acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
+ return acknowledgement.getConsumeOrderlyStatus();
+ }
+ else {
+ context.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension()
+ .getError().getSuspendCurrentQueueTimeMillis());
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+ }
}
-
}
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
index ba8054ad..6efe7955 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
@@ -33,20 +33,28 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
+import org.springframework.cloud.stream.binder.rocketmq.exception.RocketMQSendFailureException;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
+import org.springframework.integration.support.DefaultErrorMessageStrategy;
+import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MutableMessage;
+import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.ErrorMessage;
+import org.springframework.util.Assert;
/**
* @author Jim
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
+ private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
+
private DefaultMQProducer producer;
private ProducerInstrumentation producerInstrumentation;
@@ -57,6 +65,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private TransactionCheckListener transactionCheckListener;
+ private MessageChannel sendFailureChannel;
+
private final ExtendedProducerProperties producerProperties;
private final String destination;
@@ -88,6 +98,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
producer = new DefaultMQProducer(destination);
}
+ producer.setVipChannelEnabled(
+ producerProperties.getExtension().getVipChannelEnabled());
+
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
producerInstrumentation = manager.getProducerInstrumentation(destination);
manager.addHealthInstrumentation(producerInstrumentation);
@@ -131,8 +144,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override
protected void handleMessageInternal(org.springframework.messaging.Message> message)
throws Exception {
+ Message toSend = null;
try {
- Message toSend;
if (message.getPayload() instanceof byte[]) {
toSend = new Message(destination, (byte[]) message.getPayload());
}
@@ -166,7 +179,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
}
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
- throw new MQClientException("message hasn't been sent", null);
+ if (getSendFailureChannel() != null) {
+ this.getSendFailureChannel().send(message);
+ }
+ else {
+ throw new RocketMQSendFailureException(message, toSend,
+ new MQClientException("message hasn't been sent", null));
+ }
}
if (message instanceof MutableMessage) {
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
@@ -184,18 +203,60 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
.ifPresent(p -> p.markSentFailure());
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
- throw new MessagingException(e.getMessage(), e);
+ if (getSendFailureChannel() != null) {
+ getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage(
+ new RocketMQSendFailureException(message, toSend, e), null));
+ }
+ else {
+ throw new RocketMQSendFailureException(message, toSend, e);
+ }
}
}
+ /**
+ * Using in RocketMQ Transactional Mode. Set RocketMQ localTransactionExecuter in
+ * {@link DefaultMQProducer#sendMessageInTransaction}.
+ * @param localTransactionExecuter the executer running when produce msg.
+ */
public void setLocalTransactionExecuter(
LocalTransactionExecuter localTransactionExecuter) {
this.localTransactionExecuter = localTransactionExecuter;
}
+ /**
+ * Using in RocketMQ Transactional Mode. Set RocketMQ transactionCheckListener in
+ * {@link TransactionMQProducer#setTransactionCheckListener}.
+ * @param transactionCheckListener the listener set in {@link TransactionMQProducer}.
+ */
public void setTransactionCheckListener(
TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
+
+ /**
+ * Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
+ * to this channel with a payload of a {@link RocketMQSendFailureException} with the
+ * failed message and cause.
+ * @param sendFailureChannel the failure channel.
+ * @since 0.2.2
+ */
+ public void setSendFailureChannel(MessageChannel sendFailureChannel) {
+ this.sendFailureChannel = sendFailureChannel;
+ }
+
+ /**
+ * Set the error message strategy implementation to use when sending error messages
+ * after send failures. Cannot be null.
+ * @param errorMessageStrategy the implementation.
+ * @since 0.2.2
+ */
+ public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
+ Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
+ this.errorMessageStrategy = errorMessageStrategy;
+ }
+
+ public MessageChannel getSendFailureChannel() {
+ return sendFailureChannel;
+ }
}
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
index 6343f153..8202fefb 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
@@ -52,6 +52,40 @@ public class RocketMQConsumerProperties {
private Boolean enabled = true;
+ private Error error;
+
+ public static class Error {
+
+ /**
+ * Reconsume later timeMillis in ConsumeOrderlyContext.
+ */
+ private Long suspendCurrentQueueTimeMillis = 1000L;
+
+ /**
+ * Message consume retry strategy in ConsumeConcurrentlyContext.
+ *
+ * -1,no retry,put into DLQ directly 0,broker control retry frequency >0,client
+ * control retry frequency
+ */
+ private Integer delayLevelWhenNextConsume = 0;
+
+ public Long getSuspendCurrentQueueTimeMillis() {
+ return suspendCurrentQueueTimeMillis;
+ }
+
+ public void setSuspendCurrentQueueTimeMillis(Long suspendCurrentQueueTimeMillis) {
+ this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+ }
+
+ public Integer getDelayLevelWhenNextConsume() {
+ return delayLevelWhenNextConsume;
+ }
+
+ public void setDelayLevelWhenNextConsume(Integer delayLevelWhenNextConsume) {
+ this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+ }
+ }
+
public String getTags() {
return tags;
}
@@ -91,4 +125,12 @@ public class RocketMQConsumerProperties {
public void setBroadcasting(Boolean broadcasting) {
this.broadcasting = broadcasting;
}
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
index 1a05ad50..b526d960 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
@@ -45,6 +45,8 @@ public class RocketMQProducerProperties {
*/
private String transactionCheckListener;
+ private Boolean vipChannelEnabled = true;
+
public Boolean getEnabled() {
return enabled;
}
@@ -84,4 +86,12 @@ public class RocketMQProducerProperties {
public void setTransactionCheckListener(String transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
+
+ public Boolean getVipChannelEnabled() {
+ return vipChannelEnabled;
+ }
+
+ public void setVipChannelEnabled(Boolean vipChannelEnabled) {
+ this.vipChannelEnabled = vipChannelEnabled;
+ }
}