1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Code refactoring and some new feature support

This commit is contained in:
zkzlx 2021-02-01 11:24:25 +08:00
parent cbd64061ce
commit a6e01c9984
18 changed files with 2120 additions and 0 deletions

View File

@ -0,0 +1,45 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExtendedBindingHandlerMappingsProviderConfiguration {
@Bean
public MappingsProvider rocketExtendedPropertiesDefaultMappingsProvider() {
return () -> {
Map<ConfigurationPropertyName, ConfigurationPropertyName> mappings = new HashMap<>();
mappings.put(
ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.bindings"),
ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.default"));
mappings.put(
ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.streams"),
ConfigurationPropertyName
.of("spring.cloud.stream.rocketmq.streams.default"));
return mappings;
};
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQConfigBeanPostProcessor;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.CompositeMessageConverter;
/**
* issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ RocketMQExtendedBindingProperties.class,
RocketMQBinderConfigurationProperties.class })
public class RocketMQBinderAutoConfiguration {
@Autowired
private RocketMQExtendedBindingProperties extendedBindingProperties;
@Autowired
private RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Bean
public RocketMQConfigBeanPostProcessor rocketMQConfigBeanPostProcessor() {
return new RocketMQConfigBeanPostProcessor();
}
@Bean(RocketMQMessageConverter.DEFAULT_NAME)
@ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
public CompositeMessageConverter rocketMQMessageConverter() {
return new RocketMQMessageConverter().getMessageConverter();
}
@Bean
@ConditionalOnEnabledHealthIndicator("rocketmq")
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
public RocketMQBinderHealthIndicator rocketMQBinderHealthIndicator() {
return new RocketMQBinderHealthIndicator();
}
@Bean
public RocketMQTopicProvisioner rocketMQTopicProvisioner() {
return new RocketMQTopicProvisioner();
}
@Bean
public RocketMQMessageChannelBinder rocketMQMessageChannelBinder(
RocketMQTopicProvisioner provisioningProvider) {
return new RocketMQMessageChannelBinder(rocketBinderConfigurationProperties,
extendedBindingProperties, provisioningProvider);
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.contants;
import org.apache.rocketmq.common.message.MessageConst;
import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
/**
* @author zkzlx
*/
public class RocketMQConst extends MessageConst {
/**
* Header key for RocketMQ Transactional Args.
*/
public static final String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
/**
* Default NameServer value.
*/
public static final String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
/**
* Default group for SCS RocketMQ Binder.
*/
public static final String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
/**
* RocketMQ re-consume times.
*/
public static final String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS";
/**
* It is mainly provided for conversion between rocketMq-message and Spring-message,
* and parameters are passed through HEADERS.
*/
public static class Headers {
public static final String KEYS = MessageConst.PROPERTY_KEYS;
public static final String TAGS = MessageConst.PROPERTY_TAGS;
public static final String TOPIC = "MQ_TOPIC";
/**
* The ID of the message.
*/
public static final String MESSAGE_ID = "MQ_MESSAGE_ID";
/**
* The timestamp that the message producer invokes the message sending API.
*/
public static final String BORN_TIMESTAMP = "MQ_BORN_TIMESTAMP";
/**
* The IP and port number of the message producer
*/
public static final String BORN_HOST = "MQ_BORN_HOST";
/**
* Message flag, MQ is not processed and is available for use by applications.
*/
public static final String FLAG = "MQ_FLAG";
/**
* Message consumption queue ID
*/
public static final String QUEUE_ID = "MQ_QUEUE_ID";
/**
* Message system Flag, such as whether or not to compress, whether or not to
* transactional messages.
*/
public static final String SYS_FLAG = "MQ_SYS_FLAG";
/**
* The transaction ID of the transaction message.
*/
public static final String TRANSACTION_ID = "MQ_TRANSACTION_ID";
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.convert;
import java.util.ArrayList;
import java.util.List;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.util.ClassUtils;
/**
* The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME}
* @author zkzlx
*/
public class RocketMQMessageConverter {
public static final String DEFAULT_NAME = "rocketMQMessageConverter";
private static final boolean JACKSON_PRESENT;
private static final boolean FASTJSON_PRESENT;
static {
ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
JACKSON_PRESENT = ClassUtils
.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader)
&& ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator",
classLoader);
FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader)
&& ClassUtils.isPresent(
"com.alibaba.fastjson.support.config.FastJsonConfig",
classLoader);
}
private CompositeMessageConverter messageConverter;
public RocketMQMessageConverter() {
List<MessageConverter> messageConverters = new ArrayList<>();
ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
byteArrayMessageConverter.setContentTypeResolver(null);
messageConverters.add(byteArrayMessageConverter);
messageConverters.add(new StringMessageConverter());
if (JACKSON_PRESENT) {
messageConverters.add(new MappingJackson2MessageConverter());
}
if (FASTJSON_PRESENT) {
try {
messageConverters.add((MessageConverter) ClassUtils.forName(
"com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
ClassUtils.getDefaultClassLoader()).newInstance());
}
catch (ClassNotFoundException | IllegalAccessException
| InstantiationException ignored) {
// ignore this exception
}
}
messageConverter = new CompositeMessageConverter(messageConverters);
}
public CompositeMessageConverter getMessageConverter() {
return messageConverter;
}
public void setMessageConverter(CompositeMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.custom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.util.StringUtils;
/**
* Gets the beans configured in the configuration file
*
* @author junboXiang
*/
public final class RocketMQBeanContainerCache {
private static final Class<?>[] CLASSES = new Class[] {
CompositeMessageConverter.class, AllocateMessageQueueStrategy.class,
MessageQueueSelector.class, MessageListener.class, TransactionListener.class,
SendCallback.class, CheckForbiddenHook.class, SendMessageHook.class,
ErrorAcknowledgeHandler.class };
private static final Map<String, Object> BEANS_CACHE = new ConcurrentHashMap<>();
static void putBean(String beanName, Object beanObj) {
BEANS_CACHE.put(beanName, beanObj);
}
static Class<?>[] getClassAry() {
return CLASSES;
}
public static <T> T getBean(String beanName, Class<T> clazz) {
return getBean(beanName, clazz, null);
}
public static <T> T getBean(String beanName, Class<T> clazz, T defaultObj) {
if (StringUtils.isEmpty(beanName)) {
return defaultObj;
}
Object obj = BEANS_CACHE.get(beanName);
if (null == obj) {
return defaultObj;
}
if (clazz.isAssignableFrom(obj.getClass())) {
return (T) obj;
}
return defaultObj;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.custom;
import java.util.stream.Stream;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
/**
* find RocketMQ bean by annotations
*
* @author junboXiang
*
*/
public class RocketMQConfigBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
Stream.of(RocketMQBeanContainerCache.getClassAry()).forEach(clazz -> {
if (clazz.isAssignableFrom(bean.getClass())) {
RocketMQBeanContainerCache.putBean(beanName, bean);
}
});
return bean;
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.extend;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.messaging.Message;
/**
* @author zkzlx
*/
public interface ErrorAcknowledgeHandler {
/**
* Ack state handling, including receive, reject, and retry, when a consumption
* exception occurs.
* @param message
* @return see {@link Status}
*/
Status handler(Message<?> message);
}

View File

@ -0,0 +1,156 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* Extended function related to producer . eg:initial
*
* @author zkzlx
*/
public final class RocketMQConsumerFactory {
private final static Logger log = LoggerFactory
.getLogger(RocketMQConsumerFactory.class);
public static DefaultMQPushConsumer initPushConsumer(
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
RocketMQConsumerProperties consumerProperties = extendedConsumerProperties
.getExtension();
Assert.notNull(consumerProperties.getGroup(),
"Property 'group' is required - consumerGroup");
Assert.notNull(consumerProperties.getNameServer(),
"Property 'nameServer' is required");
AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache
.getBean(consumerProperties.getAllocateMessageQueueStrategy(),
AllocateMessageQueueStrategy.class,
new AllocateMessageQueueAveragely());
RPCHook rpcHook = null;
if (!StringUtils.isEmpty(consumerProperties.getAccessKey())
&& !StringUtils.isEmpty(consumerProperties.getSecretKey())) {
rpcHook = new AclClientRPCHook(
new SessionCredentials(consumerProperties.getAccessKey(),
consumerProperties.getSecretKey()));
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
consumerProperties.getGroup(), rpcHook, allocateMessageQueueStrategy,
consumerProperties.getEnableMsgTrace(),
consumerProperties.getCustomizedTraceTopic());
consumer.setVipChannelEnabled(
null == rpcHook && consumerProperties.getVipChannelEnabled());
consumer.setInstanceName(
RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup()));
consumer.setNamespace(consumerProperties.getNamespace());
consumer.setNamesrvAddr(consumerProperties.getNameServer());
consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel()));
consumer.setUseTLS(consumerProperties.getUseTLS());
consumer.setPullTimeDelayMillsWhenException(
consumerProperties.getPullTimeDelayMillsWhenException());
consumer.setPullBatchSize(consumerProperties.getPullBatchSize());
consumer.setConsumeFromWhere(consumerProperties.getConsumeFromWhere());
consumer.setHeartbeatBrokerInterval(
consumerProperties.getHeartbeatBrokerInterval());
consumer.setPersistConsumerOffsetInterval(
consumerProperties.getPersistConsumerOffsetInterval());
consumer.setPullInterval(consumerProperties.getPush().getPullInterval());
consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency());
consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency());
return consumer;
}
/**
* todo Compatible with versions less than 4.6 ?
* @return
*/
public static DefaultLitePullConsumer initPullConsumer(
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
RocketMQConsumerProperties consumerProperties = extendedConsumerProperties
.getExtension();
Assert.notNull(consumerProperties.getGroup(),
"Property 'group' is required - consumerGroup");
Assert.notNull(consumerProperties.getNameServer(),
"Property 'nameServer' is required");
AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache
.getBean(consumerProperties.getAllocateMessageQueueStrategy(),
AllocateMessageQueueStrategy.class,
new AllocateMessageQueueAveragely());
RPCHook rpcHook = null;
if (!StringUtils.isEmpty(consumerProperties.getAccessKey())
&& !StringUtils.isEmpty(consumerProperties.getSecretKey())) {
rpcHook = new AclClientRPCHook(
new SessionCredentials(consumerProperties.getAccessKey(),
consumerProperties.getSecretKey()));
}
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(
consumerProperties.getNamespace(), consumerProperties.getGroup(),
rpcHook);
consumer.setVipChannelEnabled(
null == rpcHook && consumerProperties.getVipChannelEnabled());
consumer.setInstanceName(
RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup()));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy);
consumer.setNamesrvAddr(consumerProperties.getNameServer());
consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel()));
consumer.setUseTLS(consumerProperties.getUseTLS());
consumer.setPullTimeDelayMillsWhenException(
consumerProperties.getPullTimeDelayMillsWhenException());
consumer.setConsumerTimeoutMillisWhenSuspend(
consumerProperties.getPull().getConsumerTimeoutMillisWhenSuspend());
consumer.setPullBatchSize(consumerProperties.getPullBatchSize());
consumer.setConsumeFromWhere(consumerProperties.getConsumeFromWhere());
consumer.setHeartbeatBrokerInterval(
consumerProperties.getHeartbeatBrokerInterval());
consumer.setPersistConsumerOffsetInterval(
consumerProperties.getPersistConsumerOffsetInterval());
consumer.setPollTimeoutMillis(
consumerProperties.getPull().getPollTimeoutMillis());
consumer.setPullThreadNums(extendedConsumerProperties.getConcurrency());
// The internal queues are cached by a maximum of 1000
consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension()
.getPull().getPullThresholdForAll());
return consumer;
}
private static MessageModel getMessageModel(String messageModel) {
for (MessageModel model : MessageModel.values()) {
if (model.getModeCN().equalsIgnoreCase(messageModel)) {
return model;
}
}
return MessageModel.CLUSTERING;
}
}

