mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
do some refactor aboud rocketmq binder
This commit is contained in:
parent
a1e2694886
commit
e12b7aac67
@ -10,9 +10,9 @@ public interface RocketMQBinderConstants {
|
|||||||
*/
|
*/
|
||||||
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE";
|
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE";
|
||||||
|
|
||||||
String ROCKET_FLAG = "ROCKET_FLAG";
|
String ROCKET_FLAG = "ROCKETMQ_FLAG";
|
||||||
|
|
||||||
String ROCKET_SEND_RESULT = "ROCKET_SEND_RESULT";
|
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
|
||||||
|
|
||||||
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
|
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
|
||||||
|
|
||||||
|
@ -12,7 +12,6 @@ import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAcc
|
|||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
import org.springframework.util.StringUtils;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Timur Valiev
|
* @author Timur Valiev
|
||||||
@ -21,33 +20,22 @@ import org.springframework.util.StringUtils;
|
|||||||
public abstract class CloudStreamMessageListener implements MessageListener {
|
public abstract class CloudStreamMessageListener implements MessageListener {
|
||||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
private ConsumerPropertiesWrapper consumerPropertiesWrapper;
|
|
||||||
|
|
||||||
private final InstrumentationManager instrumentationManager;
|
private final InstrumentationManager instrumentationManager;
|
||||||
|
|
||||||
private final Consumer<Message> sendMsgAction;
|
private final Consumer<Message> sendMsgAction;
|
||||||
|
|
||||||
|
private String topic;
|
||||||
|
|
||||||
CloudStreamMessageListener(InstrumentationManager instrumentationManager, Consumer<Message> sendMsgAction) {
|
CloudStreamMessageListener(InstrumentationManager instrumentationManager, Consumer<Message> sendMsgAction) {
|
||||||
this.instrumentationManager = instrumentationManager;
|
this.instrumentationManager = instrumentationManager;
|
||||||
this.sendMsgAction = sendMsgAction;
|
this.sendMsgAction = sendMsgAction;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getTagsString() {
|
|
||||||
return String.join(" || ", consumerPropertiesWrapper.getTagsSet());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setConsumerPropertiesWrapper(String group, String topic, String tags) {
|
|
||||||
this.consumerPropertiesWrapper = new ConsumerPropertiesWrapper(group, topic, tags);
|
|
||||||
}
|
|
||||||
|
|
||||||
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
|
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
|
||||||
List<Acknowledgement> acknowledgements = new ArrayList<>();
|
List<Acknowledgement> acknowledgements = new ArrayList<>();
|
||||||
msgs.forEach(msg -> {
|
msgs.forEach(msg -> {
|
||||||
logger.info("consuming msg:\n" + msg);
|
logger.info("consuming msg:\n" + msg);
|
||||||
logger.debug("message body:\n" + new String(msg.getBody()));
|
logger.debug("message body:\n" + new String(msg.getBody()));
|
||||||
if (consumerPropertiesWrapper != null && msg.getTopic().equals(consumerPropertiesWrapper.getTopic())) {
|
|
||||||
if (StringUtils.isEmpty(consumerPropertiesWrapper.getTags()) || consumerPropertiesWrapper.getTagsSet()
|
|
||||||
.contains(msg.getTags())) {
|
|
||||||
try {
|
try {
|
||||||
Acknowledgement acknowledgement = new Acknowledgement();
|
Acknowledgement acknowledgement = new Acknowledgement();
|
||||||
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody()).
|
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody()).
|
||||||
@ -60,17 +48,23 @@ public abstract class CloudStreamMessageListener implements MessageListener {
|
|||||||
).build();
|
).build();
|
||||||
acknowledgements.add(acknowledgement);
|
acknowledgements.add(acknowledgement);
|
||||||
sendMsgAction.accept(toChannel);
|
sendMsgAction.accept(toChannel);
|
||||||
instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic())
|
instrumentationManager.getConsumerInstrumentation(getTopic())
|
||||||
.markConsumed();
|
.markConsumed();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Rocket Message hasn't been processed successfully. Caused by ", e);
|
logger.error("RocketMQ Message hasn't been processed successfully. Caused by ", e);
|
||||||
instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic())
|
instrumentationManager.getConsumerInstrumentation(getTopic())
|
||||||
.markConsumedFailure();
|
.markConsumedFailure();
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
return acknowledgements.get(0);
|
return acknowledgements.get(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getTopic() {
|
||||||
|
return topic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTopic(String topic) {
|
||||||
|
this.topic = topic;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,42 +0,0 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Timur Valiev
|
|
||||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
|
||||||
*/
|
|
||||||
class ConsumerPropertiesWrapper {
|
|
||||||
private final String tags;
|
|
||||||
private final String group;
|
|
||||||
private final String topic;
|
|
||||||
private final Set<String> tagsSet;
|
|
||||||
|
|
||||||
ConsumerPropertiesWrapper(String group, String topic, String tags) {
|
|
||||||
this.tags = tags;
|
|
||||||
this.group = group;
|
|
||||||
this.topic = topic;
|
|
||||||
tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect(
|
|
||||||
Collectors.toSet());
|
|
||||||
}
|
|
||||||
|
|
||||||
String getTags() {
|
|
||||||
return tags;
|
|
||||||
}
|
|
||||||
|
|
||||||
String getGroup() {
|
|
||||||
return group;
|
|
||||||
}
|
|
||||||
|
|
||||||
String getTopic() {
|
|
||||||
return topic;
|
|
||||||
}
|
|
||||||
|
|
||||||
Set<String> getTagsSet() {
|
|
||||||
return tagsSet;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -7,6 +7,7 @@ import java.util.Set;
|
|||||||
|
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
import org.apache.rocketmq.client.exception.MQClientException;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||||
@ -52,7 +53,10 @@ public class ConsumersManager {
|
|||||||
started.put(group, false);
|
started.put(group, false);
|
||||||
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
|
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
|
||||||
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
|
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
|
||||||
logger.info("Rocket consuming for SCS group {} created", group);
|
if (consumerProperties.getExtension().getBroadcasting()) {
|
||||||
|
consumer.setMessageModel(MessageModel.BROADCASTING);
|
||||||
|
}
|
||||||
|
logger.info("RocketMQ consuming for SCS group {} created", group);
|
||||||
return consumer;
|
return consumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,7 +94,7 @@ public class ConsumersManager {
|
|||||||
groupInstrumentation.markStartedSuccessfully();
|
groupInstrumentation.markStartedSuccessfully();
|
||||||
} catch (MQClientException e) {
|
} catch (MQClientException e) {
|
||||||
groupInstrumentation.markStartFailed(e);
|
groupInstrumentation.markStartFailed(e);
|
||||||
logger.error("Rocket Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
|
logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,10 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq.integration;
|
package org.springframework.cloud.stream.binder.rocketmq.integration;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||||
import org.apache.rocketmq.client.exception.MQClientException;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
@ -62,8 +67,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|||||||
final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
|
final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
|
||||||
instrumentationManager, msg -> sendMessage(msg))
|
instrumentationManager, msg -> sendMessage(msg))
|
||||||
: new CloudStreamMessageListenerConcurrently(instrumentationManager, msg -> sendMessage(msg));
|
: new CloudStreamMessageListenerConcurrently(instrumentationManager, msg -> sendMessage(msg));
|
||||||
|
listener.setTopic(destination);
|
||||||
|
|
||||||
listener.setConsumerPropertiesWrapper(group, destination, tags);
|
Set<String> tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect(
|
||||||
|
Collectors.toSet());
|
||||||
|
|
||||||
consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
|
consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
|
||||||
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
|
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
|
||||||
@ -72,13 +79,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|||||||
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
|
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
|
||||||
consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql()));
|
consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql()));
|
||||||
} else {
|
} else {
|
||||||
consumer.subscribe(destination, listener.getTagsString());
|
consumer.subscribe(destination, String.join(" || ", tagsSet));
|
||||||
}
|
}
|
||||||
consumerInstrumentation.markStartedSuccessfully();
|
consumerInstrumentation.markStartedSuccessfully();
|
||||||
} catch (MQClientException e) {
|
} catch (MQClientException e) {
|
||||||
consumerInstrumentation.markStartFailed(e);
|
consumerInstrumentation.markStartFailed(e);
|
||||||
logger.error("Rocket Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e);
|
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e);
|
||||||
throw new RuntimeException("Rocket Consumer hasn't been subscribed.", e);
|
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.registerMessageListener(listener);
|
consumer.registerMessageListener(listener);
|
||||||
@ -86,8 +93,8 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|||||||
try {
|
try {
|
||||||
consumersManager.startConsumer(group);
|
consumersManager.startConsumer(group);
|
||||||
} catch (MQClientException e) {
|
} catch (MQClientException e) {
|
||||||
logger.error("Rocket Consumer startup failed. Caused by " + e.getErrorMessage(), e);
|
logger.error("RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), e);
|
||||||
throw new RuntimeException("Rocket Consumer startup failed.", e);
|
throw new RuntimeException("RocketMQ Consumer startup failed.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
producerInstrumentation.markStartedSuccessfully();
|
producerInstrumentation.markStartedSuccessfully();
|
||||||
} catch (MQClientException e) {
|
} catch (MQClientException e) {
|
||||||
producerInstrumentation.markStartFailed(e);
|
producerInstrumentation.markStartFailed(e);
|
||||||
logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage());
|
logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
|
||||||
throw new MessagingException(e.getMessage(), e);
|
throw new MessagingException(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
running = true;
|
running = true;
|
||||||
@ -122,7 +122,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException |
|
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException |
|
||||||
UnsupportedOperationException e) {
|
UnsupportedOperationException e) {
|
||||||
producerInstrumentation.markSentFailure();
|
producerInstrumentation.markSentFailure();
|
||||||
logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage());
|
logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
|
||||||
throw new MessagingException(e.getMessage(), e);
|
throw new MessagingException(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import org.apache.rocketmq.client.consumer.MQPushConsumer;
|
|||||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||||
|
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Timur Valiev
|
* @author Timur Valiev
|
||||||
@ -23,6 +24,11 @@ public class RocketMQConsumerProperties {
|
|||||||
*/
|
*/
|
||||||
private String sql;
|
private String sql;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link MessageModel#BROADCASTING}
|
||||||
|
*/
|
||||||
|
private Boolean broadcasting = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* if orderly is true, using {@link MessageListenerOrderly}
|
* if orderly is true, using {@link MessageListenerOrderly}
|
||||||
* else if orderly if false, using {@link MessageListenerConcurrently}
|
* else if orderly if false, using {@link MessageListenerConcurrently}
|
||||||
@ -63,4 +69,11 @@ public class RocketMQConsumerProperties {
|
|||||||
this.enabled = enabled;
|
this.enabled = enabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Boolean getBroadcasting() {
|
||||||
|
return broadcasting;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBroadcasting(Boolean broadcasting) {
|
||||||
|
this.broadcasting = broadcasting;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user