From e12b7aac670d22234c6f9d79268799f881858c35 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Thu, 8 Nov 2018 22:39:10 +0800 Subject: [PATCH] do some refactor aboud rocketmq binder --- .../rocketmq/RocketMQBinderConstants.java | 4 +- .../consuming/CloudStreamMessageListener.java | 64 +++++++++---------- .../consuming/ConsumerPropertiesWrapper.java | 42 ------------ .../rocketmq/consuming/ConsumersManager.java | 8 ++- .../RocketMQInboundChannelAdapter.java | 19 ++++-- .../integration/RocketMQMessageHandler.java | 4 +- .../RocketMQConsumerProperties.java | 13 ++++ 7 files changed, 65 insertions(+), 89 deletions(-) delete mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index f03fc48e..4086f772 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -10,9 +10,9 @@ public interface RocketMQBinderConstants { */ String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE"; - String ROCKET_FLAG = "ROCKET_FLAG"; + String ROCKET_FLAG = "ROCKETMQ_FLAG"; - String ROCKET_SEND_RESULT = "ROCKET_SEND_RESULT"; + String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java index 9acd6076..8de146d5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java @@ -12,7 +12,6 @@ import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAcc import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.StringUtils; /** * @author Timur Valiev @@ -21,56 +20,51 @@ import org.springframework.util.StringUtils; public abstract class CloudStreamMessageListener implements MessageListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private ConsumerPropertiesWrapper consumerPropertiesWrapper; - private final InstrumentationManager instrumentationManager; private final Consumer sendMsgAction; + private String topic; + CloudStreamMessageListener(InstrumentationManager instrumentationManager, Consumer sendMsgAction) { this.instrumentationManager = instrumentationManager; this.sendMsgAction = sendMsgAction; } - public String getTagsString() { - return String.join(" || ", consumerPropertiesWrapper.getTagsSet()); - } - - public void setConsumerPropertiesWrapper(String group, String topic, String tags) { - this.consumerPropertiesWrapper = new ConsumerPropertiesWrapper(group, topic, tags); - } - Acknowledgement consumeMessage(final List msgs) { List acknowledgements = new ArrayList<>(); msgs.forEach(msg -> { logger.info("consuming msg:\n" + msg); logger.debug("message body:\n" + new String(msg.getBody())); - if (consumerPropertiesWrapper != null && msg.getTopic().equals(consumerPropertiesWrapper.getTopic())) { - if (StringUtils.isEmpty(consumerPropertiesWrapper.getTags()) || consumerPropertiesWrapper.getTagsSet() - .contains(msg.getTags())) { - try { - Acknowledgement acknowledgement = new Acknowledgement(); - Message toChannel = MessageBuilder.withPayload(msg.getBody()). - setHeaders(new RocketMQMessageHeaderAccessor(). - withAcknowledgment(acknowledgement). - withTags(msg.getTags()). - withKeys(msg.getKeys()). - withFlag(msg.getFlag()). - withRocketMessage(msg) - ).build(); - acknowledgements.add(acknowledgement); - sendMsgAction.accept(toChannel); - instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic()) - .markConsumed(); - } catch (Exception e) { - logger.error("Rocket Message hasn't been processed successfully. Caused by ", e); - instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic()) - .markConsumedFailure(); - throw e; - } - } + try { + Acknowledgement acknowledgement = new Acknowledgement(); + Message toChannel = MessageBuilder.withPayload(msg.getBody()). + setHeaders(new RocketMQMessageHeaderAccessor(). + withAcknowledgment(acknowledgement). + withTags(msg.getTags()). + withKeys(msg.getKeys()). + withFlag(msg.getFlag()). + withRocketMessage(msg) + ).build(); + acknowledgements.add(acknowledgement); + sendMsgAction.accept(toChannel); + instrumentationManager.getConsumerInstrumentation(getTopic()) + .markConsumed(); + } catch (Exception e) { + logger.error("RocketMQ Message hasn't been processed successfully. Caused by ", e); + instrumentationManager.getConsumerInstrumentation(getTopic()) + .markConsumedFailure(); + throw e; } }); return acknowledgements.get(0); } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java deleted file mode 100644 index 0e3b9111..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.springframework.cloud.stream.binder.rocketmq.consuming; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * @author Timur Valiev - * @author Jim - */ -class ConsumerPropertiesWrapper { - private final String tags; - private final String group; - private final String topic; - private final Set tagsSet; - - ConsumerPropertiesWrapper(String group, String topic, String tags) { - this.tags = tags; - this.group = group; - this.topic = topic; - tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect( - Collectors.toSet()); - } - - String getTags() { - return tags; - } - - String getGroup() { - return group; - } - - String getTopic() { - return topic; - } - - Set getTagsSet() { - return tagsSet; - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java index 3b6c2647..04606915 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java @@ -7,6 +7,7 @@ import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; @@ -52,7 +53,10 @@ public class ConsumersManager { started.put(group, false); consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); - logger.info("Rocket consuming for SCS group {} created", group); + if (consumerProperties.getExtension().getBroadcasting()) { + consumer.setMessageModel(MessageModel.BROADCASTING); + } + logger.info("RocketMQ consuming for SCS group {} created", group); return consumer; } @@ -90,7 +94,7 @@ public class ConsumersManager { groupInstrumentation.markStartedSuccessfully(); } catch (MQClientException e) { groupInstrumentation.markStartFailed(e); - logger.error("Rocket Consumer hasn't been started. Caused by " + e.getErrorMessage(), e); + logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e); throw e; } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index c5494a4b..8a36168d 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -1,5 +1,10 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.exception.MQClientException; @@ -62,8 +67,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly( instrumentationManager, msg -> sendMessage(msg)) : new CloudStreamMessageListenerConcurrently(instrumentationManager, msg -> sendMessage(msg)); + listener.setTopic(destination); - listener.setConsumerPropertiesWrapper(group, destination, tags); + Set tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect( + Collectors.toSet()); consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination); instrumentationManager.addHealthInstrumentation(consumerInstrumentation); @@ -72,13 +79,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql())); } else { - consumer.subscribe(destination, listener.getTagsString()); + consumer.subscribe(destination, String.join(" || ", tagsSet)); } consumerInstrumentation.markStartedSuccessfully(); } catch (MQClientException e) { consumerInstrumentation.markStartFailed(e); - logger.error("Rocket Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e); - throw new RuntimeException("Rocket Consumer hasn't been subscribed.", e); + logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e); + throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); } consumer.registerMessageListener(listener); @@ -86,8 +93,8 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { try { consumersManager.startConsumer(group); } catch (MQClientException e) { - logger.error("Rocket Consumer startup failed. Caused by " + e.getErrorMessage(), e); - throw new RuntimeException("Rocket Consumer startup failed.", e); + logger.error("RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), e); + throw new RuntimeException("RocketMQ Consumer startup failed.", e); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 01bc7245..a70283fb 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -67,7 +67,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li producerInstrumentation.markStartedSuccessfully(); } catch (MQClientException e) { producerInstrumentation.markStartFailed(e); - logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage()); + logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); throw new MessagingException(e.getMessage(), e); } running = true; @@ -122,7 +122,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException | UnsupportedOperationException e) { producerInstrumentation.markSentFailure(); - logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage()); + logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); throw new MessagingException(e.getMessage(), e); } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 410914ac..bd169c79 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -4,6 +4,7 @@ import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; /** * @author Timur Valiev @@ -23,6 +24,11 @@ public class RocketMQConsumerProperties { */ private String sql; + /** + * {@link MessageModel#BROADCASTING} + */ + private Boolean broadcasting = false; + /** * if orderly is true, using {@link MessageListenerOrderly} * else if orderly if false, using {@link MessageListenerConcurrently} @@ -63,4 +69,11 @@ public class RocketMQConsumerProperties { this.enabled = enabled; } + public Boolean getBroadcasting() { + return broadcasting; + } + + public void setBroadcasting(Boolean broadcasting) { + this.broadcasting = broadcasting; + } }