View File

@ -0,0 +1,228 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound;
import java.util.List;
import java.util.function.Supplier;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* TODO Describe what it does
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport
implements OrderlyShutdownCapable {
private static final Logger log = LoggerFactory
.getLogger(RocketMQInboundChannelAdapter.class);
private RetryTemplate retryTemplate;
private RecoveryCallback<Object> recoveryCallback;
private DefaultMQPushConsumer pushConsumer;
private final String topic;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;
public RocketMQInboundChannelAdapter(String topic,
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
this.topic = topic;
this.extendedConsumerProperties = extendedConsumerProperties;
}
@Override
protected void onInit() {
if (extendedConsumerProperties.getExtension() == null
|| !extendedConsumerProperties.getExtension().getEnabled()) {
return;
}
Instrumentation instrumentation = new Instrumentation(topic, this);
try {
super.onInit();
if (this.retryTemplate != null) {
Assert.state(getErrorChannel() == null,
"Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
+ "send an error message when retries are exhausted");
this.retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
}
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
}
});
}
pushConsumer = RocketMQConsumerFactory
.initPushConsumer(extendedConsumerProperties);
// prepare register consumer message listener,the next step is to be
// compatible with a custom MessageListener.
if (extendedConsumerProperties.getExtension().getPush().getOrderly()) {
pushConsumer.registerMessageListener((MessageListenerOrderly) (msgs,
context) -> RocketMQInboundChannelAdapter.this
.consumeMessage(msgs, () -> {
context.setSuspendCurrentQueueTimeMillis(
extendedConsumerProperties.getExtension()
.getPush()
.getSuspendCurrentQueueTimeMillis());
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}, () -> ConsumeOrderlyStatus.SUCCESS));
}
else {
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs,
context) -> RocketMQInboundChannelAdapter.this
.consumeMessage(msgs, () -> {
context.setDelayLevelWhenNextConsume(
extendedConsumerProperties.getExtension()
.getPush()
.getDelayLevelWhenNextConsume());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}, () -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS));
}
instrumentation.markStartedSuccessfully();
}
catch (Exception e) {
instrumentation.markStartFailed(e);
log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload(
"DefaultMQPushConsumer init failed, Caused by " + e.getMessage())
.build(), e);
}
finally {
InstrumentationManager.addHealthInstrumentation(instrumentation);
}
}
/**
* The actual execution of a user-defined input consumption service method.
* @param messageExtList rocket mq message list
* @param failSupplier {@link ConsumeConcurrentlyStatus} or
* {@link ConsumeOrderlyStatus}
* @param sucSupplier {@link ConsumeConcurrentlyStatus} or
* {@link ConsumeOrderlyStatus}
* @param <R>
* @return
*/
private <R> R consumeMessage(List<MessageExt> messageExtList,
Supplier<R> failSupplier, Supplier<R> sucSupplier) {
if (CollectionUtils.isEmpty(messageExtList)) {
throw new MessagingException(
"DefaultMQPushConsumer consuming failed, Caused by messageExtList is empty");
}
for (MessageExt messageExt : messageExtList) {
try {
Message<?> message = RocketMQMessageConverterSupport
.convertMessage2Spring(messageExt);
if (this.retryTemplate != null) {
this.retryTemplate.execute(context -> {
this.sendMessage(message);
return message;
}, this.recoveryCallback);
}
else {
this.sendMessage(message);
}
}
catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
return failSupplier.get();
}
}
return sucSupplier.get();
}
@Override
protected void doStart() {
if (extendedConsumerProperties.getExtension() == null
|| !extendedConsumerProperties.getExtension().getEnabled()) {
return;
}
try {
pushConsumer.subscribe(topic, RocketMQUtils.getMessageSelector(
extendedConsumerProperties.getExtension().getSubscription()));
pushConsumer.start();
}
catch (Exception e) {
log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload(
"DefaultMQPushConsumer init failed, Caused by " + e.getMessage())
.build(), e);
}
}
@Override
protected void doStop() {
if (pushConsumer != null) {
pushConsumer.shutdown();
}
}
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
public void setRecoveryCallback(RecoveryCallback<Object> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
@Override
public int beforeShutdown() {
this.stop();
return 0;
}
@Override
public int afterShutdown() {
return 0;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;
import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.messaging.Message;
/**
* By default, if consumption fails, the corresponding MessageQueue will always be
* retried, that is, the consumption of other messages in the MessageQueue will be
* blocked.
*
* @author zkzlx
*/
public class DefaultErrorAcknowledgeHandler implements ErrorAcknowledgeHandler {
/**
* Ack state handling, including receive, reject, and retry, when a consumption
* exception occurs.
*
* @param message
* @return see {@link Status}
*/
@Override
public Status handler(Message<?> message) {
return Status.REQUEUE;
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.util.Assert;
/**
* A pollable {@link org.springframework.integration.core.MessageSource} for RocketMQ.
* @author zkzlx
*/
public class RocketMQAckCallback implements AcknowledgmentCallback {
private final static Logger log = LoggerFactory.getLogger(RocketMQAckCallback.class);
private boolean acknowledged;
private boolean autoAckEnabled = true;
private MessageExt messageExt;
private AssignedMessageQueue assignedMessageQueue;
private DefaultLitePullConsumer consumer;
private final MessageQueue messageQueue;
public RocketMQAckCallback(DefaultLitePullConsumer consumer,
AssignedMessageQueue assignedMessageQueue, MessageQueue messageQueue,
MessageExt messageExt) {
this.messageExt = messageExt;
this.consumer = consumer;
this.assignedMessageQueue = assignedMessageQueue;
this.messageQueue = messageQueue;
}
@Override
public boolean isAcknowledged() {
return this.acknowledged;
}
@Override
public void noAutoAck() {
this.autoAckEnabled = false;
}
@Override
public boolean isAutoAck() {
return this.autoAckEnabled;
}
@Override
public void acknowledge(Status status) {
Assert.notNull(status, "'status' cannot be null");
if (this.acknowledged) {
throw new IllegalStateException("Already acknowledged");
}
synchronized (messageQueue) {
try {
long offset = messageExt.getQueueOffset();
switch (status) {
case REJECT:
case ACCEPT:
long consumerOffset = assignedMessageQueue
.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue
.getProcessQueue(messageQueue);
if (processQueue != null && !processQueue.isDropped()) {
consumer.getOffsetStore().updateOffset(messageQueue,
consumerOffset, false);
}
}
if (consumer.getMessageModel() == MessageModel.BROADCASTING) {
consumer.getOffsetStore().persist(messageQueue);
}
break;
case REQUEUE:
consumer.seek(messageQueue, offset);
break;
default:
break;
}
}
catch (MQClientException e) {
throw new IllegalStateException(e);
}
finally {
this.acknowledged = true;
}
}
}
}

View File

@ -0,0 +1,161 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;
import java.lang.reflect.Field;
import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQConsumerFactory;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageSource extends AbstractMessageSource<Object>
implements DisposableBean, Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQMessageSource.class);
private DefaultLitePullConsumer consumer;
private AssignedMessageQueue assignedMessageQueue;
private volatile boolean running;
private final String topic;
private final MessageSelector messageSelector;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;
public RocketMQMessageSource(String name,
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
this.topic = name;
this.messageSelector = RocketMQUtils.getMessageSelector(
extendedConsumerProperties.getExtension().getSubscription());
this.extendedConsumerProperties = extendedConsumerProperties;
}
@Override
public synchronized void start() {
Instrumentation instrumentation = new Instrumentation(topic, this);
try {
if (this.isRunning()) {
throw new IllegalStateException(
"pull consumer already running. " + this.toString());
}
this.consumer = RocketMQConsumerFactory
.initPullConsumer(extendedConsumerProperties);
// This parameter must be 1, otherwise doReceive cannot be handled singly.
this.consumer.setPullBatchSize(1);
this.consumer.subscribe(topic, messageSelector);
this.consumer.setAutoCommit(false);
this.assignedMessageQueue = acquireAssignedMessageQueue(this.consumer);
this.consumer.start();
instrumentation.markStartedSuccessfully();
}
catch (MQClientException e) {
instrumentation.markStartFailed(e);
log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e);
}
finally {
InstrumentationManager.addHealthInstrumentation(instrumentation);
}
this.running = true;
}
private AssignedMessageQueue acquireAssignedMessageQueue(
DefaultLitePullConsumer consumer) {
Field field = ReflectionUtils.findField(DefaultLitePullConsumer.class,
"defaultLitePullConsumerImpl");
assert field != null;
field.setAccessible(true);
DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) ReflectionUtils
.getField(field, consumer);
field = ReflectionUtils.findField(DefaultLitePullConsumerImpl.class,
"assignedMessageQueue");
assert field != null;
field.setAccessible(true);
return (AssignedMessageQueue) ReflectionUtils.getField(field,
defaultLitePullConsumerImpl);
}
@Override
public synchronized void stop() {
if (this.isRunning() && null != consumer) {
consumer.unsubscribe(topic);
consumer.shutdown();
this.running = false;
}
}
@Override
public synchronized boolean isRunning() {
return running;
}
@Override
protected synchronized Object doReceive() {
List<MessageExt> messageExtList = consumer.poll();
if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) {
return null;
}
MessageExt messageExt = messageExtList.get(0);
MessageQueue messageQueue = null;
for (MessageQueue queue : assignedMessageQueue.getAssignedMessageQueues()) {
if (queue.getQueueId() == messageExt.getQueueId()) {
messageQueue = queue;
break;
}
}
Message message = RocketMQMessageConverterSupport
.convertMessage2Spring(messageExtList.get(0));
return MessageBuilder.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
new RocketMQAckCallback(this.consumer, assignedMessageQueue,
messageQueue, messageExt))
.build();
}
@Override
public String getComponentType() {
return "rocketmq:message-source";
}
}

