1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Update the code format and replace the partition number setting at startup.

This commit is contained in:
wangxing 2019-08-13 10:58:32 +08:00
parent e41319f1b1
commit de2f3f2131
3 changed files with 110 additions and 51 deletions

View File

@ -16,18 +16,9 @@
package com.alibaba.cloud.stream.binder.rocketmq;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@ -41,19 +32,31 @@ import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -86,6 +89,7 @@ public class RocketMQMessageChannelBinder extends
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageChannel channel,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
@ -149,13 +153,20 @@ public class RocketMQMessageChannelBinder extends
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
rocketMQTemplate.setProducer(producer);
rocketMQTemplate.setMessageQueueSelector(new PartitionMessageQueueSelector());
if (producerProperties.isPartitioned()) {
rocketMQTemplate
.setMessageQueueSelector(new PartitionMessageQueueSelector());
}
}
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
rocketMQTemplate, destination.getName(), producerGroup,
producerProperties.getExtension().getTransactional(),
instrumentationManager);
instrumentationManager, producerProperties,
((AbstractMessageChannel) channel).getChannelInterceptors().stream()
.filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
.map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
.findFirst().orElse(null));
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setSync(producerProperties.getExtension().getSync());
@ -170,6 +181,13 @@ public class RocketMQMessageChannelBinder extends
}
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageChannel errorChannel) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
String group,

View File

@ -16,19 +16,22 @@
package com.alibaba.cloud.stream.binder.rocketmq.integration;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import java.util.List;
import java.util.Optional;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
@ -40,7 +43,10 @@ import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Optional;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -68,14 +74,22 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private volatile boolean running = false;
private ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;
public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
String groupName, Boolean transactional,
InstrumentationManager instrumentationManager) {
InstrumentationManager instrumentationManager,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) {
this.rocketMQTemplate = rocketMQTemplate;
this.destination = destination;
this.groupName = groupName;
this.transactional = transactional;
this.instrumentationManager = instrumentationManager;
this.producerProperties = producerProperties;
this.partitioningInterceptor = partitioningInterceptor;
}
@Override
@ -97,6 +111,24 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
.build(), e);
}
}
if (producerProperties.isPartitioned()) {
try {
List<MessageQueue> messageQueues = rocketMQTemplate.getProducer()
.fetchPublishMessageQueues(destination);
if (producerProperties.getPartitionCount() != messageQueues.size()) {
logger.info(String.format(
"The partition count will change from '%s' to '%s'",
producerProperties.getPartitionCount(),
messageQueues.size()));
producerProperties.setPartitionCount(messageQueues.size());
partitioningInterceptor
.setPartitionCount(producerProperties.getPartitionCount());
}
}
catch (MQClientException e) {
logger.error("fetch publish message queues fail", e);
}
}
running = true;
}
@ -146,13 +178,17 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
catch (Exception e) {
// ignore
}
boolean needSelectQueue = message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER);
boolean needSelectQueue = message.getHeaders()
.containsKey(BinderHeaders.PARTITION_HEADER);
if (sync) {
if (needSelectQueue) {
sendRes = rocketMQTemplate.syncSendOrderly(topicWithTags.toString(), message, "",
sendRes = rocketMQTemplate.syncSendOrderly(
topicWithTags.toString(), message, "",
rocketMQTemplate.getProducer().getSendMsgTimeout());
} else {
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
}
else {
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
message,
rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel);
}
@ -168,24 +204,24 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override
public void onException(Throwable e) {
log.error(
"RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
log.error("RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy
.buildErrorMessage(
new MessagingException(
message, e),
null));
.buildErrorMessage(new MessagingException(
message, e), null));
}
}
};
if (needSelectQueue) {
rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(), message, "", sendCallback,
rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
message, "", sendCallback,
rocketMQTemplate.getProducer().getSendMsgTimeout());
} else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message, sendCallback);
}
else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
sendCallback);
}
}
}

View File

@ -1,5 +1,7 @@
package com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector;
import java.util.List;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
@ -7,28 +9,31 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import java.util.List;
/**
* @author wangxing
* @create 2019/7/3
*/
public class PartitionMessageQueueSelector implements MessageQueueSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionMessageQueueSelector.class);
private static final Logger LOGGER = LoggerFactory
.getLogger(PartitionMessageQueueSelector.class);
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer partition = 0;
try {
partition = Math.abs(Integer.valueOf(msg.getProperty(BinderHeaders.PARTITION_HEADER)));
if (partition >= mqs.size()) {
LOGGER.warn("the partition '{}' is greater than the number of queues '{}'.", partition, mqs.size());
partition = partition % mqs.size();
}
} catch (NumberFormatException ignored) {
}
return mqs.get(partition);
}
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer partition = 0;
try {
partition = Math.abs(
Integer.parseInt(msg.getProperty(BinderHeaders.PARTITION_HEADER)));
if (partition >= mqs.size()) {
LOGGER.warn(
"the partition '{}' is greater than the number of queues '{}'.",
partition, mqs.size());
partition = partition % mqs.size();
}
}
catch (NumberFormatException ignored) {
}
return mqs.get(partition);
}
}