1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Polish #245. Enhance errorChannel for consuming messages

This commit is contained in:
fangjian0423 2019-01-09 02:36:26 +08:00
parent 4af9faed88
commit 0cb0f7516f

View File

@ -53,6 +53,7 @@ import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext; import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener; import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate; import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
/** /**
@ -71,6 +72,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private RecoveryCallback<? extends Object> recoveryCallback; private RecoveryCallback<? extends Object> recoveryCallback;
private DefaultMQPushConsumer consumer;
private CloudStreamMessageListener listener;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties; private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final String destination; private final String destination;
@ -90,6 +95,31 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
} }
@Override
protected void onInit() {
if (consumerProperties == null
|| !consumerProperties.getExtension().getEnabled()) {
return;
}
super.onInit();
if (this.retryTemplate != null) {
Assert.state(getErrorChannel() == null,
"Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
+ "send an error message when retries are exhausted");
}
this.consumer = consumersManager.getOrCreateConsumer(group, destination,
consumerProperties);
Boolean isOrderly = consumerProperties.getExtension().getOrderly();
this.listener = isOrderly ? new CloudStreamMessageListenerOrderly()
: new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) {
this.retryTemplate.registerListener(this.listener);
}
}
@Override @Override
protected void doStart() { protected void doStart() {
if (consumerProperties == null if (consumerProperties == null
@ -98,18 +128,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
} }
String tags = consumerProperties.getExtension().getTags(); String tags = consumerProperties.getExtension().getTags();
Boolean isOrderly = consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly
? new CloudStreamMessageListenerOrderly()
: new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) {
retryTemplate.registerListener(listener);
}
Set<String> tagsSet = tags == null ? new HashSet<>() Set<String> tagsSet = tags == null ? new HashSet<>()
: Arrays.stream(tags.split("\\|\\|")).map(String::trim) : Arrays.stream(tags.split("\\|\\|")).map(String::trim)
@ -122,11 +140,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
try { try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
consumer.subscribe(destination, MessageSelector this.consumer.subscribe(destination, MessageSelector
.bySql(consumerProperties.getExtension().getSql())); .bySql(consumerProperties.getExtension().getSql()));
} }
else { else {
consumer.subscribe(destination, String.join(" || ", tagsSet)); this.consumer.subscribe(destination, String.join(" || ", tagsSet));
} }
Optional.ofNullable(consumerInstrumentation) Optional.ofNullable(consumerInstrumentation)
.ifPresent(c -> c.markStartedSuccessfully()); .ifPresent(c -> c.markStartedSuccessfully());
@ -139,7 +157,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
} }
consumer.registerMessageListener(listener); this.consumer.registerMessageListener(this.listener);
try { try {
consumersManager.startConsumer(group); consumersManager.startConsumer(group);
@ -229,18 +247,24 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
logger.debug(retryInfo + "consuming msg:\n" + msg); logger.debug(retryInfo + "consuming msg:\n" + msg);
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody())); logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
Acknowledgement acknowledgement = new Acknowledgement(); Acknowledgement acknowledgement = new Acknowledgement();
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody()) Message<byte[]> toChannel = convertMessagingFromRocketMQMsg(msg,
.setHeaders(new RocketMQMessageHeaderAccessor() acknowledgement);
.withAcknowledgment(acknowledgement)
.withTags(msg.getTags()).withKeys(msg.getKeys())
.withFlag(msg.getFlag()).withRocketMessage(msg))
.build();
acknowledgements.add(acknowledgement); acknowledgements.add(acknowledgement);
RocketMQInboundChannelAdapter.this.sendMessage(toChannel); RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
}); });
return acknowledgements.get(0); return acknowledgements.get(0);
} }
private Message convertMessagingFromRocketMQMsg(MessageExt msg,
Acknowledgement acknowledgement) {
return MessageBuilder.withPayload(msg.getBody())
.setHeaders(new RocketMQMessageHeaderAccessor()
.withAcknowledgment(acknowledgement).withTags(msg.getTags())
.withKeys(msg.getKeys()).withFlag(msg.getFlag())
.withRocketMessage(msg))
.build();
}
@Override @Override
public <T, E extends Throwable> boolean open(RetryContext context, public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) { RetryCallback<T, E> callback) {