View File

@ -0,0 +1,131 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound;
import java.lang.reflect.Field;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* Extended function related to producer . eg:initial
*
* @author zkzlx
*/
public final class RocketMQProduceFactory {
private final static Logger log = LoggerFactory
.getLogger(RocketMQProduceFactory.class);
/**
* init for the producer,including convert producer params.
* @return
*/
public static DefaultMQProducer initRocketMQProducer(String topic,
RocketMQProducerProperties producerProperties) {
Assert.notNull(producerProperties.getGroup(),
"Property 'group' is required - producerGroup");
Assert.notNull(producerProperties.getNameServer(),
"Property 'nameServer' is required");
RPCHook rpcHook = null;
if (!StringUtils.isEmpty(producerProperties.getAccessKey())
&& !StringUtils.isEmpty(producerProperties.getSecretKey())) {
rpcHook = new AclClientRPCHook(
new SessionCredentials(producerProperties.getAccessKey(),
producerProperties.getSecretKey()));
}
DefaultMQProducer producer;
if (RocketMQProducerProperties.ProducerType.Trans
.equalsName(producerProperties.getProducerType())) {
producer = new TransactionMQProducer(producerProperties.getNamespace(),
producerProperties.getGroup(), rpcHook);
if (producerProperties.getEnableMsgTrace()) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(
producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE,
producerProperties.getCustomizedTraceTopic(), rpcHook);
dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
Field field = DefaultMQProducer.class
.getDeclaredField("traceDispatcher");
field.setAccessible(true);
field.set(producer, dispatcher);
producer.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(dispatcher));
}
catch (Throwable e) {
log.error(
"system mq-trace hook init failed ,maybe can't send msg trace data");
}
}
}
else {
producer = new DefaultMQProducer(producerProperties.getNamespace(),
producerProperties.getGroup(), rpcHook,
producerProperties.getEnableMsgTrace(),
producerProperties.getCustomizedTraceTopic());
}
producer.setVipChannelEnabled(
null == rpcHook && producerProperties.getVipChannelEnabled());
producer.setInstanceName(
RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid()));
producer.setNamesrvAddr(producerProperties.getNameServer());
producer.setSendMsgTimeout(producerProperties.getSendMsgTimeout());
producer.setRetryTimesWhenSendFailed(
producerProperties.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(
producerProperties.getRetryTimesWhenSendAsyncFailed());
producer.setCompressMsgBodyOverHowmuch(
producerProperties.getCompressMsgBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(
producerProperties.getRetryAnotherBroker());
producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
producer.setUseTLS(producerProperties.getUseTLS());
CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean(
producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class);
if (null != checkForbiddenHook) {
producer.getDefaultMQProducerImpl()
.registerCheckForbiddenHook(checkForbiddenHook);
}
SendMessageHook sendMessageHook = RocketMQBeanContainerCache
.getBean(producerProperties.getSendMessageHook(), SendMessageHook.class);
if (null != sendMessageHook) {
producer.getDefaultMQProducerImpl().registerSendMessageHook(sendMessageHook);
}
return producer;
}
}

