mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
support spring-retry and refactor MessageListener of RocketMQ
This commit is contained in:
parent
e12b7aac67
commit
b2af35d84b
@ -62,4 +62,17 @@ public class Acknowledgement {
|
|||||||
public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) {
|
public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) {
|
||||||
this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
|
this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Acknowledgement buildOrderlyInstance() {
|
||||||
|
Acknowledgement acknowledgement = new Acknowledgement();
|
||||||
|
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS);
|
||||||
|
return acknowledgement;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Acknowledgement buildConcurrentlyInstance() {
|
||||||
|
Acknowledgement acknowledgement = new Acknowledgement();
|
||||||
|
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
|
||||||
|
return acknowledgement;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,70 +0,0 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
import org.apache.rocketmq.client.consumer.listener.MessageListener;
|
|
||||||
import org.apache.rocketmq.common.message.MessageExt;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
|
||||||
import org.springframework.messaging.Message;
|
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Timur Valiev
|
|
||||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
|
||||||
*/
|
|
||||||
public abstract class CloudStreamMessageListener implements MessageListener {
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
||||||
|
|
||||||
private final InstrumentationManager instrumentationManager;
|
|
||||||
|
|
||||||
private final Consumer<Message> sendMsgAction;
|
|
||||||
|
|
||||||
private String topic;
|
|
||||||
|
|
||||||
CloudStreamMessageListener(InstrumentationManager instrumentationManager, Consumer<Message> sendMsgAction) {
|
|
||||||
this.instrumentationManager = instrumentationManager;
|
|
||||||
this.sendMsgAction = sendMsgAction;
|
|
||||||
}
|
|
||||||
|
|
||||||
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
|
|
||||||
List<Acknowledgement> acknowledgements = new ArrayList<>();
|
|
||||||
msgs.forEach(msg -> {
|
|
||||||
logger.info("consuming msg:\n" + msg);
|
|
||||||
logger.debug("message body:\n" + new String(msg.getBody()));
|
|
||||||
try {
|
|
||||||
Acknowledgement acknowledgement = new Acknowledgement();
|
|
||||||
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody()).
|
|
||||||
setHeaders(new RocketMQMessageHeaderAccessor().
|
|
||||||
withAcknowledgment(acknowledgement).
|
|
||||||
withTags(msg.getTags()).
|
|
||||||
withKeys(msg.getKeys()).
|
|
||||||
withFlag(msg.getFlag()).
|
|
||||||
withRocketMessage(msg)
|
|
||||||
).build();
|
|
||||||
acknowledgements.add(acknowledgement);
|
|
||||||
sendMsgAction.accept(toChannel);
|
|
||||||
instrumentationManager.getConsumerInstrumentation(getTopic())
|
|
||||||
.markConsumed();
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("RocketMQ Message hasn't been processed successfully. Caused by ", e);
|
|
||||||
instrumentationManager.getConsumerInstrumentation(getTopic())
|
|
||||||
.markConsumedFailure();
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return acknowledgements.get(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getTopic() {
|
|
||||||
return topic;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTopic(String topic) {
|
|
||||||
this.topic = topic;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,33 +0,0 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
||||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
||||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
||||||
import org.apache.rocketmq.common.message.MessageExt;
|
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
|
||||||
import org.springframework.messaging.Message;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Timur Valiev
|
|
||||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
|
||||||
*/
|
|
||||||
public class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements
|
|
||||||
MessageListenerConcurrently {
|
|
||||||
|
|
||||||
public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager,
|
|
||||||
Consumer<Message> sendMsgAction) {
|
|
||||||
super(instrumentationManager, sendMsgAction);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
|
|
||||||
ConsumeConcurrentlyContext context) {
|
|
||||||
Acknowledgement acknowledgement = consumeMessage(msgs);
|
|
||||||
context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel());
|
|
||||||
return acknowledgement.getConsumeConcurrentlyStatus();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,31 +0,0 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
|
||||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
|
||||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
|
||||||
import org.apache.rocketmq.common.message.MessageExt;
|
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
|
||||||
import org.springframework.messaging.Message;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Timur Valiev
|
|
||||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
|
||||||
*/
|
|
||||||
public class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly {
|
|
||||||
|
|
||||||
public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager,
|
|
||||||
Consumer<Message> sendMsgAction) {
|
|
||||||
super(instrumentationManager, sendMsgAction);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
|
|
||||||
Acknowledgement acknowledgement = consumeMessage(msgs);
|
|
||||||
context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
|
|
||||||
return acknowledgement.getConsumeOrderlyStatus();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,23 +1,40 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq.integration;
|
package org.springframework.cloud.stream.binder.rocketmq.integration;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.ClassUtils;
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListener;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||||
import org.apache.rocketmq.client.exception.MQClientException;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListener;
|
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerConcurrently;
|
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerOrderly;
|
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
|
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
|
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||||
import org.springframework.integration.endpoint.MessageProducerSupport;
|
import org.springframework.integration.endpoint.MessageProducerSupport;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
import org.springframework.retry.RecoveryCallback;
|
import org.springframework.retry.RecoveryCallback;
|
||||||
|
import org.springframework.retry.RetryCallback;
|
||||||
|
import org.springframework.retry.RetryContext;
|
||||||
|
import org.springframework.retry.RetryListener;
|
||||||
import org.springframework.retry.support.RetryTemplate;
|
import org.springframework.retry.support.RetryTemplate;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
|
||||||
@ -65,12 +82,15 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|||||||
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties);
|
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties);
|
||||||
|
|
||||||
final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
|
final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
|
||||||
instrumentationManager, msg -> sendMessage(msg))
|
instrumentationManager)
|
||||||
: new CloudStreamMessageListenerConcurrently(instrumentationManager, msg -> sendMessage(msg));
|
: new CloudStreamMessageListenerConcurrently(instrumentationManager);
|
||||||
listener.setTopic(destination);
|
|
||||||
|
|
||||||
Set<String> tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect(
|
if (retryTemplate != null) {
|
||||||
Collectors.toSet());
|
retryTemplate.registerListener(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<String> tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
|
consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
|
||||||
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
|
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
|
||||||
@ -110,4 +130,123 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|||||||
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
|
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
|
||||||
this.recoveryCallback = recoveryCallback;
|
this.recoveryCallback = recoveryCallback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
|
||||||
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
|
private final InstrumentationManager instrumentationManager;
|
||||||
|
|
||||||
|
CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
|
||||||
|
this.instrumentationManager = instrumentationManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
|
||||||
|
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
|
||||||
|
try {
|
||||||
|
if (enableRetry) {
|
||||||
|
return RocketMQInboundChannelAdapter.this.retryTemplate.execute(
|
||||||
|
(RetryCallback<Acknowledgement, Exception>)context -> doSendMsgs(msgs, context),
|
||||||
|
new RecoveryCallback<Acknowledgement>() {
|
||||||
|
@Override
|
||||||
|
public Acknowledgement recover(RetryContext context) throws Exception {
|
||||||
|
RocketMQInboundChannelAdapter.this.recoveryCallback.recover(context);
|
||||||
|
if (ClassUtils.isAssignable(this.getClass(), MessageListenerConcurrently.class)) {
|
||||||
|
return Acknowledgement.buildConcurrentlyInstance();
|
||||||
|
} else {
|
||||||
|
return Acknowledgement.buildOrderlyInstance();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
Acknowledgement result = doSendMsgs(msgs, null);
|
||||||
|
instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination)
|
||||||
|
.markConsumed();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Rocket Message hasn't been processed successfully. Caused by ", e);
|
||||||
|
instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination)
|
||||||
|
.markConsumedFailure();
|
||||||
|
throw new RuntimeException("Rocket Message hasn't been processed successfully. Caused by ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Acknowledgement doSendMsgs(final List<MessageExt> msgs, RetryContext context) {
|
||||||
|
List<Acknowledgement> acknowledgements = new ArrayList<>();
|
||||||
|
msgs.forEach(msg -> {
|
||||||
|
String retryInfo = context == null ? "" : "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
|
||||||
|
logger.debug(retryInfo + "consuming msg:\n" + msg);
|
||||||
|
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
|
||||||
|
Acknowledgement acknowledgement = new Acknowledgement();
|
||||||
|
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody()).
|
||||||
|
setHeaders(new RocketMQMessageHeaderAccessor().
|
||||||
|
withAcknowledgment(acknowledgement).
|
||||||
|
withTags(msg.getTags()).
|
||||||
|
withKeys(msg.getKeys()).
|
||||||
|
withFlag(msg.getFlag()).
|
||||||
|
withRocketMessage(msg)
|
||||||
|
).build();
|
||||||
|
acknowledgements.add(acknowledgement);
|
||||||
|
RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
|
||||||
|
});
|
||||||
|
return acknowledgements.get(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
|
||||||
|
Throwable throwable) {
|
||||||
|
if (throwable != null) {
|
||||||
|
instrumentationManager.getConsumerInstrumentation(
|
||||||
|
RocketMQInboundChannelAdapter.this.destination)
|
||||||
|
.markConsumedFailure();
|
||||||
|
} else {
|
||||||
|
instrumentationManager.getConsumerInstrumentation(
|
||||||
|
RocketMQInboundChannelAdapter.this.destination)
|
||||||
|
.markConsumed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
|
||||||
|
Throwable throwable) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements
|
||||||
|
MessageListenerConcurrently {
|
||||||
|
|
||||||
|
public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager) {
|
||||||
|
super(instrumentationManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
|
||||||
|
ConsumeConcurrentlyContext context) {
|
||||||
|
Acknowledgement acknowledgement = consumeMessage(msgs);
|
||||||
|
context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel());
|
||||||
|
return acknowledgement.getConsumeConcurrentlyStatus();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements
|
||||||
|
MessageListenerOrderly {
|
||||||
|
|
||||||
|
public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager) {
|
||||||
|
super(instrumentationManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
|
||||||
|
Acknowledgement acknowledgement = consumeMessage(msgs);
|
||||||
|
context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
|
||||||
|
return acknowledgement.getConsumeOrderlyStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user