diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 7b70c067..ea88fe13 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -53,6 +53,7 @@ import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryListener; import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** @@ -71,6 +72,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { private RecoveryCallback recoveryCallback; + private DefaultMQPushConsumer consumer; + + private CloudStreamMessageListener listener; + private final ExtendedConsumerProperties consumerProperties; private final String destination; @@ -90,6 +95,31 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { this.instrumentationManager = instrumentationManager; } + @Override + protected void onInit() { + if (consumerProperties == null + || !consumerProperties.getExtension().getEnabled()) { + return; + } + super.onInit(); + if (this.retryTemplate != null) { + Assert.state(getErrorChannel() == null, + "Cannot have an 'errorChannel' property when a 'RetryTemplate' is " + + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to " + + "send an error message when retries are exhausted"); + } + this.consumer = consumersManager.getOrCreateConsumer(group, destination, + consumerProperties); + + Boolean isOrderly = consumerProperties.getExtension().getOrderly(); + this.listener = isOrderly ? new CloudStreamMessageListenerOrderly() + : new CloudStreamMessageListenerConcurrently(); + + if (retryTemplate != null) { + this.retryTemplate.registerListener(this.listener); + } + } + @Override protected void doStart() { if (consumerProperties == null @@ -98,18 +128,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } String tags = consumerProperties.getExtension().getTags(); - Boolean isOrderly = consumerProperties.getExtension().getOrderly(); - - DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, - destination, consumerProperties); - - final CloudStreamMessageListener listener = isOrderly - ? new CloudStreamMessageListenerOrderly() - : new CloudStreamMessageListenerConcurrently(); - - if (retryTemplate != null) { - retryTemplate.registerListener(listener); - } Set tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim) @@ -122,11 +140,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { try { if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { - consumer.subscribe(destination, MessageSelector + this.consumer.subscribe(destination, MessageSelector .bySql(consumerProperties.getExtension().getSql())); } else { - consumer.subscribe(destination, String.join(" || ", tagsSet)); + this.consumer.subscribe(destination, String.join(" || ", tagsSet)); } Optional.ofNullable(consumerInstrumentation) .ifPresent(c -> c.markStartedSuccessfully()); @@ -139,7 +157,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); } - consumer.registerMessageListener(listener); + this.consumer.registerMessageListener(this.listener); try { consumersManager.startConsumer(group); @@ -229,18 +247,24 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { logger.debug(retryInfo + "consuming msg:\n" + msg); logger.debug(retryInfo + "message body:\n" + new String(msg.getBody())); Acknowledgement acknowledgement = new Acknowledgement(); - Message toChannel = MessageBuilder.withPayload(msg.getBody()) - .setHeaders(new RocketMQMessageHeaderAccessor() - .withAcknowledgment(acknowledgement) - .withTags(msg.getTags()).withKeys(msg.getKeys()) - .withFlag(msg.getFlag()).withRocketMessage(msg)) - .build(); + Message toChannel = convertMessagingFromRocketMQMsg(msg, + acknowledgement); acknowledgements.add(acknowledgement); RocketMQInboundChannelAdapter.this.sendMessage(toChannel); }); return acknowledgements.get(0); } + private Message convertMessagingFromRocketMQMsg(MessageExt msg, + Acknowledgement acknowledgement) { + return MessageBuilder.withPayload(msg.getBody()) + .setHeaders(new RocketMQMessageHeaderAccessor() + .withAcknowledgment(acknowledgement).withTags(msg.getTags()) + .withKeys(msg.getKeys()).withFlag(msg.getFlag()) + .withRocketMessage(msg)) + .build(); + } + @Override public boolean open(RetryContext context, RetryCallback callback) {