View File

@ -0,0 +1,286 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound;
import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQProducerMessageHandler extends AbstractMessageHandler
implements Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQProducerMessageHandler.class);
private volatile boolean running = false;
private volatile boolean isTrans = false;
private ErrorMessageStrategy errorMessageStrategy;
private MessageChannel sendFailureChannel;
private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;
private DefaultMQProducer defaultMQProducer;
private MessageQueueSelector messageQueueSelector;
private final ProducerDestination destination;
private final ExtendedProducerProperties<RocketMQProducerProperties> extendedProducerProperties;
private final RocketMQProducerProperties mqProducerProperties;
public RocketMQProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties> extendedProducerProperties,
RocketMQProducerProperties mqProducerProperties) {
this.destination = destination;
this.extendedProducerProperties = extendedProducerProperties;
this.mqProducerProperties = mqProducerProperties;
}
@Override
protected void onInit() {
if (null == mqProducerProperties || !mqProducerProperties.getEnabled()) {
return;
}
super.onInit();
this.defaultMQProducer = RocketMQProduceFactory
.initRocketMQProducer(destination.getName(), mqProducerProperties);
this.isTrans = defaultMQProducer instanceof TransactionMQProducer;
// Use the default if the partition is on and no customization is available.
this.messageQueueSelector = RocketMQBeanContainerCache.getBean(
mqProducerProperties.getMessageQueueSelector(),
MessageQueueSelector.class,
extendedProducerProperties.isPartitioned()
? new PartitionMessageQueueSelector()
: null);
}
@Override
public void start() {
Instrumentation instrumentation = new Instrumentation(destination.getName(),
this);
try {
defaultMQProducer.start();
// TransactionMQProducer does not currently support custom
// MessageQueueSelector.
if (!isTrans && extendedProducerProperties.isPartitioned()) {
List<MessageQueue> messageQueues = defaultMQProducer
.fetchPublishMessageQueues(destination.getName());
if (extendedProducerProperties.getPartitionCount() != messageQueues
.size()) {
logger.info(String.format(
"The partition count of topic '%s' will change from '%s' to '%s'",
destination.getName(),
extendedProducerProperties.getPartitionCount(),
messageQueues.size()));
extendedProducerProperties.setPartitionCount(messageQueues.size());
// may be npe!
partitioningInterceptor.setPartitionCount(
extendedProducerProperties.getPartitionCount());
}
}
running = true;
instrumentation.markStartedSuccessfully();
}
catch (MQClientException | NullPointerException e) {
instrumentation.markStartFailed(e);
log.error("The defaultMQProducer startup failure !!!", e);
}
finally {
InstrumentationManager.addHealthInstrumentation(instrumentation);
}
}
@Override
public void stop() {
if (running && null != defaultMQProducer) {
defaultMQProducer.shutdown();
}
running = false;
}
@Override
public boolean isRunning() {
return running;
}
@Override
protected void handleMessageInternal(Message<?> message) {
try {
org.apache.rocketmq.common.message.Message mqMessage = RocketMQMessageConverterSupport
.convertMessage2MQ(destination.getName(), message);
SendResult sendResult;
if (defaultMQProducer instanceof TransactionMQProducer) {
TransactionListener transactionListener = RocketMQBeanContainerCache
.getBean(mqProducerProperties.getTransactionListener(),
TransactionListener.class);
if (transactionListener == null) {
throw new MessagingException(
"TransactionMQProducer must have a TransactionMQProducer !!! ");
}
((TransactionMQProducer) defaultMQProducer)
.setTransactionListener(transactionListener);
log.info("send transaction message :" + mqMessage);
sendResult = defaultMQProducer.sendMessageInTransaction(mqMessage,
message.getHeaders().get(RocketMQConst.USER_TRANSACTIONAL_ARGS));
}
else {
log.info("send message :" + mqMessage);
sendResult = this.send(mqMessage, this.messageQueueSelector,
message.getHeaders(), message);
}
if (sendResult == null
|| !SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.error("message send fail.SendStatus is not OK ");
this.doFail(message, new MessagingException(
"message send fail.SendStatus is not OK."));
}
}
catch (Exception e) {
log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage(),
e);
this.doFail(message, e);
}
}
private SendResult send(org.apache.rocketmq.common.message.Message mqMessage,
MessageQueueSelector selector, Object args, Message<?> message)
throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setSendStatus(SendStatus.SEND_OK);
if (RocketMQProducerProperties.SendType.OneWay
.equalsName(mqProducerProperties.getSendType())) {
if (null != selector) {
defaultMQProducer.sendOneway(mqMessage, selector, args);
}
else {
defaultMQProducer.sendOneway(mqMessage);
}
return sendResult;
}
if (RocketMQProducerProperties.SendType.Sync
.equalsName(mqProducerProperties.getSendType())) {
if (null != selector) {
return defaultMQProducer.send(mqMessage, selector, args);
}
return defaultMQProducer.send(mqMessage);
}
if (RocketMQProducerProperties.SendType.Async
.equalsName(mqProducerProperties.getSendType())) {
if (null != selector) {
defaultMQProducer.send(mqMessage, selector, args,
this.getSendCallback(message));
}
else {
defaultMQProducer.send(mqMessage, this.getSendCallback(message));
}
return sendResult;
}
throw new MessagingException(
"message hasn't been sent,cause by : the SendType must be in this values[OneWay, Async, Sync]");
}
/**
* https://github.com/alibaba/spring-cloud-alibaba/issues/1408
* @param message
* @return
*/
private SendCallback getSendCallback(Message<?> message) {
SendCallback sendCallback = RocketMQBeanContainerCache
.getBean(mqProducerProperties.getSendCallBack(), SendCallback.class);
if (null == sendCallback) {
sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
RocketMQProducerMessageHandler.this.doFail(message, e);
}
};
}
return sendCallback;
}
private void doFail(Message<?> message, Throwable e) {
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(getErrorMessageStrategy().buildErrorMessage(e,
ErrorMessageUtils.getAttributeAccessor(message, message)));
}
else {
throw new MessagingException(message, e);
}
}
public MessageChannel getSendFailureChannel() {
return sendFailureChannel;
}
public void setSendFailureChannel(MessageChannel sendFailureChannel) {
this.sendFailureChannel = sendFailureChannel;
}
public ErrorMessageStrategy getErrorMessageStrategy() {
return errorMessageStrategy;
}
public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
this.errorMessageStrategy = errorMessageStrategy;
}
public PartitioningInterceptor getPartitioningInterceptor() {
return partitioningInterceptor;
}
public RocketMQProducerMessageHandler setPartitioningInterceptor(
PartitioningInterceptor partitioningInterceptor) {
this.partitioningInterceptor = partitioningInterceptor;
return this;
}
}

