diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java index f2c16ed1..67589474 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java @@ -62,4 +62,17 @@ public class Acknowledgement { public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) { this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill; } + + public static Acknowledgement buildOrderlyInstance() { + Acknowledgement acknowledgement = new Acknowledgement(); + acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS); + return acknowledgement; + } + + public static Acknowledgement buildConcurrentlyInstance() { + Acknowledgement acknowledgement = new Acknowledgement(); + acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS); + return acknowledgement; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java deleted file mode 100644 index 8de146d5..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.springframework.cloud.stream.binder.rocketmq.consuming; - -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; - -import org.apache.rocketmq.client.consumer.listener.MessageListener; -import org.apache.rocketmq.common.message.MessageExt; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; - -/** - * @author Timur Valiev - * @author Jim - */ -public abstract class CloudStreamMessageListener implements MessageListener { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private final InstrumentationManager instrumentationManager; - - private final Consumer sendMsgAction; - - private String topic; - - CloudStreamMessageListener(InstrumentationManager instrumentationManager, Consumer sendMsgAction) { - this.instrumentationManager = instrumentationManager; - this.sendMsgAction = sendMsgAction; - } - - Acknowledgement consumeMessage(final List msgs) { - List acknowledgements = new ArrayList<>(); - msgs.forEach(msg -> { - logger.info("consuming msg:\n" + msg); - logger.debug("message body:\n" + new String(msg.getBody())); - try { - Acknowledgement acknowledgement = new Acknowledgement(); - Message toChannel = MessageBuilder.withPayload(msg.getBody()). - setHeaders(new RocketMQMessageHeaderAccessor(). - withAcknowledgment(acknowledgement). - withTags(msg.getTags()). - withKeys(msg.getKeys()). - withFlag(msg.getFlag()). - withRocketMessage(msg) - ).build(); - acknowledgements.add(acknowledgement); - sendMsgAction.accept(toChannel); - instrumentationManager.getConsumerInstrumentation(getTopic()) - .markConsumed(); - } catch (Exception e) { - logger.error("RocketMQ Message hasn't been processed successfully. Caused by ", e); - instrumentationManager.getConsumerInstrumentation(getTopic()) - .markConsumedFailure(); - throw e; - } - }); - return acknowledgements.get(0); - } - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java deleted file mode 100644 index 8c4df7d8..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.springframework.cloud.stream.binder.rocketmq.consuming; - -import java.util.List; -import java.util.function.Consumer; - -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import org.springframework.messaging.Message; - -/** - * @author Timur Valiev - * @author Jim - */ -public class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements - MessageListenerConcurrently { - - public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager, - Consumer sendMsgAction) { - super(instrumentationManager, sendMsgAction); - } - - @Override - public ConsumeConcurrentlyStatus consumeMessage(final List msgs, - ConsumeConcurrentlyContext context) { - Acknowledgement acknowledgement = consumeMessage(msgs); - context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel()); - return acknowledgement.getConsumeConcurrentlyStatus(); - } - -} \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java deleted file mode 100644 index c63fb9bd..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.springframework.cloud.stream.binder.rocketmq.consuming; - -import java.util.List; -import java.util.function.Consumer; - -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import org.springframework.messaging.Message; - -/** - * @author Timur Valiev - * @author Jim - */ -public class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly { - - public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager, - Consumer sendMsgAction) { - super(instrumentationManager, sendMsgAction); - } - - @Override - public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { - Acknowledgement acknowledgement = consumeMessage(msgs); - context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill())); - return acknowledgement.getConsumeOrderlyStatus(); - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 8a36168d..c4a4e8d9 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -1,23 +1,40 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang3.ClassUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; -import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListener; -import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerConcurrently; -import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerOrderly; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; +import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.retry.RecoveryCallback; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.StringUtils; @@ -65,12 +82,15 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties); final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly( - instrumentationManager, msg -> sendMessage(msg)) - : new CloudStreamMessageListenerConcurrently(instrumentationManager, msg -> sendMessage(msg)); - listener.setTopic(destination); + instrumentationManager) + : new CloudStreamMessageListenerConcurrently(instrumentationManager); - Set tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect( - Collectors.toSet()); + if (retryTemplate != null) { + retryTemplate.registerListener(listener); + } + + Set tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim) + .collect(Collectors.toSet()); consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination); instrumentationManager.addHealthInstrumentation(consumerInstrumentation); @@ -110,4 +130,123 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { public void setRecoveryCallback(RecoveryCallback recoveryCallback) { this.recoveryCallback = recoveryCallback; } + + protected class CloudStreamMessageListener implements MessageListener, RetryListener { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final InstrumentationManager instrumentationManager; + + CloudStreamMessageListener(InstrumentationManager instrumentationManager) { + this.instrumentationManager = instrumentationManager; + } + + Acknowledgement consumeMessage(final List msgs) { + boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; + try { + if (enableRetry) { + return RocketMQInboundChannelAdapter.this.retryTemplate.execute( + (RetryCallback)context -> doSendMsgs(msgs, context), + new RecoveryCallback() { + @Override + public Acknowledgement recover(RetryContext context) throws Exception { + RocketMQInboundChannelAdapter.this.recoveryCallback.recover(context); + if (ClassUtils.isAssignable(this.getClass(), MessageListenerConcurrently.class)) { + return Acknowledgement.buildConcurrentlyInstance(); + } else { + return Acknowledgement.buildOrderlyInstance(); + } + } + }); + } else { + Acknowledgement result = doSendMsgs(msgs, null); + instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination) + .markConsumed(); + return result; + } + } catch (Exception e) { + logger.error("Rocket Message hasn't been processed successfully. Caused by ", e); + instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination) + .markConsumedFailure(); + throw new RuntimeException("Rocket Message hasn't been processed successfully. Caused by ", e); + } + } + + private Acknowledgement doSendMsgs(final List msgs, RetryContext context) { + List acknowledgements = new ArrayList<>(); + msgs.forEach(msg -> { + String retryInfo = context == null ? "" : "retryCount-" + String.valueOf(context.getRetryCount()) + "|"; + logger.debug(retryInfo + "consuming msg:\n" + msg); + logger.debug(retryInfo + "message body:\n" + new String(msg.getBody())); + Acknowledgement acknowledgement = new Acknowledgement(); + Message toChannel = MessageBuilder.withPayload(msg.getBody()). + setHeaders(new RocketMQMessageHeaderAccessor(). + withAcknowledgment(acknowledgement). + withTags(msg.getTags()). + withKeys(msg.getKeys()). + withFlag(msg.getFlag()). + withRocketMessage(msg) + ).build(); + acknowledgements.add(acknowledgement); + RocketMQInboundChannelAdapter.this.sendMessage(toChannel); + }); + return acknowledgements.get(0); + } + + @Override + public boolean open(RetryContext context, RetryCallback callback) { + return true; + } + + @Override + public void close(RetryContext context, RetryCallback callback, + Throwable throwable) { + if (throwable != null) { + instrumentationManager.getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumedFailure(); + } else { + instrumentationManager.getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumed(); + } + } + + @Override + public void onError(RetryContext context, RetryCallback callback, + Throwable throwable) { + } + } + + protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements + MessageListenerConcurrently { + + public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager) { + super(instrumentationManager); + } + + @Override + public ConsumeConcurrentlyStatus consumeMessage(final List msgs, + ConsumeConcurrentlyContext context) { + Acknowledgement acknowledgement = consumeMessage(msgs); + context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel()); + return acknowledgement.getConsumeConcurrentlyStatus(); + } + } + + protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements + MessageListenerOrderly { + + public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager) { + super(instrumentationManager); + } + + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + Acknowledgement acknowledgement = consumeMessage(msgs); + context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill())); + return acknowledgement.getConsumeOrderlyStatus(); + } + + } + }