diff --git a/pom.xml b/pom.xml
index 70dc9d4a..053e6e20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
spring-cloud-alibaba-sentinel-datasource
spring-cloud-alibaba-nacos-config
spring-cloud-alibaba-nacos-discovery
+ spring-cloud-stream-binder-rocketmq
spring-cloud-alibaba-examples
spring-cloud-alibaba-test
spring-cloud-alibaba-docs
diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml
index c89ae488..38c62d12 100644
--- a/spring-cloud-alibaba-dependencies/pom.xml
+++ b/spring-cloud-alibaba-dependencies/pom.xml
@@ -24,6 +24,8 @@
4.0.1
1.0.0
2.16.0
+ 4.3.1
+ 3.2.6
@@ -115,6 +117,11 @@
sentinel-dubbo-api
${project.version}
+
+ org.apache.rocketmq
+ rocketmq-client
+ ${rocketmq.version}
+
@@ -166,6 +173,11 @@
spring-cloud-alicloud-context
${project.version}
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rocketmq
+ ${project.version}
+
@@ -202,6 +214,20 @@
spring-cloud-starter-alicloud-acm
${project.version}
+
+
+ org.springframework.cloud
+ spring-cloud-starter-stream-rocketmq
+ ${project.version}
+
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+ ${metrics.core}
+
+
diff --git a/spring-cloud-starter-alibaba/pom.xml b/spring-cloud-starter-alibaba/pom.xml
index d455bf87..09aa60bd 100644
--- a/spring-cloud-starter-alibaba/pom.xml
+++ b/spring-cloud-starter-alibaba/pom.xml
@@ -17,6 +17,7 @@
spring-cloud-starter-alibaba-nacos-config
spring-cloud-starter-alibaba-nacos-discovery
spring-cloud-starter-alibaba-sentinel
+ spring-cloud-starter-stream-rocketmq
diff --git a/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml b/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml
new file mode 100644
index 00000000..d4f2eedd
--- /dev/null
+++ b/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml
@@ -0,0 +1,20 @@
+
+ 4.0.0
+
+
+ org.springframework.cloud
+ spring-cloud-starter-alibaba
+ 0.1.1.BUILD-SNAPSHOT
+
+ spring-cloud-starter-stream-rocketmq
+ Spring Cloud Starter Stream RocketMQ
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rocketmq
+
+
+
+
diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml
new file mode 100644
index 00000000..4b534d4b
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/pom.xml
@@ -0,0 +1,71 @@
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-alibaba
+ 0.1.1.BUILD-SNAPSHOT
+
+ 4.0.0
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rocketmq
+ Spring Cloud Alibaba RocketMQ Binder
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-actuator
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
+
+
+
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
new file mode 100644
index 00000000..cb35b5a6
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.rocketmq;
+
+/**
+ * @author Jim
+ */
+public interface RocketMQBinderConstants {
+
+ /**
+ * Header key
+ */
+ String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE";
+
+ String ROCKET_FLAG = "ROCKETMQ_FLAG";
+
+ String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
+
+ String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
+
+ /**
+ * Instrumentation key
+ */
+ String LASTSEND_TIMESTAMP = "lastSend.timestamp";
+
+ String ENDPOINT_ID = "rocketmq_binder";
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
new file mode 100644
index 00000000..4bed86af
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.rocketmq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
+import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
+import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
+import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+import org.springframework.cloud.stream.provisioning.ProducerDestination;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQMessageChannelBinder extends
+ AbstractMessageChannelBinder,
+ ExtendedProducerProperties, RocketMQTopicProvisioner>
+ implements ExtendedPropertiesBinder {
+
+ private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class);
+
+ private final RocketMQExtendedBindingProperties extendedBindingProperties;
+ private final RocketMQTopicProvisioner rocketTopicProvisioner;
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+ private final InstrumentationManager instrumentationManager;
+ private final ConsumersManager consumersManager;
+
+ public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
+ RocketMQExtendedBindingProperties extendedBindingProperties,
+ RocketMQTopicProvisioner provisioningProvider,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
+ InstrumentationManager instrumentationManager) {
+ super(true, null, provisioningProvider);
+ this.consumersManager = consumersManager;
+ this.extendedBindingProperties = extendedBindingProperties;
+ this.rocketTopicProvisioner = provisioningProvider;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ this.instrumentationManager = instrumentationManager;
+ }
+
+ @Override
+ protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
+ ExtendedProducerProperties
+ producerProperties,
+ MessageChannel errorChannel) throws Exception {
+ if (producerProperties.getExtension().getEnabled()) {
+ return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(),
+ rocketBinderConfigurationProperties, instrumentationManager);
+ } else {
+ throw new RuntimeException(
+ "Binding for channel " + destination.getName() + "has been disabled, message can't be delivered");
+ }
+ }
+
+ @Override
+ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
+ ExtendedConsumerProperties
+ consumerProperties)
+ throws Exception {
+ if (group == null || "".equals(group)) {
+ throw new RuntimeException("'group' must be configured for channel + " + destination.getName());
+ }
+
+ RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager,
+ consumerProperties, destination.getName(), group, instrumentationManager);
+
+ ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group,
+ consumerProperties);
+ if (consumerProperties.getMaxAttempts() > 1) {
+ rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));
+ rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
+ } else {
+ rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
+ }
+
+ return rocketInboundChannelAdapter;
+ }
+
+ @Override
+ public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
+ return extendedBindingProperties.getExtendedConsumerProperties(channelName);
+ }
+
+ @Override
+ public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
+ return extendedBindingProperties.getExtendedProducerProperties(channelName);
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
new file mode 100644
index 00000000..fa480681
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.rocketmq;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
+import org.springframework.integration.support.MutableMessage;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageHeaderAccessor;
+
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
+
+ public RocketMQMessageHeaderAccessor() {
+ super();
+ }
+
+ public RocketMQMessageHeaderAccessor(Message> message) {
+ super(message);
+ }
+
+ public Acknowledgement getAcknowledgement(Message message) {
+ return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
+ }
+
+ public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) {
+ setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
+ return this;
+ }
+
+ public String getTags() {
+ return getMessageHeaders().get(MessageConst.PROPERTY_TAGS) == null ? "" : (String) getMessageHeaders().get(MessageConst.PROPERTY_TAGS);
+ }
+
+ public RocketMQMessageHeaderAccessor withTags(String tag) {
+ setHeader(MessageConst.PROPERTY_TAGS, tag);
+ return this;
+ }
+
+ public String getKeys() {
+ return getMessageHeaders().get(MessageConst.PROPERTY_KEYS) == null ? "" : (String) getMessageHeaders().get(MessageConst.PROPERTY_KEYS);
+ }
+
+ public RocketMQMessageHeaderAccessor withKeys(String keys) {
+ setHeader(MessageConst.PROPERTY_KEYS, keys);
+ return this;
+ }
+
+ public MessageExt getRocketMessage() {
+ return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
+ }
+
+ public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
+ setHeader(ORIGINAL_ROCKET_MESSAGE, message);
+ return this;
+ }
+
+ public Integer getDelayTimeLevel() {
+ return NumberUtils.toInt((String)getMessageHeaders().get(MessageConst.PROPERTY_DELAY_TIME_LEVEL), 0);
+ }
+
+ public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
+ setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
+ return this;
+ }
+
+ public Integer getFlag() {
+ return NumberUtils.toInt((String)getMessageHeaders().get(ROCKET_FLAG), 0);
+ }
+
+ public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
+ setHeader(ROCKET_FLAG, delayTimeLevel);
+ return this;
+ }
+
+ public SendResult getSendResult() {
+ return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
+ }
+
+ public static void putSendResult(MutableMessage message, SendResult sendResult) {
+ message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
+ }
+
+ public Map getUserProperties() {
+ Map result = new HashMap<>();
+ for (Map.Entry entry : this.toMap().entrySet()) {
+ if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry
+ .getKey().equals(MessageHeaders.CONTENT_TYPE)) {
+ result.put(entry.getKey(), (String)entry.getValue());
+ }
+ }
+ return result;
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
new file mode 100644
index 00000000..756924be
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.rocketmq.actuator;
+
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQBinderEndpoint extends AbstractEndpoint