View File

@ -0,0 +1,201 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.properties;
import java.io.Serializable;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
/**
* @author zkzlx
*/
public class RocketMQCommonProperties implements Serializable {
private static final long serialVersionUID = -6724870154343284715L;
private boolean enabled = true;
private String nameServer;
/**
* The property of "access-key".
*/
private String accessKey;
/**
* The property of "secret-key".
*/
private String secretKey;
/**
* Consumers of the same role is required to have exactly same subscriptions and
* consumerGroup to correctly achieve load balance. It's required and needs to be
* globally unique.
* </p>
* Producer group conceptually aggregates all producer instances of exactly same role,
* which is particularly important when transactional messages are involved.
* </p>
* <p>
* For non-transactional messages, it does not matter as long as it's unique per
* process.
* </p>
* <p>
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further
* discussion.
*/
private String group;
private String namespace;
private String accessChannel = AccessChannel.LOCAL.name();
/**
* Pulling topic information interval from the named server.
* see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
* updateTopicRouteInfoFromNameServer.
*/
private int pollNameServerInterval = 1000 * 30;
/**
* Heartbeat interval in microseconds with message broker.
* see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
* sendHeartbeatToAllBroker .
*/
private int heartbeatBrokerInterval = 1000 * 30;
/**
* Offset persistent interval for consumer.
* see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
* sendHeartbeatToAllBroker .
*/
private int persistConsumerOffsetInterval = 1000 * 5;
private boolean vipChannelEnabled = false;
private boolean useTLS = TlsSystemConfig.tlsEnable;
private boolean enableMsgTrace = true;
private String customizedTraceTopic;
public boolean getEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getNameServer() {
return nameServer;
}
public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(String accessChannel) {
this.accessChannel = accessChannel;
}
public int getPollNameServerInterval() {
return pollNameServerInterval;
}
public void setPollNameServerInterval(int pollNameServerInterval) {
this.pollNameServerInterval = pollNameServerInterval;
}
public int getHeartbeatBrokerInterval() {
return heartbeatBrokerInterval;
}
public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
this.heartbeatBrokerInterval = heartbeatBrokerInterval;
}
public int getPersistConsumerOffsetInterval() {
return persistConsumerOffsetInterval;
}
public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
}
public boolean getVipChannelEnabled() {
return vipChannelEnabled;
}
public void setVipChannelEnabled(boolean vipChannelEnabled) {
this.vipChannelEnabled = vipChannelEnabled;
}
public boolean getUseTLS() {
return useTLS;
}
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
public boolean getEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.properties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
/**
* Container object for RocketMQ specific extended producer and consumer binding
* properties.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQSpecificPropertiesProvider
implements BinderSpecificPropertiesProvider {
/**
* Consumer specific binding properties. @see {@link RocketMQConsumerProperties}.
*/
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
/**
* Producer specific binding properties. @see {@link RocketMQProducerProperties}.
*/
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
/**
* @return {@link RocketMQConsumerProperties} Consumer specific binding
* properties. @see {@link RocketMQConsumerProperties}.
*/
@Override
public RocketMQConsumerProperties getConsumer() {
return this.consumer;
}
public void setConsumer(RocketMQConsumerProperties consumer) {
this.consumer = consumer;
}
/**
* @return {@link RocketMQProducerProperties} Producer specific binding
* properties. @see {@link RocketMQProducerProperties}.
*/
@Override
public RocketMQProducerProperties getProducer() {
return this.producer;
}
public void setProducer(RocketMQProducerProperties producer) {
this.producer = producer;
}
}

