From a6e01c998417669fcd4d4b053db454d37725fe7a Mon Sep 17 00:00:00 2001
From: zkzlx
Date: Mon, 1 Feb 2021 11:24:25 +0800
Subject: [PATCH] Code refactoring and some new feature support
---
...gHandlerMappingsProviderConfiguration.java | 45 +++
.../RocketMQBinderAutoConfiguration.java | 81 +++++
.../rocketmq/contants/RocketMQConst.java | 90 ++++++
.../convert/RocketMQMessageConverter.java | 84 +++++
.../custom/RocketMQBeanContainerCache.java | 75 +++++
.../RocketMQConfigBeanPostProcessor.java | 43 +++
.../extend/ErrorAcknowledgeHandler.java | 35 +++
.../inbound/RocketMQConsumerFactory.java | 156 ++++++++++
.../RocketMQInboundChannelAdapter.java | 228 ++++++++++++++
.../pull/DefaultErrorAcknowledgeHandler.java | 43 +++
.../inbound/pull/RocketMQAckCallback.java | 112 +++++++
.../inbound/pull/RocketMQMessageSource.java | 161 ++++++++++
.../outbound/RocketMQProduceFactory.java | 131 ++++++++
.../RocketMQProducerMessageHandler.java | 286 ++++++++++++++++++
.../properties/RocketMQCommonProperties.java | 201 ++++++++++++
.../RocketMQSpecificPropertiesProvider.java | 66 ++++
.../RocketMQMessageConverterSupport.java | 185 +++++++++++
.../binder/rocketmq/utils/RocketMQUtils.java | 98 ++++++
18 files changed, 2120 insertions(+)
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/DefaultErrorAcknowledgeHandler.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java
create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
new file mode 100644
index 00000000..eb6e6ce5
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
+import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ExtendedBindingHandlerMappingsProviderConfiguration {
+
+ @Bean
+ public MappingsProvider rocketExtendedPropertiesDefaultMappingsProvider() {
+ return () -> {
+ Map mappings = new HashMap<>();
+ mappings.put(
+ ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.bindings"),
+ ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.default"));
+ mappings.put(
+ ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.streams"),
+ ConfigurationPropertyName
+ .of("spring.cloud.stream.rocketmq.streams.default"));
+ return mappings;
+ };
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java
new file mode 100644
index 00000000..abcb9b96
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate;
+
+import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
+import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
+import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
+import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQConfigBeanPostProcessor;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+
+/**
+ * issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681
+ * @author Timur Valiev
+ * @author Jim
+ */
+@Configuration(proxyBeanMethods = false)
+@EnableConfigurationProperties({ RocketMQExtendedBindingProperties.class,
+ RocketMQBinderConfigurationProperties.class })
+public class RocketMQBinderAutoConfiguration {
+
+ @Autowired
+ private RocketMQExtendedBindingProperties extendedBindingProperties;
+ @Autowired
+ private RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ @Bean
+ public RocketMQConfigBeanPostProcessor rocketMQConfigBeanPostProcessor() {
+ return new RocketMQConfigBeanPostProcessor();
+ }
+
+ @Bean(RocketMQMessageConverter.DEFAULT_NAME)
+ @ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
+ public CompositeMessageConverter rocketMQMessageConverter() {
+ return new RocketMQMessageConverter().getMessageConverter();
+ }
+
+ @Bean
+ @ConditionalOnEnabledHealthIndicator("rocketmq")
+ @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
+ public RocketMQBinderHealthIndicator rocketMQBinderHealthIndicator() {
+ return new RocketMQBinderHealthIndicator();
+ }
+
+ @Bean
+ public RocketMQTopicProvisioner rocketMQTopicProvisioner() {
+ return new RocketMQTopicProvisioner();
+ }
+
+ @Bean
+ public RocketMQMessageChannelBinder rocketMQMessageChannelBinder(
+ RocketMQTopicProvisioner provisioningProvider) {
+ return new RocketMQMessageChannelBinder(rocketBinderConfigurationProperties,
+ extendedBindingProperties, provisioningProvider);
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
new file mode 100644
index 00000000..e83a6a17
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.contants;
+
+import org.apache.rocketmq.common.message.MessageConst;
+
+import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
+
+/**
+ * @author zkzlx
+ */
+public class RocketMQConst extends MessageConst {
+
+ /**
+ * Header key for RocketMQ Transactional Args.
+ */
+ public static final String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
+
+ /**
+ * Default NameServer value.
+ */
+ public static final String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
+
+ /**
+ * Default group for SCS RocketMQ Binder.
+ */
+ public static final String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
+
+ /**
+ * RocketMQ re-consume times.
+ */
+ public static final String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
+
+ public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS";
+
+ /**
+ * It is mainly provided for conversion between rocketMq-message and Spring-message,
+ * and parameters are passed through HEADERS.
+ */
+ public static class Headers {
+ public static final String KEYS = MessageConst.PROPERTY_KEYS;
+ public static final String TAGS = MessageConst.PROPERTY_TAGS;
+ public static final String TOPIC = "MQ_TOPIC";
+ /**
+ * The ID of the message.
+ */
+ public static final String MESSAGE_ID = "MQ_MESSAGE_ID";
+ /**
+ * The timestamp that the message producer invokes the message sending API.
+ */
+ public static final String BORN_TIMESTAMP = "MQ_BORN_TIMESTAMP";
+ /**
+ * The IP and port number of the message producer
+ */
+ public static final String BORN_HOST = "MQ_BORN_HOST";
+
+ /**
+ * Message flag, MQ is not processed and is available for use by applications.
+ */
+ public static final String FLAG = "MQ_FLAG";
+ /**
+ * Message consumption queue ID
+ */
+ public static final String QUEUE_ID = "MQ_QUEUE_ID";
+ /**
+ * Message system Flag, such as whether or not to compress, whether or not to
+ * transactional messages.
+ */
+ public static final String SYS_FLAG = "MQ_SYS_FLAG";
+ /**
+ * The transaction ID of the transaction message.
+ */
+ public static final String TRANSACTION_ID = "MQ_TRANSACTION_ID";
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
new file mode 100644
index 00000000..58f17c6e
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.convert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.messaging.converter.ByteArrayMessageConverter;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.converter.StringMessageConverter;
+import org.springframework.util.ClassUtils;
+
+/**
+ * The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME}
+ * @author zkzlx
+ */
+public class RocketMQMessageConverter {
+
+ public static final String DEFAULT_NAME = "rocketMQMessageConverter";
+
+ private static final boolean JACKSON_PRESENT;
+ private static final boolean FASTJSON_PRESENT;
+
+ static {
+ ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
+ JACKSON_PRESENT = ClassUtils
+ .isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader)
+ && ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator",
+ classLoader);
+ FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader)
+ && ClassUtils.isPresent(
+ "com.alibaba.fastjson.support.config.FastJsonConfig",
+ classLoader);
+ }
+
+ private CompositeMessageConverter messageConverter;
+
+ public RocketMQMessageConverter() {
+ List messageConverters = new ArrayList<>();
+ ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
+ byteArrayMessageConverter.setContentTypeResolver(null);
+ messageConverters.add(byteArrayMessageConverter);
+ messageConverters.add(new StringMessageConverter());
+ if (JACKSON_PRESENT) {
+ messageConverters.add(new MappingJackson2MessageConverter());
+ }
+ if (FASTJSON_PRESENT) {
+ try {
+ messageConverters.add((MessageConverter) ClassUtils.forName(
+ "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
+ ClassUtils.getDefaultClassLoader()).newInstance());
+ }
+ catch (ClassNotFoundException | IllegalAccessException
+ | InstantiationException ignored) {
+ // ignore this exception
+ }
+ }
+ messageConverter = new CompositeMessageConverter(messageConverters);
+ }
+
+ public CompositeMessageConverter getMessageConverter() {
+ return messageConverter;
+ }
+
+ public void setMessageConverter(CompositeMessageConverter messageConverter) {
+ this.messageConverter = messageConverter;
+ }
+}
\ No newline at end of file
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java
new file mode 100644
index 00000000..9afc9482
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.custom;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import org.apache.rocketmq.client.hook.CheckForbiddenHook;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.TransactionListener;
+
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.util.StringUtils;
+
+/**
+ * Gets the beans configured in the configuration file
+ *
+ * @author junboXiang
+ */
+public final class RocketMQBeanContainerCache {
+
+ private static final Class>[] CLASSES = new Class[] {
+ CompositeMessageConverter.class, AllocateMessageQueueStrategy.class,
+ MessageQueueSelector.class, MessageListener.class, TransactionListener.class,
+ SendCallback.class, CheckForbiddenHook.class, SendMessageHook.class,
+ ErrorAcknowledgeHandler.class };
+
+ private static final Map BEANS_CACHE = new ConcurrentHashMap<>();
+
+ static void putBean(String beanName, Object beanObj) {
+ BEANS_CACHE.put(beanName, beanObj);
+ }
+
+ static Class>[] getClassAry() {
+ return CLASSES;
+ }
+
+ public static T getBean(String beanName, Class clazz) {
+ return getBean(beanName, clazz, null);
+ }
+
+ public static T getBean(String beanName, Class clazz, T defaultObj) {
+ if (StringUtils.isEmpty(beanName)) {
+ return defaultObj;
+ }
+ Object obj = BEANS_CACHE.get(beanName);
+ if (null == obj) {
+ return defaultObj;
+ }
+ if (clazz.isAssignableFrom(obj.getClass())) {
+ return (T) obj;
+ }
+ return defaultObj;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java
new file mode 100644
index 00000000..30bd6432
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.custom;
+
+import java.util.stream.Stream;
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+/**
+ * find RocketMQ bean by annotations
+ *
+ * @author junboXiang
+ *
+ */
+public class RocketMQConfigBeanPostProcessor implements BeanPostProcessor {
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName)
+ throws BeansException {
+ Stream.of(RocketMQBeanContainerCache.getClassAry()).forEach(clazz -> {
+ if (clazz.isAssignableFrom(bean.getClass())) {
+ RocketMQBeanContainerCache.putBean(beanName, bean);
+ }
+ });
+ return bean;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java
new file mode 100644
index 00000000..fcf6de80
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.extend;
+
+import org.springframework.integration.acks.AcknowledgmentCallback.Status;
+import org.springframework.messaging.Message;
+
+/**
+ * @author zkzlx
+ */
+public interface ErrorAcknowledgeHandler {
+
+ /**
+ * Ack state handling, including receive, reject, and retry, when a consumption
+ * exception occurs.
+ * @param message
+ * @return see {@link Status}
+ */
+ Status handler(Message> message);
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java
new file mode 100644
index 00000000..87624958
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound;
+
+import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+/**
+ * Extended function related to producer . eg:initial
+ *
+ * @author zkzlx
+ */
+public final class RocketMQConsumerFactory {
+
+ private final static Logger log = LoggerFactory
+ .getLogger(RocketMQConsumerFactory.class);
+
+ public static DefaultMQPushConsumer initPushConsumer(
+ ExtendedConsumerProperties extendedConsumerProperties) {
+ RocketMQConsumerProperties consumerProperties = extendedConsumerProperties
+ .getExtension();
+ Assert.notNull(consumerProperties.getGroup(),
+ "Property 'group' is required - consumerGroup");
+ Assert.notNull(consumerProperties.getNameServer(),
+ "Property 'nameServer' is required");
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache
+ .getBean(consumerProperties.getAllocateMessageQueueStrategy(),
+ AllocateMessageQueueStrategy.class,
+ new AllocateMessageQueueAveragely());
+ RPCHook rpcHook = null;
+ if (!StringUtils.isEmpty(consumerProperties.getAccessKey())
+ && !StringUtils.isEmpty(consumerProperties.getSecretKey())) {
+ rpcHook = new AclClientRPCHook(
+ new SessionCredentials(consumerProperties.getAccessKey(),
+ consumerProperties.getSecretKey()));
+ }
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
+ consumerProperties.getGroup(), rpcHook, allocateMessageQueueStrategy,
+ consumerProperties.getEnableMsgTrace(),
+ consumerProperties.getCustomizedTraceTopic());
+ consumer.setVipChannelEnabled(
+ null == rpcHook && consumerProperties.getVipChannelEnabled());
+ consumer.setInstanceName(
+ RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup()));
+ consumer.setNamespace(consumerProperties.getNamespace());
+ consumer.setNamesrvAddr(consumerProperties.getNameServer());
+ consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel()));
+ consumer.setUseTLS(consumerProperties.getUseTLS());
+ consumer.setPullTimeDelayMillsWhenException(
+ consumerProperties.getPullTimeDelayMillsWhenException());
+ consumer.setPullBatchSize(consumerProperties.getPullBatchSize());
+ consumer.setConsumeFromWhere(consumerProperties.getConsumeFromWhere());
+ consumer.setHeartbeatBrokerInterval(
+ consumerProperties.getHeartbeatBrokerInterval());
+ consumer.setPersistConsumerOffsetInterval(
+ consumerProperties.getPersistConsumerOffsetInterval());
+ consumer.setPullInterval(consumerProperties.getPush().getPullInterval());
+ consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency());
+ consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency());
+ return consumer;
+ }
+
+ /**
+ * todo Compatible with versions less than 4.6 ?
+ * @return
+ */
+ public static DefaultLitePullConsumer initPullConsumer(
+ ExtendedConsumerProperties extendedConsumerProperties) {
+ RocketMQConsumerProperties consumerProperties = extendedConsumerProperties
+ .getExtension();
+ Assert.notNull(consumerProperties.getGroup(),
+ "Property 'group' is required - consumerGroup");
+ Assert.notNull(consumerProperties.getNameServer(),
+ "Property 'nameServer' is required");
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache
+ .getBean(consumerProperties.getAllocateMessageQueueStrategy(),
+ AllocateMessageQueueStrategy.class,
+ new AllocateMessageQueueAveragely());
+
+ RPCHook rpcHook = null;
+ if (!StringUtils.isEmpty(consumerProperties.getAccessKey())
+ && !StringUtils.isEmpty(consumerProperties.getSecretKey())) {
+ rpcHook = new AclClientRPCHook(
+ new SessionCredentials(consumerProperties.getAccessKey(),
+ consumerProperties.getSecretKey()));
+ }
+
+ DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(
+ consumerProperties.getNamespace(), consumerProperties.getGroup(),
+ rpcHook);
+ consumer.setVipChannelEnabled(
+ null == rpcHook && consumerProperties.getVipChannelEnabled());
+ consumer.setInstanceName(
+ RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup()));
+ consumer.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy);
+ consumer.setNamesrvAddr(consumerProperties.getNameServer());
+ consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel()));
+ consumer.setUseTLS(consumerProperties.getUseTLS());
+ consumer.setPullTimeDelayMillsWhenException(
+ consumerProperties.getPullTimeDelayMillsWhenException());
+ consumer.setConsumerTimeoutMillisWhenSuspend(
+ consumerProperties.getPull().getConsumerTimeoutMillisWhenSuspend());
+ consumer.setPullBatchSize(consumerProperties.getPullBatchSize());
+ consumer.setConsumeFromWhere(consumerProperties.getConsumeFromWhere());
+ consumer.setHeartbeatBrokerInterval(
+ consumerProperties.getHeartbeatBrokerInterval());
+ consumer.setPersistConsumerOffsetInterval(
+ consumerProperties.getPersistConsumerOffsetInterval());
+ consumer.setPollTimeoutMillis(
+ consumerProperties.getPull().getPollTimeoutMillis());
+ consumer.setPullThreadNums(extendedConsumerProperties.getConcurrency());
+ // The internal queues are cached by a maximum of 1000
+ consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension()
+ .getPull().getPullThresholdForAll());
+ return consumer;
+ }
+
+ private static MessageModel getMessageModel(String messageModel) {
+ for (MessageModel model : MessageModel.values()) {
+ if (model.getModeCN().equalsIgnoreCase(messageModel)) {
+ return model;
+ }
+ }
+ return MessageModel.CLUSTERING;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java
new file mode 100644
index 00000000..d0aec552
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
+import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
+import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.integration.context.OrderlyShutdownCapable;
+import org.springframework.integration.endpoint.MessageProducerSupport;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessagingException;
+import org.springframework.retry.RecoveryCallback;
+import org.springframework.retry.RetryCallback;
+import org.springframework.retry.RetryContext;
+import org.springframework.retry.RetryListener;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+/**
+ * TODO Describe what it does
+ * @author Jim
+ */
+public class RocketMQInboundChannelAdapter extends MessageProducerSupport
+ implements OrderlyShutdownCapable {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQInboundChannelAdapter.class);
+
+ private RetryTemplate retryTemplate;
+ private RecoveryCallback
+ * Producer group conceptually aggregates all producer instances of exactly same role,
+ * which is particularly important when transactional messages are involved.
+ *
+ *
+ * For non-transactional messages, it does not matter as long as it's unique per
+ * process.
+ *
+ *
+ * See here for further
+ * discussion.
+ */
+ private String group;
+
+ private String namespace;
+ private String accessChannel = AccessChannel.LOCAL.name();
+ /**
+ * Pulling topic information interval from the named server.
+ * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
+ * updateTopicRouteInfoFromNameServer.
+ */
+ private int pollNameServerInterval = 1000 * 30;
+ /**
+ * Heartbeat interval in microseconds with message broker.
+ * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
+ * sendHeartbeatToAllBroker .
+ */
+ private int heartbeatBrokerInterval = 1000 * 30;
+ /**
+ * Offset persistent interval for consumer.
+ * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
+ * sendHeartbeatToAllBroker .
+ */
+ private int persistConsumerOffsetInterval = 1000 * 5;
+
+ private boolean vipChannelEnabled = false;
+
+ private boolean useTLS = TlsSystemConfig.tlsEnable;
+
+ private boolean enableMsgTrace = true;
+ private String customizedTraceTopic;
+
+ public boolean getEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public String getNameServer() {
+ return nameServer;
+ }
+
+ public void setNameServer(String nameServer) {
+ this.nameServer = nameServer;
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public String getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(String accessChannel) {
+ this.accessChannel = accessChannel;
+ }
+
+ public int getPollNameServerInterval() {
+ return pollNameServerInterval;
+ }
+
+ public void setPollNameServerInterval(int pollNameServerInterval) {
+ this.pollNameServerInterval = pollNameServerInterval;
+ }
+
+ public int getHeartbeatBrokerInterval() {
+ return heartbeatBrokerInterval;
+ }
+
+ public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
+ this.heartbeatBrokerInterval = heartbeatBrokerInterval;
+ }
+
+ public int getPersistConsumerOffsetInterval() {
+ return persistConsumerOffsetInterval;
+ }
+
+ public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
+ this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+ }
+
+ public boolean getVipChannelEnabled() {
+ return vipChannelEnabled;
+ }
+
+ public void setVipChannelEnabled(boolean vipChannelEnabled) {
+ this.vipChannelEnabled = vipChannelEnabled;
+ }
+
+ public boolean getUseTLS() {
+ return useTLS;
+ }
+
+ public void setUseTLS(boolean useTLS) {
+ this.useTLS = useTLS;
+ }
+
+ public boolean getEnableMsgTrace() {
+ return enableMsgTrace;
+ }
+
+ public void setEnableMsgTrace(boolean enableMsgTrace) {
+ this.enableMsgTrace = enableMsgTrace;
+ }
+
+ public String getCustomizedTraceTopic() {
+ return customizedTraceTopic;
+ }
+
+ public void setCustomizedTraceTopic(String customizedTraceTopic) {
+ this.customizedTraceTopic = customizedTraceTopic;
+ }
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java
new file mode 100644
index 00000000..99a1a36f
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.properties;
+
+import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
+
+/**
+ * Container object for RocketMQ specific extended producer and consumer binding
+ * properties.
+ *
+ * @author Jim
+ */
+public class RocketMQSpecificPropertiesProvider
+ implements BinderSpecificPropertiesProvider {
+
+ /**
+ * Consumer specific binding properties. @see {@link RocketMQConsumerProperties}.
+ */
+ private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
+
+ /**
+ * Producer specific binding properties. @see {@link RocketMQProducerProperties}.
+ */
+ private RocketMQProducerProperties producer = new RocketMQProducerProperties();
+
+ /**
+ * @return {@link RocketMQConsumerProperties} Consumer specific binding
+ * properties. @see {@link RocketMQConsumerProperties}.
+ */
+ @Override
+ public RocketMQConsumerProperties getConsumer() {
+ return this.consumer;
+ }
+
+ public void setConsumer(RocketMQConsumerProperties consumer) {
+ this.consumer = consumer;
+ }
+
+ /**
+ * @return {@link RocketMQProducerProperties} Producer specific binding
+ * properties. @see {@link RocketMQProducerProperties}.
+ */
+ @Override
+ public RocketMQProducerProperties getProducer() {
+ return this.producer;
+ }
+
+ public void setProducer(RocketMQProducerProperties producer) {
+ this.producer = producer;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java
new file mode 100644
index 00000000..d4b62e5f
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.support;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Objects;
+
+import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst;
+import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst.Headers;
+import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
+import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.util.StringUtils;
+
+/**
+ *
+ * @author zkzlx
+ */
+public class RocketMQMessageConverterSupport {
+
+ private static final CompositeMessageConverter MESSAGE_CONVERTER = RocketMQBeanContainerCache
+ .getBean(RocketMQMessageConverter.DEFAULT_NAME,
+ CompositeMessageConverter.class,
+ new RocketMQMessageConverter().getMessageConverter());
+
+ public static Message convertMessage2Spring(MessageExt message) {
+ MessageBuilder messageBuilder = MessageBuilder.withPayload(message.getBody())
+ .setHeader(toRocketHeaderKey(Headers.KEYS), message.getKeys())
+ .setHeader(toRocketHeaderKey(Headers.TAGS), message.getTags())
+ .setHeader(toRocketHeaderKey(Headers.TOPIC), message.getTopic())
+ .setHeader(toRocketHeaderKey(Headers.MESSAGE_ID), message.getMsgId())
+ .setHeader(toRocketHeaderKey(Headers.BORN_TIMESTAMP),
+ message.getBornTimestamp())
+ .setHeader(toRocketHeaderKey(Headers.BORN_HOST),
+ message.getBornHostString())
+ .setHeader(toRocketHeaderKey(Headers.FLAG), message.getFlag())
+ .setHeader(toRocketHeaderKey(Headers.QUEUE_ID), message.getQueueId())
+ .setHeader(toRocketHeaderKey(Headers.SYS_FLAG), message.getSysFlag())
+ .setHeader(toRocketHeaderKey(Headers.TRANSACTION_ID),
+ message.getTransactionId());
+ addUserProperties(message.getProperties(), messageBuilder);
+ return messageBuilder.build();
+ }
+
+ public static String toRocketHeaderKey(String rawKey) {
+ return "ROCKET_" + rawKey;
+ }
+
+ private static void addUserProperties(Map properties,
+ MessageBuilder messageBuilder) {
+ if (!CollectionUtils.isEmpty(properties)) {
+ properties.forEach((key, val) -> {
+ if (!MessageConst.STRING_HASH_SET.contains(key)
+ && !MessageHeaders.ID.equals(key)
+ && !MessageHeaders.TIMESTAMP.equals(key)) {
+ messageBuilder.setHeader(key, val);
+ }
+ });
+ }
+ }
+
+ public static org.apache.rocketmq.common.message.Message convertMessage2MQ(
+ String destination, Message> source) {
+ Message> message = MESSAGE_CONVERTER.toMessage(source.getPayload(),
+ source.getHeaders());
+ assert message != null;
+ MessageBuilder> builder = MessageBuilder.fromMessage(message);
+ builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
+ message = builder.build();
+ return doConvert(destination, message);
+ }
+
+ private static org.apache.rocketmq.common.message.Message doConvert(String topic,
+ Message> message) {
+ Charset charset = Charset.defaultCharset();
+ Object payloadObj = message.getPayload();
+ byte[] payloads;
+ try {
+ if (payloadObj instanceof String) {
+ payloads = ((String) payloadObj).getBytes(charset);
+ }
+ else if (payloadObj instanceof byte[]) {
+ payloads = (byte[]) message.getPayload();
+ }
+ else {
+ String jsonObj = (String) MESSAGE_CONVERTER.fromMessage(message,
+ payloadObj.getClass());
+ if (null == jsonObj) {
+ throw new RuntimeException(String.format(
+ "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
+ MESSAGE_CONVERTER.getClass(), payloadObj.getClass(),
+ payloadObj));
+ }
+ payloads = jsonObj.getBytes(charset);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("convert to RocketMQ message failed.", e);
+ }
+ return getAndWrapMessage(topic, message.getHeaders(), payloads);
+ }
+
+ private static org.apache.rocketmq.common.message.Message getAndWrapMessage(
+ String topic, MessageHeaders headers, byte[] payloads) {
+ if (topic == null || topic.length() < 1) {
+ return null;
+ }
+ if (payloads == null || payloads.length < 1) {
+ return null;
+ }
+ org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(
+ topic, payloads);
+ if (Objects.nonNull(headers) && !headers.isEmpty()) {
+ Object tag = headers.getOrDefault(Headers.TAGS,
+ headers.get(toRocketHeaderKey(Headers.TAGS)));
+ if (!StringUtils.isEmpty(tag)) {
+ rocketMsg.setTags(String.valueOf(tag));
+ }
+
+ Object keys = headers.getOrDefault(Headers.KEYS,
+ headers.get(toRocketHeaderKey(Headers.KEYS)));
+ if (!StringUtils.isEmpty(keys)) {
+ rocketMsg.setKeys(keys.toString());
+ }
+ Object flagObj = headers.getOrDefault(Headers.FLAG,
+ headers.get(toRocketHeaderKey(Headers.FLAG)));
+ int flag = 0;
+ int delayLevel = 0;
+ try {
+ flagObj = flagObj == null ? 0 : flagObj;
+ Object delayLevelObj = headers.getOrDefault(
+ RocketMQConst.PROPERTY_DELAY_TIME_LEVEL,
+ headers.get(toRocketHeaderKey(
+ RocketMQConst.PROPERTY_DELAY_TIME_LEVEL)));
+ delayLevelObj = delayLevelObj == null ? 0 : delayLevelObj;
+ delayLevel = Integer.parseInt(String.valueOf(delayLevelObj));
+ flag = Integer.parseInt(String.valueOf(flagObj));
+ }
+ catch (Exception ignored) {
+ }
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
+ rocketMsg.setFlag(flag);
+ Object waitStoreMsgOkObj = headers
+ .getOrDefault(RocketMQConst.PROPERTY_WAIT_STORE_MSG_OK, "true");
+ rocketMsg.setWaitStoreMsgOK(
+ Boolean.parseBoolean(String.valueOf(waitStoreMsgOkObj)));
+ headers.entrySet().stream()
+ .filter(entry -> !Objects.equals(entry.getKey(), Headers.FLAG))
+ .forEach(entry -> {
+ if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
+ rocketMsg.putUserProperty(entry.getKey(),
+ String.valueOf(entry.getValue()));
+ }
+ });
+
+ }
+ return rocketMsg;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java
new file mode 100644
index 00000000..3f789842
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.utils;
+
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQCommonProperties;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.RPCHook;
+
+import org.springframework.util.StringUtils;
+
+/**
+ * TODO Describe what it does
+ *
+ * @author Jim
+ */
+public class RocketMQUtils {
+
+ public static T mergeRocketMQProperties(
+ RocketMQBinderConfigurationProperties binderConfigurationProperties,
+ T mqProperties) {
+ if (null == binderConfigurationProperties || mqProperties == null) {
+ return mqProperties;
+ }
+ if (StringUtils.isEmpty(mqProperties.getNameServer())) {
+ mqProperties.setNameServer(binderConfigurationProperties.getNameServer());
+ }
+ if (StringUtils.isEmpty(mqProperties.getSecretKey())) {
+ mqProperties.setSecretKey(binderConfigurationProperties.getSecretKey());
+ }
+ if (StringUtils.isEmpty(mqProperties.getAccessKey())) {
+ mqProperties.setAccessKey(binderConfigurationProperties.getAccessKey());
+ }
+ if (StringUtils.isEmpty(mqProperties.getAccessChannel())) {
+ mqProperties
+ .setAccessChannel(binderConfigurationProperties.getAccessChannel());
+ }
+ if (StringUtils.isEmpty(mqProperties.getNamespace())) {
+ mqProperties.setNamespace(binderConfigurationProperties.getNamespace());
+ }
+ if (StringUtils.isEmpty(mqProperties.getGroup())) {
+ mqProperties.setGroup(binderConfigurationProperties.getGroup());
+ }
+ if (StringUtils.isEmpty(mqProperties.getCustomizedTraceTopic())) {
+ mqProperties.setCustomizedTraceTopic(
+ binderConfigurationProperties.getCustomizedTraceTopic());
+ }
+ mqProperties.setNameServer(getNameServerStr(mqProperties.getNameServer()));
+ return mqProperties;
+ }
+
+ public static String getInstanceName(RPCHook rpcHook, String identify) {
+ String separator = "|";
+ StringBuilder instanceName = new StringBuilder();
+ if (null != rpcHook) {
+ SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook)
+ .getSessionCredentials();
+ instanceName.append(sessionCredentials.getAccessKey()).append(separator)
+ .append(sessionCredentials.getSecretKey()).append(separator);
+ }
+ instanceName.append(identify).append(separator).append(UtilAll.getPid());
+ return instanceName.toString();
+ }
+
+ public static String getNameServerStr(String nameServer) {
+ if (StringUtils.isEmpty(nameServer)) {
+ return null;
+ }
+ return nameServer.replaceAll(",", ";");
+ }
+
+ private static final String SQL = "sql:";
+
+ public static MessageSelector getMessageSelector(String expression) {
+ if (StringUtils.hasText(expression) && expression.startsWith(SQL)) {
+ return MessageSelector.bySql(expression.replaceFirst(SQL, ""));
+ }
+ return MessageSelector.byTag(expression);
+ }
+
+}