diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 088ee58c..30eb3760 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -43,9 +43,13 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProdu import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.AcknowledgmentCallback.Status; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; import com.fasterxml.jackson.databind.ObjectMapper; @@ -218,6 +222,28 @@ public class RocketMQMessageChannelBinder extends true)); } + @Override + protected MessageHandler getPolledConsumerErrorMessageHandler( + ConsumerDestination destination, String group, + ExtendedConsumerProperties properties) { + return message -> { + if (message.getPayload() instanceof MessagingException) { + AcknowledgmentCallback ack = StaticMessageHeaderAccessor + .getAcknowledgmentCallback( + ((MessagingException) message.getPayload()) + .getFailedMessage()); + if (ack != null) { + if (properties.getExtension().shouldRequeue()) { + ack.acknowledge(Status.REQUEUE); + } + else { + ack.acknowledge(Status.REJECT); + } + } + } + }; + } + @Override public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { return extendedBindingProperties.getExtendedConsumerProperties(channelName); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 00b41a03..343c9056 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -51,7 +52,9 @@ public class RocketMQConsumerProperties { private Boolean orderly = false; /** - * for concurrently listener. message consume retry strategy + * for concurrently listener. message consume retry strategy. see + * {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or + * discard, see {@link this#shouldRequeue}), others means requeue */ private int delayLevelWhenNextConsume = 0; @@ -141,4 +144,8 @@ public class RocketMQConsumerProperties { public void setFromStore(boolean fromStore) { this.fromStore = fromStore; } + + public boolean shouldRequeue() { + return delayLevelWhenNextConsume != -1; + } }