View File

@ -0,0 +1,185 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.support;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Objects;
import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst;
import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst.Headers;
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
/**
*
* @author zkzlx
*/
public class RocketMQMessageConverterSupport {
private static final CompositeMessageConverter MESSAGE_CONVERTER = RocketMQBeanContainerCache
.getBean(RocketMQMessageConverter.DEFAULT_NAME,
CompositeMessageConverter.class,
new RocketMQMessageConverter().getMessageConverter());
public static Message convertMessage2Spring(MessageExt message) {
MessageBuilder messageBuilder = MessageBuilder.withPayload(message.getBody())
.setHeader(toRocketHeaderKey(Headers.KEYS), message.getKeys())
.setHeader(toRocketHeaderKey(Headers.TAGS), message.getTags())
.setHeader(toRocketHeaderKey(Headers.TOPIC), message.getTopic())
.setHeader(toRocketHeaderKey(Headers.MESSAGE_ID), message.getMsgId())
.setHeader(toRocketHeaderKey(Headers.BORN_TIMESTAMP),
message.getBornTimestamp())
.setHeader(toRocketHeaderKey(Headers.BORN_HOST),
message.getBornHostString())
.setHeader(toRocketHeaderKey(Headers.FLAG), message.getFlag())
.setHeader(toRocketHeaderKey(Headers.QUEUE_ID), message.getQueueId())
.setHeader(toRocketHeaderKey(Headers.SYS_FLAG), message.getSysFlag())
.setHeader(toRocketHeaderKey(Headers.TRANSACTION_ID),
message.getTransactionId());
addUserProperties(message.getProperties(), messageBuilder);
return messageBuilder.build();
}
public static String toRocketHeaderKey(String rawKey) {
return "ROCKET_" + rawKey;
}
private static void addUserProperties(Map<String, String> properties,
MessageBuilder messageBuilder) {
if (!CollectionUtils.isEmpty(properties)) {
properties.forEach((key, val) -> {
if (!MessageConst.STRING_HASH_SET.contains(key)
&& !MessageHeaders.ID.equals(key)
&& !MessageHeaders.TIMESTAMP.equals(key)) {
messageBuilder.setHeader(key, val);
}
});
}
}
public static org.apache.rocketmq.common.message.Message convertMessage2MQ(
String destination, Message<?> source) {
Message<?> message = MESSAGE_CONVERTER.toMessage(source.getPayload(),
source.getHeaders());
assert message != null;
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
message = builder.build();
return doConvert(destination, message);
}
private static org.apache.rocketmq.common.message.Message doConvert(String topic,
Message<?> message) {
Charset charset = Charset.defaultCharset();
Object payloadObj = message.getPayload();
byte[] payloads;
try {
if (payloadObj instanceof String) {
payloads = ((String) payloadObj).getBytes(charset);
}
else if (payloadObj instanceof byte[]) {
payloads = (byte[]) message.getPayload();
}
else {
String jsonObj = (String) MESSAGE_CONVERTER.fromMessage(message,
payloadObj.getClass());
if (null == jsonObj) {
throw new RuntimeException(String.format(
"empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
MESSAGE_CONVERTER.getClass(), payloadObj.getClass(),
payloadObj));
}
payloads = jsonObj.getBytes(charset);
}
}
catch (Exception e) {
throw new RuntimeException("convert to RocketMQ message failed.", e);
}
return getAndWrapMessage(topic, message.getHeaders(), payloads);
}
private static org.apache.rocketmq.common.message.Message getAndWrapMessage(
String topic, MessageHeaders headers, byte[] payloads) {
if (topic == null || topic.length() < 1) {
return null;
}
if (payloads == null || payloads.length < 1) {
return null;
}
org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(
topic, payloads);
if (Objects.nonNull(headers) && !headers.isEmpty()) {
Object tag = headers.getOrDefault(Headers.TAGS,
headers.get(toRocketHeaderKey(Headers.TAGS)));
if (!StringUtils.isEmpty(tag)) {
rocketMsg.setTags(String.valueOf(tag));
}
Object keys = headers.getOrDefault(Headers.KEYS,
headers.get(toRocketHeaderKey(Headers.KEYS)));
if (!StringUtils.isEmpty(keys)) {
rocketMsg.setKeys(keys.toString());
}
Object flagObj = headers.getOrDefault(Headers.FLAG,
headers.get(toRocketHeaderKey(Headers.FLAG)));
int flag = 0;
int delayLevel = 0;
try {
flagObj = flagObj == null ? 0 : flagObj;
Object delayLevelObj = headers.getOrDefault(
RocketMQConst.PROPERTY_DELAY_TIME_LEVEL,
headers.get(toRocketHeaderKey(
RocketMQConst.PROPERTY_DELAY_TIME_LEVEL)));
delayLevelObj = delayLevelObj == null ? 0 : delayLevelObj;
delayLevel = Integer.parseInt(String.valueOf(delayLevelObj));
flag = Integer.parseInt(String.valueOf(flagObj));
}
catch (Exception ignored) {
}
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
rocketMsg.setFlag(flag);
Object waitStoreMsgOkObj = headers
.getOrDefault(RocketMQConst.PROPERTY_WAIT_STORE_MSG_OK, "true");
rocketMsg.setWaitStoreMsgOK(
Boolean.parseBoolean(String.valueOf(waitStoreMsgOkObj)));
headers.entrySet().stream()
.filter(entry -> !Objects.equals(entry.getKey(), Headers.FLAG))
.forEach(entry -> {
if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
rocketMsg.putUserProperty(entry.getKey(),
String.valueOf(entry.getValue()));
}
});
}
return rocketMsg;
}
}

