diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index d4aaf88a..bc35de93 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -32,11 +32,13 @@ import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; +import org.springframework.cloud.stream.binding.MessageConverterConfigurer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.acks.AcknowledgmentCallback.Status; +import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -53,7 +55,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; - +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -87,6 +89,7 @@ public class RocketMQMessageChannelBinder extends @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, + MessageChannel channel, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { @@ -150,12 +153,20 @@ public class RocketMQMessageChannelBinder extends producer.setMaxMessageSize( producerProperties.getExtension().getMaxMessageSize()); rocketMQTemplate.setProducer(producer); + if (producerProperties.isPartitioned()) { + rocketMQTemplate + .setMessageQueueSelector(new PartitionMessageQueueSelector()); + } } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), - instrumentationManager); + instrumentationManager, producerProperties, + ((AbstractMessageChannel) channel).getChannelInterceptors().stream() + .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor) + .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor)) + .findFirst().orElse(null)); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setSync(producerProperties.getExtension().getSync()); @@ -170,6 +181,14 @@ public class RocketMQMessageChannelBinder extends } } + @Override + protected MessageHandler createProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties producerProperties, + MessageChannel errorChannel) throws Exception { + throw new UnsupportedOperationException( + "The abstract binder should not call this method"); + } + @Override protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 6aa0e0fe..9186b064 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -16,6 +16,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration; +import java.util.List; import java.util.Optional; import org.apache.rocketmq.client.exception.MQClientException; @@ -23,10 +24,14 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.BinderHeaders; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binding.MessageConverterConfigurer; import org.springframework.context.Lifecycle; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.support.DefaultErrorMessageStrategy; @@ -41,6 +46,7 @@ import org.springframework.util.StringUtils; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; /** * @author Jim @@ -68,14 +74,22 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private volatile boolean running = false; + private ExtendedProducerProperties producerProperties; + + private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor; + public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination, String groupName, Boolean transactional, - InstrumentationManager instrumentationManager) { + InstrumentationManager instrumentationManager, + ExtendedProducerProperties producerProperties, + MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) { this.rocketMQTemplate = rocketMQTemplate; this.destination = destination; this.groupName = groupName; this.transactional = transactional; this.instrumentationManager = instrumentationManager; + this.producerProperties = producerProperties; + this.partitioningInterceptor = partitioningInterceptor; } @Override @@ -97,6 +111,24 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li .build(), e); } } + if (producerProperties.isPartitioned()) { + try { + List messageQueues = rocketMQTemplate.getProducer() + .fetchPublishMessageQueues(destination); + if (producerProperties.getPartitionCount() != messageQueues.size()) { + logger.info(String.format( + "The partition count of topic '%s' will change from '%s' to '%s'", + destination, producerProperties.getPartitionCount(), + messageQueues.size())); + producerProperties.setPartitionCount(messageQueues.size()); + partitioningInterceptor + .setPartitionCount(producerProperties.getPartitionCount()); + } + } + catch (MQClientException e) { + logger.error("fetch publish message queues fail", e); + } + } running = true; } @@ -146,36 +178,51 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li catch (Exception e) { // ignore } + boolean needSelectQueue = message.getHeaders() + .containsKey(BinderHeaders.PARTITION_HEADER); if (sync) { - sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, - rocketMQTemplate.getProducer().getSendMsgTimeout(), - delayLevel); + if (needSelectQueue) { + sendRes = rocketMQTemplate.syncSendOrderly( + topicWithTags.toString(), message, "", + rocketMQTemplate.getProducer().getSendMsgTimeout()); + } + else { + sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), + message, + rocketMQTemplate.getProducer().getSendMsgTimeout(), + delayLevel); + } log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { - rocketMQTemplate.asyncSend(topicWithTags.toString(), message, - new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.debug("async send to topic " + topicWithTags + " " - + sendResult); - } + SendCallback sendCallback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.debug("async send to topic " + topicWithTags + " " + + sendResult); + } - @Override - public void onException(Throwable e) { - log.error( - "RocketMQ Message hasn't been sent. Caused by " - + e.getMessage()); - if (getSendFailureChannel() != null) { - getSendFailureChannel().send( - RocketMQMessageHandler.this.errorMessageStrategy - .buildErrorMessage( - new MessagingException( - message, e), - null)); - } - } - }); + @Override + public void onException(Throwable e) { + log.error("RocketMQ Message hasn't been sent. Caused by " + + e.getMessage()); + if (getSendFailureChannel() != null) { + getSendFailureChannel().send( + RocketMQMessageHandler.this.errorMessageStrategy + .buildErrorMessage(new MessagingException( + message, e), null)); + } + } + }; + if (needSelectQueue) { + rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(), + message, "", sendCallback, + rocketMQTemplate.getProducer().getSendMsgTimeout()); + } + else { + rocketMQTemplate.asyncSend(topicWithTags.toString(), message, + sendCallback); + } } } if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java new file mode 100644 index 00000000..38603346 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java @@ -0,0 +1,39 @@ +package com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector; + +import java.util.List; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.BinderHeaders; + +/** + * @author wangxing + * @create 2019/7/3 + */ +public class PartitionMessageQueueSelector implements MessageQueueSelector { + + private static final Logger LOGGER = LoggerFactory + .getLogger(PartitionMessageQueueSelector.class); + + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Integer partition = 0; + try { + partition = Math.abs( + Integer.parseInt(msg.getProperty(BinderHeaders.PARTITION_HEADER))); + if (partition >= mqs.size()) { + LOGGER.warn( + "the partition '{}' is greater than the number of queues '{}'.", + partition, mqs.size()); + partition = partition % mqs.size(); + } + } + catch (NumberFormatException ignored) { + } + return mqs.get(partition); + } + +} \ No newline at end of file