diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 6df97fca..4961ad64 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,9 +16,7 @@ package org.springframework.cloud.stream.binder.rocketmq; -import java.util.HashMap; -import java.util.Map; - +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -42,6 +40,7 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsu import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import org.springframework.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.StaticMessageHeaderAccessor; @@ -53,7 +52,8 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.HashMap; +import java.util.Map; /** * @author Jim @@ -149,6 +149,7 @@ public class RocketMQMessageChannelBinder extends producer.setMaxMessageSize( producerProperties.getExtension().getMaxMessageSize()); rocketMQTemplate.setProducer(producer); + rocketMQTemplate.setMessageQueueSelector(new PartitionMessageQueueSelector()); } RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 65dbb9e5..e6aa0de8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -16,8 +16,6 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; -import java.util.Optional; - import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; @@ -27,6 +25,7 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -41,6 +40,8 @@ import org.springframework.messaging.support.ErrorMessage; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import java.util.Optional; + /** * @author Jim */ @@ -145,36 +146,47 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li catch (Exception e) { // ignore } + boolean needSelectQueue = message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER); if (sync) { - sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, - rocketMQTemplate.getProducer().getSendMsgTimeout(), - delayLevel); + if (needSelectQueue) { + sendRes = rocketMQTemplate.syncSendOrderly(topicWithTags.toString(), message, "", + rocketMQTemplate.getProducer().getSendMsgTimeout()); + } else { + sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, + rocketMQTemplate.getProducer().getSendMsgTimeout(), + delayLevel); + } log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { - rocketMQTemplate.asyncSend(topicWithTags.toString(), message, - new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.debug("async send to topic " + topicWithTags + " " - + sendResult); - } + SendCallback sendCallback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.debug("async send to topic " + topicWithTags + " " + + sendResult); + } - @Override - public void onException(Throwable e) { - log.error( - "RocketMQ Message hasn't been sent. Caused by " - + e.getMessage()); - if (getSendFailureChannel() != null) { - getSendFailureChannel().send( - RocketMQMessageHandler.this.errorMessageStrategy - .buildErrorMessage( - new MessagingException( - message, e), - null)); - } - } - }); + @Override + public void onException(Throwable e) { + log.error( + "RocketMQ Message hasn't been sent. Caused by " + + e.getMessage()); + if (getSendFailureChannel() != null) { + getSendFailureChannel().send( + RocketMQMessageHandler.this.errorMessageStrategy + .buildErrorMessage( + new MessagingException( + message, e), + null)); + } + } + }; + if (needSelectQueue) { + rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(), message, "", sendCallback, + rocketMQTemplate.getProducer().getSendMsgTimeout()); + } else { + rocketMQTemplate.asyncSend(topicWithTags.toString(), message, sendCallback); + } } } if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java new file mode 100644 index 00000000..e07f0b36 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java @@ -0,0 +1,34 @@ +package org.springframework.cloud.stream.binder.rocketmq.provisioning.selector; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.BinderHeaders; + +import java.util.List; + +/** + * @author wangxing + * @create 2019/7/3 + */ +public class PartitionMessageQueueSelector implements MessageQueueSelector { + + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionMessageQueueSelector.class); + + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Integer partition = 0; + try { + partition = Integer.valueOf(msg.getProperty(BinderHeaders.PARTITION_HEADER)); + } catch (NumberFormatException ignored) { + } + if (partition >= mqs.size()) { + LOGGER.warn("the partition '{}' is greater than the number of queues '{}'.", partition, mqs.size()); + partition = partition % mqs.size(); + } + return mqs.get(partition); + } + +} \ No newline at end of file