From de2f3f2131dda97fc0e321a857cc3004579b6535 Mon Sep 17 00:00:00 2001
From: wangxing <1271029739@qq.com>
Date: Tue, 13 Aug 2019 10:58:32 +0800
Subject: [PATCH] Update the code format and replace the partition number
setting at startup.
---
.../RocketMQMessageChannelBinder.java | 50 +++++++++----
.../integration/RocketMQMessageHandler.java | 74 ++++++++++++++-----
.../PartitionMessageQueueSelector.java | 37 ++++++----
3 files changed, 110 insertions(+), 51 deletions(-)
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index 53466927..25712e42 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -16,18 +16,9 @@
package com.alibaba.cloud.stream.binder.rocketmq;
-import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
-import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -41,19 +32,31 @@ import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
+import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
+import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
-import java.util.HashMap;
-import java.util.Map;
+import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
+import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
+import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
+import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Jim
@@ -86,6 +89,7 @@ public class RocketMQMessageChannelBinder extends
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties producerProperties,
+ MessageChannel channel,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
@@ -149,13 +153,20 @@ public class RocketMQMessageChannelBinder extends
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
rocketMQTemplate.setProducer(producer);
- rocketMQTemplate.setMessageQueueSelector(new PartitionMessageQueueSelector());
+ if (producerProperties.isPartitioned()) {
+ rocketMQTemplate
+ .setMessageQueueSelector(new PartitionMessageQueueSelector());
+ }
}
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
rocketMQTemplate, destination.getName(), producerGroup,
producerProperties.getExtension().getTransactional(),
- instrumentationManager);
+ instrumentationManager, producerProperties,
+ ((AbstractMessageChannel) channel).getChannelInterceptors().stream()
+ .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
+ .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
+ .findFirst().orElse(null));
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setSync(producerProperties.getExtension().getSync());
@@ -170,6 +181,13 @@ public class RocketMQMessageChannelBinder extends
}
}
+ @Override
+ protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
+ ExtendedProducerProperties producerProperties,
+ MessageChannel errorChannel) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
String group,
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
index 794005db..376e7f61 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
@@ -16,19 +16,22 @@
package com.alibaba.cloud.stream.binder.rocketmq.integration;
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import java.util.List;
+import java.util.Optional;
+
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
@@ -40,7 +43,10 @@ import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
-import java.util.Optional;
+import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
+import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
+import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
/**
* @author Jim
@@ -68,14 +74,22 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private volatile boolean running = false;
+ private ExtendedProducerProperties producerProperties;
+
+ private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;
+
public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
String groupName, Boolean transactional,
- InstrumentationManager instrumentationManager) {
+ InstrumentationManager instrumentationManager,
+ ExtendedProducerProperties producerProperties,
+ MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) {
this.rocketMQTemplate = rocketMQTemplate;
this.destination = destination;
this.groupName = groupName;
this.transactional = transactional;
this.instrumentationManager = instrumentationManager;
+ this.producerProperties = producerProperties;
+ this.partitioningInterceptor = partitioningInterceptor;
}
@Override
@@ -97,6 +111,24 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
.build(), e);
}
}
+ if (producerProperties.isPartitioned()) {
+ try {
+ List messageQueues = rocketMQTemplate.getProducer()
+ .fetchPublishMessageQueues(destination);
+ if (producerProperties.getPartitionCount() != messageQueues.size()) {
+ logger.info(String.format(
+ "The partition count will change from '%s' to '%s'",
+ producerProperties.getPartitionCount(),
+ messageQueues.size()));
+ producerProperties.setPartitionCount(messageQueues.size());
+ partitioningInterceptor
+ .setPartitionCount(producerProperties.getPartitionCount());
+ }
+ }
+ catch (MQClientException e) {
+ logger.error("fetch publish message queues fail", e);
+ }
+ }
running = true;
}
@@ -146,13 +178,17 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
catch (Exception e) {
// ignore
}
- boolean needSelectQueue = message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER);
+ boolean needSelectQueue = message.getHeaders()
+ .containsKey(BinderHeaders.PARTITION_HEADER);
if (sync) {
if (needSelectQueue) {
- sendRes = rocketMQTemplate.syncSendOrderly(topicWithTags.toString(), message, "",
+ sendRes = rocketMQTemplate.syncSendOrderly(
+ topicWithTags.toString(), message, "",
rocketMQTemplate.getProducer().getSendMsgTimeout());
- } else {
- sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
+ }
+ else {
+ sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
+ message,
rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel);
}
@@ -168,24 +204,24 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override
public void onException(Throwable e) {
- log.error(
- "RocketMQ Message hasn't been sent. Caused by "
- + e.getMessage());
+ log.error("RocketMQ Message hasn't been sent. Caused by "
+ + e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy
- .buildErrorMessage(
- new MessagingException(
- message, e),
- null));
+ .buildErrorMessage(new MessagingException(
+ message, e), null));
}
}
};
if (needSelectQueue) {
- rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(), message, "", sendCallback,
+ rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
+ message, "", sendCallback,
rocketMQTemplate.getProducer().getSendMsgTimeout());
- } else {
- rocketMQTemplate.asyncSend(topicWithTags.toString(), message, sendCallback);
+ }
+ else {
+ rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
+ sendCallback);
}
}
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java
index f8e599a4..38603346 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java
@@ -1,5 +1,7 @@
package com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector;
+import java.util.List;
+
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -7,28 +9,31 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
-import java.util.List;
-
/**
* @author wangxing
* @create 2019/7/3
*/
public class PartitionMessageQueueSelector implements MessageQueueSelector {
- private static final Logger LOGGER = LoggerFactory.getLogger(PartitionMessageQueueSelector.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(PartitionMessageQueueSelector.class);
- @Override
- public MessageQueue select(List mqs, Message msg, Object arg) {
- Integer partition = 0;
- try {
- partition = Math.abs(Integer.valueOf(msg.getProperty(BinderHeaders.PARTITION_HEADER)));
- if (partition >= mqs.size()) {
- LOGGER.warn("the partition '{}' is greater than the number of queues '{}'.", partition, mqs.size());
- partition = partition % mqs.size();
- }
- } catch (NumberFormatException ignored) {
- }
- return mqs.get(partition);
- }
+ @Override
+ public MessageQueue select(List mqs, Message msg, Object arg) {
+ Integer partition = 0;
+ try {
+ partition = Math.abs(
+ Integer.parseInt(msg.getProperty(BinderHeaders.PARTITION_HEADER)));
+ if (partition >= mqs.size()) {
+ LOGGER.warn(
+ "the partition '{}' is greater than the number of queues '{}'.",
+ partition, mqs.size());
+ partition = partition % mqs.size();
+ }
+ }
+ catch (NumberFormatException ignored) {
+ }
+ return mqs.get(partition);
+ }
}
\ No newline at end of file