View File

@ -0,0 +1,98 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.utils;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQCommonProperties;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.springframework.util.StringUtils;
/**
* TODO Describe what it does
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQUtils {
public static <T extends RocketMQCommonProperties> T mergeRocketMQProperties(
RocketMQBinderConfigurationProperties binderConfigurationProperties,
T mqProperties) {
if (null == binderConfigurationProperties || mqProperties == null) {
return mqProperties;
}
if (StringUtils.isEmpty(mqProperties.getNameServer())) {
mqProperties.setNameServer(binderConfigurationProperties.getNameServer());
}
if (StringUtils.isEmpty(mqProperties.getSecretKey())) {
mqProperties.setSecretKey(binderConfigurationProperties.getSecretKey());
}
if (StringUtils.isEmpty(mqProperties.getAccessKey())) {
mqProperties.setAccessKey(binderConfigurationProperties.getAccessKey());
}
if (StringUtils.isEmpty(mqProperties.getAccessChannel())) {
mqProperties
.setAccessChannel(binderConfigurationProperties.getAccessChannel());
}
if (StringUtils.isEmpty(mqProperties.getNamespace())) {
mqProperties.setNamespace(binderConfigurationProperties.getNamespace());
}
if (StringUtils.isEmpty(mqProperties.getGroup())) {
mqProperties.setGroup(binderConfigurationProperties.getGroup());
}
if (StringUtils.isEmpty(mqProperties.getCustomizedTraceTopic())) {
mqProperties.setCustomizedTraceTopic(
binderConfigurationProperties.getCustomizedTraceTopic());
}
mqProperties.setNameServer(getNameServerStr(mqProperties.getNameServer()));
return mqProperties;
}
public static String getInstanceName(RPCHook rpcHook, String identify) {
String separator = "|";
StringBuilder instanceName = new StringBuilder();
if (null != rpcHook) {
SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook)
.getSessionCredentials();
instanceName.append(sessionCredentials.getAccessKey()).append(separator)
.append(sessionCredentials.getSecretKey()).append(separator);
}
instanceName.append(identify).append(separator).append(UtilAll.getPid());
return instanceName.toString();
}
public static String getNameServerStr(String nameServer) {
if (StringUtils.isEmpty(nameServer)) {
return null;
}
return nameServer.replaceAll(",", ";");
}
private static final String SQL = "sql:";
public static MessageSelector getMessageSelector(String expression) {
if (StringUtils.hasText(expression) && expression.startsWith(SQL)) {
return MessageSelector.bySql(expression.replaceFirst(SQL, ""));
}
return MessageSelector.byTag(expression);
}
}