From 443a43fe96fa1833a169dd59164ba2a965277d2f Mon Sep 17 00:00:00 2001 From: wangxing Date: Wed, 3 Jul 2019 11:03:25 +0800 Subject: [PATCH 1/4] Use partition to implement rocketmq queue selection. #667 --- .../RocketMQMessageChannelBinder.java | 9 +-- .../integration/RocketMQMessageHandler.java | 66 +++++++++++-------- .../PartitionMessageQueueSelector.java | 34 ++++++++++ 3 files changed, 78 insertions(+), 31 deletions(-) create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 6df97fca..4961ad64 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,9 +16,7 @@ package org.springframework.cloud.stream.binder.rocketmq; -import java.util.HashMap; -import java.util.Map; - +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -42,6 +40,7 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsu import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import org.springframework.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.StaticMessageHeaderAccessor; @@ -53,7 +52,8 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.HashMap; +import java.util.Map; /** * @author Jim @@ -149,6 +149,7 @@ public class RocketMQMessageChannelBinder extends producer.setMaxMessageSize( producerProperties.getExtension().getMaxMessageSize()); rocketMQTemplate.setProducer(producer); + rocketMQTemplate.setMessageQueueSelector(new PartitionMessageQueueSelector()); } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 65dbb9e5..e6aa0de8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -16,8 +16,6 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; -import java.util.Optional; - import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; @@ -27,6 +25,7 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -41,6 +40,8 @@ import org.springframework.messaging.support.ErrorMessage; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import java.util.Optional; + /** * @author Jim */ @@ -145,36 +146,47 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li catch (Exception e) { // ignore } + boolean needSelectQueue = message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER); if (sync) { - sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, - rocketMQTemplate.getProducer().getSendMsgTimeout(), - delayLevel); + if (needSelectQueue) { + sendRes = rocketMQTemplate.syncSendOrderly(topicWithTags.toString(), message, "", + rocketMQTemplate.getProducer().getSendMsgTimeout()); + } else { + sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, + rocketMQTemplate.getProducer().getSendMsgTimeout(), + delayLevel); + } log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { - rocketMQTemplate.asyncSend(topicWithTags.toString(), message, - new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.debug("async send to topic " + topicWithTags + " " - + sendResult); - } + SendCallback sendCallback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.debug("async send to topic " + topicWithTags + " " + + sendResult); + } - @Override - public void onException(Throwable e) { - log.error( - "RocketMQ Message hasn't been sent. Caused by " - + e.getMessage()); - if (getSendFailureChannel() != null) { - getSendFailureChannel().send( - RocketMQMessageHandler.this.errorMessageStrategy - .buildErrorMessage( - new MessagingException( - message, e), - null)); - } - } - }); + @Override + public void onException(Throwable e) { + log.error( + "RocketMQ Message hasn't been sent. Caused by " + + e.getMessage()); + if (getSendFailureChannel() != null) { + getSendFailureChannel().send( + RocketMQMessageHandler.this.errorMessageStrategy + .buildErrorMessage( + new MessagingException( + message, e), + null)); + } + } + }; + if (needSelectQueue) { + rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(), message, "", sendCallback, + rocketMQTemplate.getProducer().getSendMsgTimeout()); + } else { + rocketMQTemplate.asyncSend(topicWithTags.toString(), message, sendCallback); + } } } if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java new file mode 100644 index 00000000..e07f0b36 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java @@ -0,0 +1,34 @@ +package org.springframework.cloud.stream.binder.rocketmq.provisioning.selector; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.BinderHeaders; + +import java.util.List; + +/** + * @author wangxing + * @create 2019/7/3 + */ +public class PartitionMessageQueueSelector implements MessageQueueSelector { + + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionMessageQueueSelector.class); + + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Integer partition = 0; + try { + partition = Integer.valueOf(msg.getProperty(BinderHeaders.PARTITION_HEADER)); + } catch (NumberFormatException ignored) { + } + if (partition >= mqs.size()) { + LOGGER.warn("the partition '{}' is greater than the number of queues '{}'.", partition, mqs.size()); + partition = partition % mqs.size(); + } + return mqs.get(partition); + } + +} \ No newline at end of file From de2f3f2131dda97fc0e321a857cc3004579b6535 Mon Sep 17 00:00:00 2001 From: wangxing <1271029739@qq.com> Date: Tue, 13 Aug 2019 10:58:32 +0800 Subject: [PATCH 2/4] Update the code format and replace the partition number setting at startup. --- .../RocketMQMessageChannelBinder.java | 50 +++++++++---- .../integration/RocketMQMessageHandler.java | 74 ++++++++++++++----- .../PartitionMessageQueueSelector.java | 37 ++++++---- 3 files changed, 110 insertions(+), 51 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 53466927..25712e42 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,18 +16,9 @@ package com.alibaba.cloud.stream.binder.rocketmq; -import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; -import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; -import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; -import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.HashMap; +import java.util.Map; + import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -41,19 +32,31 @@ import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; +import org.springframework.cloud.stream.binding.MessageConverterConfigurer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.acks.AcknowledgmentCallback.Status; +import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; -import java.util.HashMap; -import java.util.Map; +import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; +import com.fasterxml.jackson.databind.ObjectMapper; /** * @author Jim @@ -86,6 +89,7 @@ public class RocketMQMessageChannelBinder extends @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, + MessageChannel channel, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { @@ -149,13 +153,20 @@ public class RocketMQMessageChannelBinder extends producer.setMaxMessageSize( producerProperties.getExtension().getMaxMessageSize()); rocketMQTemplate.setProducer(producer); - rocketMQTemplate.setMessageQueueSelector(new PartitionMessageQueueSelector()); + if (producerProperties.isPartitioned()) { + rocketMQTemplate + .setMessageQueueSelector(new PartitionMessageQueueSelector()); + } } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), - instrumentationManager); + instrumentationManager, producerProperties, + ((AbstractMessageChannel) channel).getChannelInterceptors().stream() + .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor) + .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor)) + .findFirst().orElse(null)); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setSync(producerProperties.getExtension().getSync()); @@ -170,6 +181,13 @@ public class RocketMQMessageChannelBinder extends } } + @Override + protected MessageHandler createProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties producerProperties, + MessageChannel errorChannel) throws Exception { + throw new UnsupportedOperationException(); + } + @Override protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 794005db..376e7f61 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -16,19 +16,22 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration; -import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; -import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; -import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import java.util.List; +import java.util.Optional; + import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.BinderHeaders; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binding.MessageConverterConfigurer; import org.springframework.context.Lifecycle; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.support.DefaultErrorMessageStrategy; @@ -40,7 +43,10 @@ import org.springframework.messaging.support.ErrorMessage; import org.springframework.util.Assert; import org.springframework.util.StringUtils; -import java.util.Optional; +import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; /** * @author Jim @@ -68,14 +74,22 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private volatile boolean running = false; + private ExtendedProducerProperties producerProperties; + + private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor; + public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination, String groupName, Boolean transactional, - InstrumentationManager instrumentationManager) { + InstrumentationManager instrumentationManager, + ExtendedProducerProperties producerProperties, + MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) { this.rocketMQTemplate = rocketMQTemplate; this.destination = destination; this.groupName = groupName; this.transactional = transactional; this.instrumentationManager = instrumentationManager; + this.producerProperties = producerProperties; + this.partitioningInterceptor = partitioningInterceptor; } @Override @@ -97,6 +111,24 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li .build(), e); } } + if (producerProperties.isPartitioned()) { + try { + List messageQueues = rocketMQTemplate.getProducer() + .fetchPublishMessageQueues(destination); + if (producerProperties.getPartitionCount() != messageQueues.size()) { + logger.info(String.format( + "The partition count will change from '%s' to '%s'", + producerProperties.getPartitionCount(), + messageQueues.size())); + producerProperties.setPartitionCount(messageQueues.size()); + partitioningInterceptor + .setPartitionCount(producerProperties.getPartitionCount()); + } + } + catch (MQClientException e) { + logger.error("fetch publish message queues fail", e); + } + } running = true; } @@ -146,13 +178,17 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li catch (Exception e) { // ignore } - boolean needSelectQueue = message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER); + boolean needSelectQueue = message.getHeaders() + .containsKey(BinderHeaders.PARTITION_HEADER); if (sync) { if (needSelectQueue) { - sendRes = rocketMQTemplate.syncSendOrderly(topicWithTags.toString(), message, "", + sendRes = rocketMQTemplate.syncSendOrderly( + topicWithTags.toString(), message, "", rocketMQTemplate.getProducer().getSendMsgTimeout()); - } else { - sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, + } + else { + sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), + message, rocketMQTemplate.getProducer().getSendMsgTimeout(), delayLevel); } @@ -168,24 +204,24 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li @Override public void onException(Throwable e) { - log.error( - "RocketMQ Message hasn't been sent. Caused by " - + e.getMessage()); + log.error("RocketMQ Message hasn't been sent. Caused by " + + e.getMessage()); if (getSendFailureChannel() != null) { getSendFailureChannel().send( RocketMQMessageHandler.this.errorMessageStrategy - .buildErrorMessage( - new MessagingException( - message, e), - null)); + .buildErrorMessage(new MessagingException( + message, e), null)); } } }; if (needSelectQueue) { - rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(), message, "", sendCallback, + rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(), + message, "", sendCallback, rocketMQTemplate.getProducer().getSendMsgTimeout()); - } else { - rocketMQTemplate.asyncSend(topicWithTags.toString(), message, sendCallback); + } + else { + rocketMQTemplate.asyncSend(topicWithTags.toString(), message, + sendCallback); } } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java index f8e599a4..38603346 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java @@ -1,5 +1,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector; +import java.util.List; + import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; @@ -7,28 +9,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.BinderHeaders; -import java.util.List; - /** * @author wangxing * @create 2019/7/3 */ public class PartitionMessageQueueSelector implements MessageQueueSelector { - private static final Logger LOGGER = LoggerFactory.getLogger(PartitionMessageQueueSelector.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(PartitionMessageQueueSelector.class); - @Override - public MessageQueue select(List mqs, Message msg, Object arg) { - Integer partition = 0; - try { - partition = Math.abs(Integer.valueOf(msg.getProperty(BinderHeaders.PARTITION_HEADER))); - if (partition >= mqs.size()) { - LOGGER.warn("the partition '{}' is greater than the number of queues '{}'.", partition, mqs.size()); - partition = partition % mqs.size(); - } - } catch (NumberFormatException ignored) { - } - return mqs.get(partition); - } + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Integer partition = 0; + try { + partition = Math.abs( + Integer.parseInt(msg.getProperty(BinderHeaders.PARTITION_HEADER))); + if (partition >= mqs.size()) { + LOGGER.warn( + "the partition '{}' is greater than the number of queues '{}'.", + partition, mqs.size()); + partition = partition % mqs.size(); + } + } + catch (NumberFormatException ignored) { + } + return mqs.get(partition); + } } \ No newline at end of file From 7073be1186193b69c772a68b42f60ee07ae09622 Mon Sep 17 00:00:00 2001 From: wangxing <1271029739@qq.com> Date: Tue, 13 Aug 2019 13:45:08 +0800 Subject: [PATCH 3/4] Optimize documents and logs. --- .../binder/rocketmq/RocketMQMessageChannelBinder.java | 10 ++++++++++ .../rocketmq/integration/RocketMQMessageHandler.java | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 25712e42..62e54fc0 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -181,6 +181,16 @@ public class RocketMQMessageChannelBinder extends } } + /** + * The abstract binder should not call this method. + * @param destination + * @param producerProperties + * @param errorChannel + * @return + * @throws Exception + * @see RocketMQMessageChannelBinder#createProducerMessageHandler(ProducerDestination, + * ExtendedProducerProperties, MessageChannel, MessageChannel) + */ @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 376e7f61..9186b064 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -117,8 +117,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li .fetchPublishMessageQueues(destination); if (producerProperties.getPartitionCount() != messageQueues.size()) { logger.info(String.format( - "The partition count will change from '%s' to '%s'", - producerProperties.getPartitionCount(), + "The partition count of topic '%s' will change from '%s' to '%s'", + destination, producerProperties.getPartitionCount(), messageQueues.size())); producerProperties.setPartitionCount(messageQueues.size()); partitioningInterceptor From 3ad57fb7bfcf5d94c9093056960875e647f1669b Mon Sep 17 00:00:00 2001 From: wangxing <1271029739@qq.com> Date: Tue, 13 Aug 2019 14:25:41 +0800 Subject: [PATCH 4/4] Change exception msg. --- .../rocketmq/RocketMQMessageChannelBinder.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 62e54fc0..bc35de93 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -181,21 +181,12 @@ public class RocketMQMessageChannelBinder extends } } - /** - * The abstract binder should not call this method. - * @param destination - * @param producerProperties - * @param errorChannel - * @return - * @throws Exception - * @see RocketMQMessageChannelBinder#createProducerMessageHandler(ProducerDestination, - * ExtendedProducerProperties, MessageChannel, MessageChannel) - */ @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException( + "The abstract binder should not call this method"); } @Override