diff --git a/pom.xml b/pom.xml
index 7f9f0f88..3fd389ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
4.0.1
+ 4.6.12.0.2
@@ -258,10 +259,20 @@
+
+
+
+
+
org.apache.rocketmq
- rocketmq-spring-boot-starter
- ${rocketmq.starter.version}
+ rocketmq-client
+ ${rocketmq.version}
+
+
+ org.apache.rocketmq
+ rocketmq-acl
+ ${rocketmq.version}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/pom.xml b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/pom.xml
index ec44c9d7..ca5c2239 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/pom.xml
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/pom.xml
@@ -50,9 +50,12 @@
org.apache.rocketmq
- rocketmq-spring-boot-starter
+ rocketmq-client
+
+
+ org.apache.rocketmq
+ rocketmq-acl
-
org.springframework.bootspring-boot-starter-test
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java
deleted file mode 100644
index c7daff0e..00000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq;
-
-import java.util.Arrays;
-import java.util.List;
-
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
-import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
-
-import org.springframework.util.CollectionUtils;
-import org.springframework.util.StringUtils;
-
-/**
- * @author Jim
- */
-public final class RocketMQBinderUtils {
-
- private RocketMQBinderUtils() {
-
- }
-
- public static RocketMQBinderConfigurationProperties mergeProperties(
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
- RocketMQProperties rocketMQProperties) {
- RocketMQBinderConfigurationProperties result = new RocketMQBinderConfigurationProperties();
- if (StringUtils.isEmpty(rocketMQProperties.getNameServer())) {
- result.setNameServer(rocketBinderConfigurationProperties.getNameServer());
- }
- else {
- result.setNameServer(
- Arrays.asList(rocketMQProperties.getNameServer().split(";")));
- }
- if (rocketMQProperties.getProducer() == null
- || StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) {
- result.setAccessKey(rocketBinderConfigurationProperties.getAccessKey());
- }
- else {
- result.setAccessKey(rocketMQProperties.getProducer().getAccessKey());
- }
- if (rocketMQProperties.getProducer() == null
- || StringUtils.isEmpty(rocketMQProperties.getProducer().getSecretKey())) {
- result.setSecretKey(rocketBinderConfigurationProperties.getSecretKey());
- }
- else {
- result.setSecretKey(rocketMQProperties.getProducer().getSecretKey());
- }
- if (rocketMQProperties.getProducer() == null || StringUtils
- .isEmpty(rocketMQProperties.getProducer().getCustomizedTraceTopic())) {
- result.setCustomizedTraceTopic(
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- }
- else {
- result.setCustomizedTraceTopic(
- rocketMQProperties.getProducer().getCustomizedTraceTopic());
- }
- if (rocketMQProperties.getProducer() != null
- && rocketMQProperties.getProducer().isEnableMsgTrace()) {
- result.setEnableMsgTrace(Boolean.TRUE);
- }
- else {
- result.setEnableMsgTrace(
- rocketBinderConfigurationProperties.isEnableMsgTrace());
- }
- return result;
- }
-
- public static String getNameServerStr(List nameServerList) {
- if (CollectionUtils.isEmpty(nameServerList)) {
- return null;
- }
- return String.join(";", nameServerList);
- }
-
-}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index e5a2e24b..5683bff6 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -1,11 +1,11 @@
/*
- * Copyright 2013-2018 the original author or authors.
+ * Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * https://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,34 +16,18 @@
package com.alibaba.cloud.stream.binder.rocketmq;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
+import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull.DefaultErrorAcknowledgeHandler;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull.RocketMQMessageSource;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.outbound.RocketMQProducerMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
-import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
-import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
-import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
-import org.apache.rocketmq.spring.support.RocketMQUtil;
+import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@@ -55,15 +39,19 @@ import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
-import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.support.DefaultErrorMessageStrategy;
+import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
/**
+ * A {@link org.springframework.cloud.stream.binder.Binder} that uses RocketMQ as the
+ * underlying middleware.
+ *
* @author Jim
*/
public class RocketMQMessageChannelBinder extends
@@ -71,120 +59,44 @@ public class RocketMQMessageChannelBinder extends
implements
ExtendedPropertiesBinder {
- private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties();
+ private final RocketMQExtendedBindingProperties extendedBindingProperties;
+ private final RocketMQBinderConfigurationProperties binderConfigurationProperties;
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- private final RocketMQProperties rocketMQProperties;
-
- private final InstrumentationManager instrumentationManager;
-
- private Map topicInUse = new HashMap<>();
-
- public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
+ public RocketMQMessageChannelBinder(
+ RocketMQBinderConfigurationProperties binderConfigurationProperties,
RocketMQExtendedBindingProperties extendedBindingProperties,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
- RocketMQProperties rocketMQProperties,
- InstrumentationManager instrumentationManager) {
- super(null, provisioningProvider);
+ RocketMQTopicProvisioner provisioningProvider) {
+ super(new String[0], provisioningProvider);
this.extendedBindingProperties = extendedBindingProperties;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- this.rocketMQProperties = rocketMQProperties;
- this.instrumentationManager = instrumentationManager;
+ this.binderConfigurationProperties = binderConfigurationProperties;
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
- ExtendedProducerProperties producerProperties,
+ ExtendedProducerProperties extendedProducerProperties,
MessageChannel channel, MessageChannel errorChannel) throws Exception {
- if (producerProperties.getExtension().getEnabled()) {
-
- // if producerGroup is empty, using destination
- String extendedProducerGroup = producerProperties.getExtension().getGroup();
- String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
- ? destination.getName() : extendedProducerGroup;
-
- RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
- .mergeProperties(rocketBinderConfigurationProperties,
- rocketMQProperties);
-
- RocketMQTemplate rocketMQTemplate;
- if (producerProperties.getExtension().getTransactional()) {
- Map rocketMQTemplates = getBeanFactory()
- .getBeansOfType(RocketMQTemplate.class);
- if (rocketMQTemplates.size() == 0) {
- throw new IllegalStateException(
- "there is no RocketMQTemplate in Spring BeanFactory");
- }
- else if (rocketMQTemplates.size() > 1) {
- throw new IllegalStateException(
- "there is more than 1 RocketMQTemplates in Spring BeanFactory");
- }
- rocketMQTemplate = rocketMQTemplates.values().iterator().next();
- }
- else {
- rocketMQTemplate = new RocketMQTemplate();
- rocketMQTemplate.setObjectMapper(this.getApplicationContext()
- .getBeansOfType(ObjectMapper.class).values().iterator().next());
- DefaultMQProducer producer;
- String ak = mergedProperties.getAccessKey();
- String sk = mergedProperties.getSecretKey();
- if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
- RPCHook rpcHook = new AclClientRPCHook(
- new SessionCredentials(ak, sk));
- producer = new DefaultMQProducer(producerGroup, rpcHook,
- mergedProperties.isEnableMsgTrace(),
- mergedProperties.getCustomizedTraceTopic());
- producer.setVipChannelEnabled(false);
- producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
- destination.getName() + "|" + UtilAll.getPid()));
- }
- else {
- producer = new DefaultMQProducer(producerGroup);
- producer.setVipChannelEnabled(
- producerProperties.getExtension().getVipChannelEnabled());
- }
- producer.setNamesrvAddr(RocketMQBinderUtils
- .getNameServerStr(mergedProperties.getNameServer()));
- producer.setSendMsgTimeout(
- producerProperties.getExtension().getSendMessageTimeout());
- producer.setRetryTimesWhenSendFailed(
- producerProperties.getExtension().getRetryTimesWhenSendFailed());
- producer.setRetryTimesWhenSendAsyncFailed(producerProperties
- .getExtension().getRetryTimesWhenSendAsyncFailed());
- producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
- .getCompressMessageBodyThreshold());
- producer.setRetryAnotherBrokerWhenNotStoreOK(
- producerProperties.getExtension().isRetryNextServer());
- producer.setMaxMessageSize(
- producerProperties.getExtension().getMaxMessageSize());
- rocketMQTemplate.setProducer(producer);
- if (producerProperties.isPartitioned()) {
- rocketMQTemplate
- .setMessageQueueSelector(new PartitionMessageQueueSelector());
- }
- }
-
- RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
- rocketMQTemplate, destination.getName(), producerGroup,
- producerProperties.getExtension().getTransactional(),
- instrumentationManager, producerProperties,
- ((AbstractMessageChannel) channel).getInterceptors().stream().filter(
- channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
- .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
- .findFirst().orElse(null));
- messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
- messageHandler.setSync(producerProperties.getExtension().getSync());
- messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
- if (errorChannel != null) {
- messageHandler.setSendFailureChannel(errorChannel);
- }
- return messageHandler;
- }
- else {
+ if (!extendedProducerProperties.getExtension().getEnabled()) {
throw new RuntimeException("Binding for channel " + destination.getName()
+ " has been disabled, message can't be delivered");
}
+ RocketMQProducerProperties mqProducerProperties = RocketMQUtils
+ .mergeRocketMQProperties(binderConfigurationProperties,
+ extendedProducerProperties.getExtension());
+ RocketMQProducerMessageHandler messageHandler = new RocketMQProducerMessageHandler(
+ destination, extendedProducerProperties, mqProducerProperties);
+ messageHandler.setApplicationContext(this.getApplicationContext());
+ if (errorChannel != null) {
+ messageHandler.setSendFailureChannel(errorChannel);
+ }
+ MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor = ((AbstractMessageChannel) channel)
+ .getInterceptors().stream()
+ .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
+ .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
+ .findFirst().orElse(null);
+ messageHandler.setPartitioningInterceptor(partitioningInterceptor);
+ messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
+ messageHandler.setErrorMessageStrategy(this.getErrorMessageStrategy());
+ return messageHandler;
}
@Override
@@ -198,56 +110,43 @@ public class RocketMQMessageChannelBinder extends
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
String group,
- ExtendedConsumerProperties consumerProperties)
+ ExtendedConsumerProperties extendedConsumerProperties)
throws Exception {
- if (group == null || "".equals(group)) {
+ // todo support anymous consumer
+ if (StringUtils.isEmpty(group)) {
throw new RuntimeException(
"'group must be configured for channel " + destination.getName());
}
+ RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties,
+ extendedConsumerProperties.getExtension());
+ extendedConsumerProperties.getExtension().setGroup(group);
- RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer(
- consumerProperties, rocketBinderConfigurationProperties, this);
- listenerContainer.setConsumerGroup(group);
- listenerContainer.setTopic(destination.getName());
- listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
- listenerContainer.setSuspendCurrentQueueTimeMillis(
- consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());
- listenerContainer.setDelayLevelWhenNextConsume(
- consumerProperties.getExtension().getDelayLevelWhenNextConsume());
- listenerContainer
- .setNameServer(rocketBinderConfigurationProperties.getNameServer());
- listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));
-
- RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
- listenerContainer, consumerProperties, instrumentationManager);
-
- topicInUse.put(destination.getName(), group);
-
+ RocketMQInboundChannelAdapter inboundChannelAdapter = new RocketMQInboundChannelAdapter(
+ destination.getName(), extendedConsumerProperties);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
- group, consumerProperties);
- if (consumerProperties.getMaxAttempts() > 1) {
- rocketInboundChannelAdapter
- .setRetryTemplate(buildRetryTemplate(consumerProperties));
- rocketInboundChannelAdapter
- .setRecoveryCallback(errorInfrastructure.getRecoverer());
+ group, extendedConsumerProperties);
+ if (extendedConsumerProperties.getMaxAttempts() > 1) {
+ inboundChannelAdapter
+ .setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
+ inboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
}
else {
- rocketInboundChannelAdapter
- .setErrorChannel(errorInfrastructure.getErrorChannel());
+ inboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
-
- return rocketInboundChannelAdapter;
+ return inboundChannelAdapter;
}
@Override
protected PolledConsumerResources createPolledConsumerResources(String name,
String group, ConsumerDestination destination,
- ExtendedConsumerProperties consumerProperties) {
- RocketMQMessageSource rocketMQMessageSource = new RocketMQMessageSource(
- rocketBinderConfigurationProperties, consumerProperties, name, group);
- return new PolledConsumerResources(rocketMQMessageSource,
- registerErrorInfrastructure(destination, group, consumerProperties,
- true));
+ ExtendedConsumerProperties extendedConsumerProperties) {
+ RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties,
+ extendedConsumerProperties.getExtension());
+ extendedConsumerProperties.getExtension().setGroup(group);
+ RocketMQMessageSource messageSource = new RocketMQMessageSource(name,
+ extendedConsumerProperties);
+ return new PolledConsumerResources(messageSource, registerErrorInfrastructure(
+ destination, group, extendedConsumerProperties, true));
}
@Override
@@ -261,67 +160,47 @@ public class RocketMQMessageChannelBinder extends
((MessagingException) message.getPayload())
.getFailedMessage());
if (ack != null) {
- if (properties.getExtension().shouldRequeue()) {
- ack.acknowledge(Status.REQUEUE);
- }
- else {
- ack.acknowledge(Status.REJECT);
- }
+ ErrorAcknowledgeHandler handler = RocketMQBeanContainerCache.getBean(
+ properties.getExtension().getPull().getErrAcknowledge(),
+ ErrorAcknowledgeHandler.class,
+ new DefaultErrorAcknowledgeHandler());
+ ack.acknowledge(
+ handler.handler(((MessagingException) message.getPayload())
+ .getFailedMessage()));
}
}
};
}
+ /**
+ * Binders can return an {@link ErrorMessageStrategy} for building error messages;
+ * binder implementations typically might add extra headers to the error message.
+ *
+ * @return the implementation - may be null.
+ */
+ @Override
+ protected ErrorMessageStrategy getErrorMessageStrategy() {
+ // It can be extended to custom if necessary.
+ return new DefaultErrorMessageStrategy();
+ }
+
@Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
- return extendedBindingProperties.getExtendedConsumerProperties(channelName);
+ return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
- return extendedBindingProperties.getExtendedProducerProperties(channelName);
- }
-
- public Map getTopicInUse() {
- return topicInUse;
+ return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
}
@Override
public String getDefaultsPrefix() {
- return extendedBindingProperties.getDefaultsPrefix();
+ return this.extendedBindingProperties.getDefaultsPrefix();
}
@Override
public Class extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
- return extendedBindingProperties.getExtendedPropertiesEntryClass();
+ return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}
-
- public void setExtendedBindingProperties(
- RocketMQExtendedBindingProperties extendedBindingProperties) {
- this.extendedBindingProperties = extendedBindingProperties;
- }
-
- private RocketMQHeaderMapper createHeaderMapper(
- final ExtendedConsumerProperties extendedConsumerProperties) {
- Set trustedPackages = extendedConsumerProperties.getExtension()
- .getTrustedPackages();
- return createHeaderMapper(trustedPackages);
- }
-
- private RocketMQHeaderMapper createHeaderMapper(
- final ExtendedProducerProperties producerProperties) {
- return createHeaderMapper(Collections.emptyList());
- }
-
- private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages) {
- ObjectMapper objectMapper = this.getApplicationContext()
- .getBeansOfType(ObjectMapper.class).values().iterator().next();
- JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(
- objectMapper);
- if (!StringUtils.isEmpty(trustedPackages)) {
- headerMapper.addTrustedPackages(trustedPackages);
- }
- return headerMapper;
- }
-
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
index 1c49359e..6e704250 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
@@ -19,7 +19,6 @@ package com.alibaba.cloud.stream.binder.rocketmq.actuator;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
@@ -29,23 +28,20 @@ import org.springframework.boot.actuate.health.Health;
*/
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
- @Autowired
- private InstrumentationManager instrumentationManager;
-
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
- if (instrumentationManager.getHealthInstrumentations().stream()
+ if (InstrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isUp)) {
builder.up();
return;
}
- if (instrumentationManager.getHealthInstrumentations().stream()
+ if (InstrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isOutOfService)) {
builder.outOfService();
return;
}
builder.down();
- instrumentationManager.getHealthInstrumentations().stream()
+ InstrumentationManager.getHealthInstrumentations().stream()
.filter(instrumentation -> !instrumentation.isStarted())
.forEach(instrumentation1 -> builder
.withException(instrumentation1.getStartException()));
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
index fb167c69..7bd875f3 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
@@ -1,465 +1,470 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq.consuming;
-
-import java.util.List;
-import java.util.Objects;
-
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.MessageSelector;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.spring.annotation.ConsumeMode;
-import org.apache.rocketmq.spring.annotation.MessageModel;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
-import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
-import org.apache.rocketmq.spring.support.RocketMQUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
-import org.springframework.context.SmartLifecycle;
-import org.springframework.integration.support.MessageBuilder;
-import org.springframework.messaging.Message;
-import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
-
-import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
-
-/**
- * A class that Listen on rocketmq message.
- *
- * this class will delegate {@link RocketMQListener} to handle message
- *
- * @author Jim
- * @author Xiejiashuai
- * @see RocketMQListener
- */
-public class RocketMQListenerBindingContainer
- implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
-
- private final static Logger log = LoggerFactory
- .getLogger(RocketMQListenerBindingContainer.class);
-
- private long suspendCurrentQueueTimeMillis = 1000;
-
- /**
- * Message consume retry strategy
- * -1,no retry,put into DLQ directly
- * 0,broker control retry frequency
- * >0,client control retry frequency.
- */
- private int delayLevelWhenNextConsume = 0;
-
- private List nameServer;
-
- private String consumerGroup;
-
- private String topic;
-
- private int consumeThreadMax = 64;
-
- private String charset = "UTF-8";
-
- private RocketMQListener rocketMQListener;
-
- private RocketMQHeaderMapper headerMapper;
-
- private DefaultMQPushConsumer consumer;
-
- private boolean running;
-
- private final ExtendedConsumerProperties rocketMQConsumerProperties;
-
- private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
-
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- // The following properties came from RocketMQConsumerProperties.
- private ConsumeMode consumeMode;
-
- private SelectorType selectorType;
-
- private String selectorExpression;
-
- private MessageModel messageModel;
-
- public RocketMQListenerBindingContainer(
- ExtendedConsumerProperties rocketMQConsumerProperties,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
- RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
- this.rocketMQConsumerProperties = rocketMQConsumerProperties;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
- this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
- ? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
- if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
- this.selectorType = SelectorType.TAG;
- this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
- }
- else {
- this.selectorType = SelectorType.SQL92;
- this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
- }
- this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
- ? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
- }
-
- @Override
- public void setupMessageListener(RocketMQListener> rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- @Override
- public void destroy() throws Exception {
- this.setRunning(false);
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- log.info("container destroyed, {}", this.toString());
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- initRocketMQPushConsumer();
- }
-
- @Override
- public boolean isAutoStartup() {
- return true;
- }
-
- @Override
- public void stop(Runnable callback) {
- stop();
- callback.run();
- }
-
- @Override
- public void start() {
- if (this.isRunning()) {
- throw new IllegalStateException(
- "container already running. " + this.toString());
- }
-
- try {
- consumer.start();
- }
- catch (MQClientException e) {
- throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
- }
- this.setRunning(true);
-
- log.info("running container: {}", this.toString());
- }
-
- @Override
- public void stop() {
- if (this.isRunning()) {
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- setRunning(false);
- }
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- private void setRunning(boolean running) {
- this.running = running;
- }
-
- @Override
- public int getPhase() {
- return Integer.MAX_VALUE;
- }
-
- private void initRocketMQPushConsumer() throws MQClientException {
- Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
- Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
- Assert.notNull(nameServer, "Property 'nameServer' is required");
- Assert.notNull(topic, "Property 'topic' is required");
-
- String ak = rocketBinderConfigurationProperties.getAccessKey();
- String sk = rocketBinderConfigurationProperties.getSecretKey();
- if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
- RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
- consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
- new AllocateMessageQueueAveragely(),
- rocketBinderConfigurationProperties.isEnableMsgTrace(),
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
- topic + "|" + UtilAll.getPid()));
- consumer.setVipChannelEnabled(false);
- }
- else {
- consumer = new DefaultMQPushConsumer(consumerGroup,
- rocketBinderConfigurationProperties.isEnableMsgTrace(),
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- }
-
- consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
- consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
- consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
-
- switch (messageModel) {
- case BROADCASTING:
- consumer.setMessageModel(
- org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
- break;
- case CLUSTERING:
- consumer.setMessageModel(
- org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
- break;
- default:
- throw new IllegalArgumentException("Property 'messageModel' was wrong.");
- }
-
- switch (selectorType) {
- case TAG:
- consumer.subscribe(topic, selectorExpression);
- break;
- case SQL92:
- consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
- break;
- default:
- throw new IllegalArgumentException("Property 'selectorType' was wrong.");
- }
-
- switch (consumeMode) {
- case ORDERLY:
- consumer.setMessageListener(new DefaultMessageListenerOrderly());
- break;
- case CONCURRENTLY:
- consumer.setMessageListener(new DefaultMessageListenerConcurrently());
- break;
- default:
- throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
- }
-
- if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
- ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
- .prepareStart(consumer);
- }
-
- }
-
- @Override
- public String toString() {
- return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
- + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
- + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
- + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
- + messageModel + '}';
- }
-
- public long getSuspendCurrentQueueTimeMillis() {
- return suspendCurrentQueueTimeMillis;
- }
-
- public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
- this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
- }
-
- public int getDelayLevelWhenNextConsume() {
- return delayLevelWhenNextConsume;
- }
-
- public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
- this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
- }
-
- public List getNameServer() {
- return nameServer;
- }
-
- public void setNameServer(List nameServer) {
- this.nameServer = nameServer;
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getConsumeThreadMax() {
- return consumeThreadMax;
- }
-
- public void setConsumeThreadMax(int consumeThreadMax) {
- this.consumeThreadMax = consumeThreadMax;
- }
-
- public String getCharset() {
- return charset;
- }
-
- public void setCharset(String charset) {
- this.charset = charset;
- }
-
- public RocketMQListener getRocketMQListener() {
- return rocketMQListener;
- }
-
- public void setRocketMQListener(RocketMQListener rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- public DefaultMQPushConsumer getConsumer() {
- return consumer;
- }
-
- public void setConsumer(DefaultMQPushConsumer consumer) {
- this.consumer = consumer;
- }
-
- public ExtendedConsumerProperties getRocketMQConsumerProperties() {
- return rocketMQConsumerProperties;
- }
-
- public ConsumeMode getConsumeMode() {
- return consumeMode;
- }
-
- public SelectorType getSelectorType() {
- return selectorType;
- }
-
- public String getSelectorExpression() {
- return selectorExpression;
- }
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
- public RocketMQHeaderMapper getHeaderMapper() {
- return headerMapper;
- }
-
- public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
- this.headerMapper = headerMapper;
- }
-
- /**
- * Convert rocketmq {@link MessageExt} to Spring {@link Message}.
- * @param messageExt the rocketmq message
- * @return the converted Spring {@link Message}
- */
- @SuppressWarnings("unchecked")
- private Message convertToSpringMessage(MessageExt messageExt) {
-
- // add reconsume-times header to messageExt
- int reconsumeTimes = messageExt.getReconsumeTimes();
- messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
- String.valueOf(reconsumeTimes));
- Message message = RocketMQUtil.convertToSpringMessage(messageExt);
- return MessageBuilder.fromMessage(message)
- .copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
- }
-
- public class DefaultMessageListenerConcurrently
- implements MessageListenerConcurrently {
-
- @SuppressWarnings({ "unchecked", "Duplicates" })
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List msgs,
- ConsumeConcurrentlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener.onMessage(convertToSpringMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} message key:[{}] cost: {} ms",
- messageExt.getMsgId(), messageExt.getKeys(), costTime);
- }
- catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
-
- }
-
- public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
-
- @SuppressWarnings({ "unchecked", "Duplicates" })
- @Override
- public ConsumeOrderlyStatus consumeMessage(List msgs,
- ConsumeOrderlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener.onMessage(convertToSpringMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.info("consume {} message key:[{}] cost: {} ms",
- messageExt.getMsgId(), messageExt.getKeys(), costTime);
- }
- catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setSuspendCurrentQueueTimeMillis(
- suspendCurrentQueueTimeMillis);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
-
- return ConsumeOrderlyStatus.SUCCESS;
- }
-
- }
-
-}
+/// *
+// * Copyright 2013-2018 the original author or authors.
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// * https://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+// package com.alibaba.cloud.stream.binder.rocketmq.consuming;
+//
+// import java.util.List;
+// import java.util.Objects;
+//
+// import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
+// import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
+// import
+/// com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+// import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+// import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
+// import org.apache.rocketmq.acl.common.AclClientRPCHook;
+// import org.apache.rocketmq.acl.common.SessionCredentials;
+// import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+// import org.apache.rocketmq.client.consumer.MessageSelector;
+// import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+// import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+// import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+// import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+// import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+// import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+// import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+// import org.apache.rocketmq.client.exception.MQClientException;
+// import org.apache.rocketmq.common.UtilAll;
+// import org.apache.rocketmq.common.message.MessageExt;
+// import org.apache.rocketmq.remoting.RPCHook;
+// import org.apache.rocketmq.spring.annotation.ConsumeMode;
+// import org.apache.rocketmq.spring.annotation.MessageModel;
+// import org.apache.rocketmq.spring.annotation.SelectorType;
+// import org.apache.rocketmq.spring.core.RocketMQListener;
+// import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+// import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
+// import org.apache.rocketmq.spring.support.RocketMQUtil;
+// import org.slf4j.Logger;
+// import org.slf4j.LoggerFactory;
+//
+// import org.springframework.beans.factory.InitializingBean;
+// import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+// import org.springframework.context.SmartLifecycle;
+// import org.springframework.integration.support.MessageBuilder;
+// import org.springframework.messaging.Message;
+// import org.springframework.util.Assert;
+// import org.springframework.util.StringUtils;
+//
+// import static
+/// com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
+//
+/// **
+// * A class that Listen on rocketmq message.
+// *
+// * this class will delegate {@link RocketMQListener} to handle message
+// *
+// * @author Jim
+// * @author Xiejiashuai
+// * @see RocketMQListener
+// */
+// public class RocketMQListenerBindingContainer
+// implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
+//
+// private final static Logger log = LoggerFactory
+// .getLogger(RocketMQListenerBindingContainer.class);
+//
+// private long suspendCurrentQueueTimeMillis = 1000;
+//
+// /**
+// * Message consume retry strategy
+// * -1,no retry,put into DLQ directly
+// * 0,broker control retry frequency
+// * >0,client control retry frequency.
+// */
+// private int delayLevelWhenNextConsume = 0;
+//
+// private List nameServer;
+//
+// private String consumerGroup;
+//
+// private String topic;
+//
+// private int consumeThreadMax = 64;
+//
+// private String charset = "UTF-8";
+//
+// private RocketMQListener rocketMQListener;
+//
+// private RocketMQHeaderMapper headerMapper;
+//
+// private DefaultMQPushConsumer consumer;
+//
+// private boolean running;
+//
+// private final ExtendedConsumerProperties
+/// rocketMQConsumerProperties;
+//
+// private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
+//
+// private final RocketMQBinderConfigurationProperties
+/// rocketBinderConfigurationProperties;
+//
+// // The following properties came from RocketMQConsumerProperties.
+// private ConsumeMode consumeMode;
+//
+// private SelectorType selectorType;
+//
+// private String selectorExpression;
+//
+// private MessageModel messageModel;
+//
+// public RocketMQListenerBindingContainer(
+// ExtendedConsumerProperties rocketMQConsumerProperties,
+// RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
+// RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
+// this.rocketMQConsumerProperties = rocketMQConsumerProperties;
+// this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+// this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
+// this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
+// ? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
+// if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
+// this.selectorType = SelectorType.TAG;
+// this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
+// }
+// else {
+// this.selectorType = SelectorType.SQL92;
+// this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
+// }
+// this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
+// ? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
+// }
+//
+// @Override
+// public void setupMessageListener(RocketMQListener> rocketMQListener) {
+// this.rocketMQListener = rocketMQListener;
+// }
+//
+// @Override
+// public void destroy() throws Exception {
+// this.setRunning(false);
+// if (Objects.nonNull(consumer)) {
+// consumer.shutdown();
+// }
+// log.info("container destroyed, {}", this.toString());
+// }
+//
+// @Override
+// public void afterPropertiesSet() throws Exception {
+// initRocketMQPushConsumer();
+// }
+//
+// @Override
+// public boolean isAutoStartup() {
+// return true;
+// }
+//
+// @Override
+// public void stop(Runnable callback) {
+// stop();
+// callback.run();
+// }
+//
+// @Override
+// public void start() {
+// if (this.isRunning()) {
+// throw new IllegalStateException(
+// "container already running. " + this.toString());
+// }
+//
+// try {
+// consumer.start();
+// }
+// catch (MQClientException e) {
+// throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
+// }
+// this.setRunning(true);
+//
+// log.info("running container: {}", this.toString());
+// }
+//
+// @Override
+// public void stop() {
+// if (this.isRunning()) {
+// if (Objects.nonNull(consumer)) {
+// consumer.shutdown();
+// }
+// setRunning(false);
+// }
+// }
+//
+// @Override
+// public boolean isRunning() {
+// return running;
+// }
+//
+// private void setRunning(boolean running) {
+// this.running = running;
+// }
+//
+// @Override
+// public int getPhase() {
+// return Integer.MAX_VALUE;
+// }
+//
+// private void initRocketMQPushConsumer() throws MQClientException {
+// Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
+// Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
+// Assert.notNull(nameServer, "Property 'nameServer' is required");
+// Assert.notNull(topic, "Property 'topic' is required");
+//
+// String ak = rocketBinderConfigurationProperties.getAccessKey();
+// String sk = rocketBinderConfigurationProperties.getSecretKey();
+// if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+// RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
+// consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
+// new AllocateMessageQueueAveragely(),
+// rocketBinderConfigurationProperties.isEnableMsgTrace(),
+// rocketBinderConfigurationProperties.getCustomizedTraceTopic());
+// consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
+// topic + "|" + UtilAll.getPid()));
+// consumer.setVipChannelEnabled(false);
+// }
+// else {
+// consumer = new DefaultMQPushConsumer(consumerGroup,
+// rocketBinderConfigurationProperties.isEnableMsgTrace(),
+// rocketBinderConfigurationProperties.getCustomizedTraceTopic());
+// }
+//
+// consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
+// consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
+// consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
+//
+// switch (messageModel) {
+// case BROADCASTING:
+// consumer.setMessageModel(
+// org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
+// break;
+// case CLUSTERING:
+// consumer.setMessageModel(
+// org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
+// break;
+// default:
+// throw new IllegalArgumentException("Property 'messageModel' was wrong.");
+// }
+//
+// switch (selectorType) {
+// case TAG:
+// consumer.subscribe(topic, selectorExpression);
+// break;
+// case SQL92:
+// consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
+// break;
+// default:
+// throw new IllegalArgumentException("Property 'selectorType' was wrong.");
+// }
+//
+// switch (consumeMode) {
+// case ORDERLY:
+// consumer.setMessageListener(new DefaultMessageListenerOrderly());
+// break;
+// case CONCURRENTLY:
+// consumer.setMessageListener(new DefaultMessageListenerConcurrently());
+// break;
+// default:
+// throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
+// }
+//
+// if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
+// ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
+// .prepareStart(consumer);
+// }
+//
+// }
+//
+// @Override
+// public String toString() {
+// return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
+// + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
+// + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
+// + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
+// + messageModel + '}';
+// }
+//
+// public long getSuspendCurrentQueueTimeMillis() {
+// return suspendCurrentQueueTimeMillis;
+// }
+//
+// public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
+// this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+// }
+//
+// public int getDelayLevelWhenNextConsume() {
+// return delayLevelWhenNextConsume;
+// }
+//
+// public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
+// this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+// }
+//
+// public List getNameServer() {
+// return nameServer;
+// }
+//
+// public void setNameServer(List nameServer) {
+// this.nameServer = nameServer;
+// }
+//
+// public String getConsumerGroup() {
+// return consumerGroup;
+// }
+//
+// public void setConsumerGroup(String consumerGroup) {
+// this.consumerGroup = consumerGroup;
+// }
+//
+// public String getTopic() {
+// return topic;
+// }
+//
+// public void setTopic(String topic) {
+// this.topic = topic;
+// }
+//
+// public int getConsumeThreadMax() {
+// return consumeThreadMax;
+// }
+//
+// public void setConsumeThreadMax(int consumeThreadMax) {
+// this.consumeThreadMax = consumeThreadMax;
+// }
+//
+// public String getCharset() {
+// return charset;
+// }
+//
+// public void setCharset(String charset) {
+// this.charset = charset;
+// }
+//
+// public RocketMQListener getRocketMQListener() {
+// return rocketMQListener;
+// }
+//
+// public void setRocketMQListener(RocketMQListener rocketMQListener) {
+// this.rocketMQListener = rocketMQListener;
+// }
+//
+// public DefaultMQPushConsumer getConsumer() {
+// return consumer;
+// }
+//
+// public void setConsumer(DefaultMQPushConsumer consumer) {
+// this.consumer = consumer;
+// }
+//
+// public ExtendedConsumerProperties
+/// getRocketMQConsumerProperties() {
+// return rocketMQConsumerProperties;
+// }
+//
+// public ConsumeMode getConsumeMode() {
+// return consumeMode;
+// }
+//
+// public SelectorType getSelectorType() {
+// return selectorType;
+// }
+//
+// public String getSelectorExpression() {
+// return selectorExpression;
+// }
+//
+// public MessageModel getMessageModel() {
+// return messageModel;
+// }
+//
+// public RocketMQHeaderMapper getHeaderMapper() {
+// return headerMapper;
+// }
+//
+// public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
+// this.headerMapper = headerMapper;
+// }
+//
+// /**
+// * Convert rocketmq {@link MessageExt} to Spring {@link Message}.
+// * @param messageExt the rocketmq message
+// * @return the converted Spring {@link Message}
+// */
+// @SuppressWarnings("unchecked")
+// private Message convertToSpringMessage(MessageExt messageExt) {
+//
+// // add reconsume-times header to messageExt
+// int reconsumeTimes = messageExt.getReconsumeTimes();
+// messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
+// String.valueOf(reconsumeTimes));
+// Message message = RocketMQUtil.convertToSpringMessage(messageExt);
+// return MessageBuilder.fromMessage(message)
+// .copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
+// }
+//
+// public class DefaultMessageListenerConcurrently
+// implements MessageListenerConcurrently {
+//
+// @SuppressWarnings({ "unchecked", "Duplicates" })
+// @Override
+// public ConsumeConcurrentlyStatus consumeMessage(List msgs,
+// ConsumeConcurrentlyContext context) {
+// for (MessageExt messageExt : msgs) {
+// log.debug("received msg: {}", messageExt);
+// try {
+// long now = System.currentTimeMillis();
+// rocketMQListener.onMessage(convertToSpringMessage(messageExt));
+// long costTime = System.currentTimeMillis() - now;
+// log.debug("consume {} message key:[{}] cost: {} ms",
+// messageExt.getMsgId(), messageExt.getKeys(), costTime);
+// }
+// catch (Exception e) {
+// log.warn("consume message failed. messageExt:{}", messageExt, e);
+// context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+// }
+// }
+//
+// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+// }
+//
+// }
+//
+// public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+//
+// @SuppressWarnings({ "unchecked", "Duplicates" })
+// @Override
+// public ConsumeOrderlyStatus consumeMessage(List msgs,
+// ConsumeOrderlyContext context) {
+// for (MessageExt messageExt : msgs) {
+// log.debug("received msg: {}", messageExt);
+// try {
+// long now = System.currentTimeMillis();
+// rocketMQListener.onMessage(convertToSpringMessage(messageExt));
+// long costTime = System.currentTimeMillis() - now;
+// log.info("consume {} message key:[{}] cost: {} ms",
+// messageExt.getMsgId(), messageExt.getKeys(), costTime);
+// }
+// catch (Exception e) {
+// log.warn("consume message failed. messageExt:{}", messageExt, e);
+// context.setSuspendCurrentQueueTimeMillis(
+// suspendCurrentQueueTimeMillis);
+// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+// }
+// }
+//
+// return ConsumeOrderlyStatus.SUCCESS;
+// }
+//
+// }
+//
+// }
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
deleted file mode 100644
index a3b68041..00000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq.integration;
-
-import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
-import org.springframework.integration.endpoint.MessageProducerSupport;
-import org.springframework.integration.support.MessageBuilder;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessagingException;
-import org.springframework.retry.RecoveryCallback;
-import org.springframework.retry.RetryCallback;
-import org.springframework.retry.RetryContext;
-import org.springframework.retry.RetryListener;
-import org.springframework.retry.support.RetryTemplate;
-import org.springframework.util.Assert;
-
-/**
- * @author Jim
- */
-public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
-
- private static final Logger log = LoggerFactory
- .getLogger(RocketMQInboundChannelAdapter.class);
-
- private RetryTemplate retryTemplate;
-
- private RecoveryCallback extends Object> recoveryCallback;
-
- private RocketMQListenerBindingContainer rocketMQListenerContainer;
-
- private final ExtendedConsumerProperties consumerProperties;
-
- private final InstrumentationManager instrumentationManager;
-
- public RocketMQInboundChannelAdapter(
- RocketMQListenerBindingContainer rocketMQListenerContainer,
- ExtendedConsumerProperties consumerProperties,
- InstrumentationManager instrumentationManager) {
- this.rocketMQListenerContainer = rocketMQListenerContainer;
- this.consumerProperties = consumerProperties;
- this.instrumentationManager = instrumentationManager;
- }
-
- @Override
- protected void onInit() {
- if (consumerProperties == null
- || !consumerProperties.getExtension().getEnabled()) {
- return;
- }
- super.onInit();
- if (this.retryTemplate != null) {
- Assert.state(getErrorChannel() == null,
- "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
- + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
- + "send an error message when retries are exhausted");
- }
-
- BindingRocketMQListener listener = new BindingRocketMQListener();
- rocketMQListenerContainer.setRocketMQListener(listener);
-
- if (retryTemplate != null) {
- this.retryTemplate.registerListener(listener);
- }
-
- try {
- rocketMQListenerContainer.afterPropertiesSet();
-
- }
- catch (Exception e) {
- log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
- throw new IllegalArgumentException(
- "rocketMQListenerContainer init error: " + e.getMessage(), e);
- }
-
- instrumentationManager.addHealthInstrumentation(
- new Instrumentation(rocketMQListenerContainer.getTopic()
- + rocketMQListenerContainer.getConsumerGroup()));
- }
-
- @Override
- protected void doStart() {
- if (consumerProperties == null
- || !consumerProperties.getExtension().getEnabled()) {
- return;
- }
- try {
- rocketMQListenerContainer.start();
- instrumentationManager
- .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
- + rocketMQListenerContainer.getConsumerGroup())
- .markStartedSuccessfully();
- }
- catch (Exception e) {
- instrumentationManager
- .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
- + rocketMQListenerContainer.getConsumerGroup())
- .markStartFailed(e);
- log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
- throw new MessagingException(MessageBuilder.withPayload(
- "RocketMQTemplate startup failed, Caused by " + e.getMessage())
- .build(), e);
- }
- }
-
- @Override
- protected void doStop() {
- rocketMQListenerContainer.stop();
- }
-
- public void setRetryTemplate(RetryTemplate retryTemplate) {
- this.retryTemplate = retryTemplate;
- }
-
- public void setRecoveryCallback(RecoveryCallback extends Object> recoveryCallback) {
- this.recoveryCallback = recoveryCallback;
- }
-
- protected class BindingRocketMQListener
- implements RocketMQListener, RetryListener {
-
- @Override
- public void onMessage(Message message) {
- boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
- if (enableRetry) {
- RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
- RocketMQInboundChannelAdapter.this.sendMessage(message);
- return null;
- }, (RecoveryCallback
+ *
+ * This field defaults to clustering.
*/
- private String tags;
+ private String messageModel = MessageModel.CLUSTERING.getModeCN();
/**
- * {@link MQPushConsumer#subscribe(String, MessageSelector)}
- * {@link MessageSelector#bySql(String)}.
+ * Queue allocation algorithm specifying how message queues are allocated to each
+ * consumer clients.
*/
- private String sql;
+ private String allocateMessageQueueStrategy;
/**
- * {@link MessageModel#BROADCASTING}.
+ * The expressions include tags or SQL,as follow:
+ *
+ * tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+ *
+ * Determines whether there are specific characters "{@code ||}" in the expression to
+ * determine how the message is filtered,tags or SQL.
*/
- private Boolean broadcasting = false;
+ private String subscription;
/**
- * if orderly is true, using {@link MessageListenerOrderly} else if orderly if false,
- * using {@link MessageListenerConcurrently}.
+ * Delay some time when exception occur
*/
- private Boolean orderly = false;
+ private long pullTimeDelayMillsWhenException = 1000;
/**
- * for concurrently listener. message consume retry strategy. see
- * {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or
- * discard, see {@link this#shouldRequeue}), others means requeue.
+ * Consuming point on consumer booting.
+ *
+ *
+ * There are three consuming points:
+ *
+ *
CONSUME_FROM_LAST_OFFSET: consumer clients pick up where it
+ * stopped previously. If it were a newly booting up consumer client, according aging
+ * of the consumer group, there are two cases:
+ *
+ *
if the consumer group is created so recently that the earliest message being
+ * subscribed has yet expired, which means the consumer group represents a lately
+ * launched business, consuming will start from the very beginning;
+ *
if the earliest message being subscribed has expired, consuming will start from
+ * the latest messages, meaning messages born prior to the booting timestamp would be
+ * ignored.
+ *
+ *
+ *
CONSUME_FROM_FIRST_OFFSET: Consumer client will start from
+ * earliest messages available.
+ *
CONSUME_FROM_TIMESTAMP: Consumer client will start from specified
+ * timestamp, which means messages born prior to {@link #consumeTimestamp} will be
+ * ignored
+ *
*/
- private int delayLevelWhenNextConsume = 0;
+ private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+ /**
+ * Backtracking consumption time with second precision. Time format is
+ * 20131223171201
+ * Implying Seventeen twelve and 01 seconds on December 23, 2013 year
+ * Default backtracking consumption time Half an hour ago.
+ */
+ private String consumeTimestamp = UtilAll
+ .timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
/**
- * for orderly listener. next retry delay time.
+ * Flow control threshold on queue level, each message queue will cache at most 1000
+ * messages by default, Consider the {@link #pullBatchSize}, the instantaneous value
+ * may exceed the limit
*/
- private long suspendCurrentQueueTimeMillis = 1000;
-
- private Boolean enabled = true;
+ private int pullThresholdForQueue = 1000;
+ /**
+ * Limit the cached message size on queue level, each message queue will cache at most
+ * 100 MiB messages by default, Consider the {@link #pullBatchSize}, the instantaneous
+ * value may exceed the limit
+ *
+ *
+ * The size of a message only measured by message body, so it's not accurate
+ */
+ private int pullThresholdSizeForQueue = 100;
/**
- * {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)}.
+ * Maximum number of messages pulled each time.
*/
- private Set trustedPackages;
+ private int pullBatchSize = 10;
- // ------------ For Pull Consumer ------------
+ /**
+ * Consume max span offset.it has no effect on sequential consumption.
+ */
+ private int consumeMaxSpan = 2000;
- private long pullTimeout = 10 * 1000;
+ private Push push = new Push();
+ private Pull pull = new Pull();
- private boolean fromStore;
-
- // ------------ For Pull Consumer ------------
-
- public String getTags() {
- return tags;
+ public String getMessageModel() {
+ return messageModel;
}
- public void setTags(String tags) {
- this.tags = tags;
+ public RocketMQConsumerProperties setMessageModel(String messageModel) {
+ this.messageModel = messageModel;
+ return this;
}
- public String getSql() {
- return sql;
+ public String getAllocateMessageQueueStrategy() {
+ return allocateMessageQueueStrategy;
}
- public void setSql(String sql) {
- this.sql = sql;
+ public void setAllocateMessageQueueStrategy(String allocateMessageQueueStrategy) {
+ this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
- public Boolean getOrderly() {
- return orderly;
+ public String getSubscription() {
+ return subscription;
}
- public void setOrderly(Boolean orderly) {
- this.orderly = orderly;
+ public void setSubscription(String subscription) {
+ this.subscription = subscription;
}
- public Boolean getEnabled() {
- return enabled;
+ public Push getPush() {
+ return push;
}
- public void setEnabled(Boolean enabled) {
- this.enabled = enabled;
+ public void setPush(Push push) {
+ this.push = push;
}
- public Boolean getBroadcasting() {
- return broadcasting;
+ public long getPullTimeDelayMillsWhenException() {
+ return pullTimeDelayMillsWhenException;
}
- public void setBroadcasting(Boolean broadcasting) {
- this.broadcasting = broadcasting;
+ public RocketMQConsumerProperties setPullTimeDelayMillsWhenException(
+ long pullTimeDelayMillsWhenException) {
+ this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+ return this;
}
- public int getDelayLevelWhenNextConsume() {
- return delayLevelWhenNextConsume;
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
}
- public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
- this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+ public RocketMQConsumerProperties setConsumeFromWhere(
+ ConsumeFromWhere consumeFromWhere) {
+ this.consumeFromWhere = consumeFromWhere;
+ return this;
}
- public long getSuspendCurrentQueueTimeMillis() {
- return suspendCurrentQueueTimeMillis;
+ public String getConsumeTimestamp() {
+ return consumeTimestamp;
}
- public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
- this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+ public RocketMQConsumerProperties setConsumeTimestamp(String consumeTimestamp) {
+ this.consumeTimestamp = consumeTimestamp;
+ return this;
}
- public long getPullTimeout() {
- return pullTimeout;
+ public int getPullThresholdForQueue() {
+ return pullThresholdForQueue;
}
- public void setPullTimeout(long pullTimeout) {
- this.pullTimeout = pullTimeout;
+ public RocketMQConsumerProperties setPullThresholdForQueue(
+ int pullThresholdForQueue) {
+ this.pullThresholdForQueue = pullThresholdForQueue;
+ return this;
}
- public boolean isFromStore() {
- return fromStore;
+ public int getPullThresholdSizeForQueue() {
+ return pullThresholdSizeForQueue;
}
- public void setFromStore(boolean fromStore) {
- this.fromStore = fromStore;
+ public RocketMQConsumerProperties setPullThresholdSizeForQueue(
+ int pullThresholdSizeForQueue) {
+ this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
+ return this;
}
- public boolean shouldRequeue() {
- return delayLevelWhenNextConsume != -1;
+ public int getPullBatchSize() {
+ return pullBatchSize;
}
- public Set getTrustedPackages() {
- return trustedPackages;
+ public RocketMQConsumerProperties setPullBatchSize(int pullBatchSize) {
+ this.pullBatchSize = pullBatchSize;
+ return this;
}
- public void setTrustedPackages(Set trustedPackages) {
- this.trustedPackages = trustedPackages;
+ public Pull getPull() {
+ return pull;
+ }
+
+ public RocketMQConsumerProperties setPull(Pull pull) {
+ this.pull = pull;
+ return this;
+ }
+
+ public int getConsumeMaxSpan() {
+ return consumeMaxSpan;
+ }
+
+ public RocketMQConsumerProperties setConsumeMaxSpan(int consumeMaxSpan) {
+ this.consumeMaxSpan = consumeMaxSpan;
+ return this;
+ }
+
+ public static class Push implements Serializable {
+ private static final long serialVersionUID = -7398468554978817630L;
+
+ /**
+ * if orderly is true, using {@link MessageListenerOrderly} else if orderly if
+ * false, using {@link MessageListenerConcurrently}.
+ */
+ private boolean orderly = false;
+ /**
+ * Suspending pulling time for cases requiring slow pulling like flow-control
+ * scenario. see{@link ConsumeMessageOrderlyService#processConsumeResult}.
+ * see{@link ConsumeOrderlyContext#getSuspendCurrentQueueTimeMillis}.
+ */
+ private int suspendCurrentQueueTimeMillis = 1000;
+
+ /**
+ * https://github.com/alibaba/spring-cloud-alibaba/issues/1866 Max re-consume
+ * times. -1 means 16 times.
+ *
+ * If messages are re-consumed more than {@link #maxReconsumeTimes} before
+ * success, it's be directed to a deletion queue waiting.
+ */
+ private int maxReconsumeTimes;
+
+ /**
+ * for concurrently listener. message consume retry strategy. -1 means dlq(or
+ * discard. see {@link ConsumeMessageConcurrentlyService#processConsumeResult}.
+ * see {@link ConsumeConcurrentlyContext#getDelayLevelWhenNextConsume}.
+ */
+ private int delayLevelWhenNextConsume = 0;
+
+ /**
+ * Flow control threshold on topic level, default value is -1(Unlimited)
+ *
+ * The value of {@code pullThresholdForQueue} will be overwrote and calculated
+ * based on {@code pullThresholdForTopic} if it is't unlimited
+ *
+ * For example, if the value of pullThresholdForTopic is 1000 and 10 message
+ * queues are assigned to this consumer, then pullThresholdForQueue will be set to
+ * 100.
+ */
+ private int pullThresholdForTopic = -1;
+
+ /**
+ * Limit the cached message size on topic level, default value is -1
+ * MiB(Unlimited)
+ *
+ * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated
+ * based on {@code pullThresholdSizeForTopic} if it is't unlimited
+ *
+ * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10
+ * message queues are assigned to this consumer, then pullThresholdSizeForQueue
+ * will be set to 100 MiB
+ */
+ private int pullThresholdSizeForTopic = -1;
+
+ /**
+ * Message pull Interval
+ */
+ private long pullInterval = 0;
+
+ /**
+ * Batch consumption size
+ */
+ private int consumeMessageBatchMaxSize = 1;
+
+ public boolean getOrderly() {
+ return orderly;
+ }
+
+ public void setOrderly(boolean orderly) {
+ this.orderly = orderly;
+ }
+
+ public int getSuspendCurrentQueueTimeMillis() {
+ return suspendCurrentQueueTimeMillis;
+ }
+
+ public void setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis) {
+ this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+ }
+
+ public int getMaxReconsumeTimes() {
+ return maxReconsumeTimes;
+ }
+
+ public void setMaxReconsumeTimes(int maxReconsumeTimes) {
+ this.maxReconsumeTimes = maxReconsumeTimes;
+ }
+
+ public int getDelayLevelWhenNextConsume() {
+ return delayLevelWhenNextConsume;
+ }
+
+ public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
+ this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+ }
+
+ public int getPullThresholdForTopic() {
+ return pullThresholdForTopic;
+ }
+
+ public void setPullThresholdForTopic(int pullThresholdForTopic) {
+ this.pullThresholdForTopic = pullThresholdForTopic;
+ }
+
+ public int getPullThresholdSizeForTopic() {
+ return pullThresholdSizeForTopic;
+ }
+
+ public void setPullThresholdSizeForTopic(int pullThresholdSizeForTopic) {
+ this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
+ }
+
+ public long getPullInterval() {
+ return pullInterval;
+ }
+
+ public void setPullInterval(long pullInterval) {
+ this.pullInterval = pullInterval;
+ }
+
+ public int getConsumeMessageBatchMaxSize() {
+ return consumeMessageBatchMaxSize;
+ }
+
+ public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
+ this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+ }
+ }
+
+ public static class Pull implements Serializable {
+ /**
+ * The poll timeout in milliseconds
+ */
+ private long pollTimeoutMillis = 1000 * 5;
+ /**
+ * Pull thread number
+ */
+ private int pullThreadNums = 20;
+
+ /**
+ * Interval time in in milliseconds for checking changes in topic metadata.
+ */
+ private long topicMetadataCheckIntervalMillis = 30 * 1000;
+
+ /**
+ * Long polling mode, the Consumer connection timeout(must greater than
+ * brokerSuspendMaxTimeMillis), it is not recommended to modify
+ */
+ private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
+
+ /**
+ * Ack state handling, including receive, reject, and retry, when a consumption
+ * exception occurs. see {@link }
+ */
+ private String errAcknowledge;
+
+ private long pullThresholdForAll = 1000L;
+
+ public long getPollTimeoutMillis() {
+ return pollTimeoutMillis;
+ }
+
+ public void setPollTimeoutMillis(long pollTimeoutMillis) {
+ this.pollTimeoutMillis = pollTimeoutMillis;
+ }
+
+ public int getPullThreadNums() {
+ return pullThreadNums;
+ }
+
+ public void setPullThreadNums(int pullThreadNums) {
+ this.pullThreadNums = pullThreadNums;
+ }
+
+ public long getTopicMetadataCheckIntervalMillis() {
+ return topicMetadataCheckIntervalMillis;
+ }
+
+ public void setTopicMetadataCheckIntervalMillis(
+ long topicMetadataCheckIntervalMillis) {
+ this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
+ }
+
+ public long getConsumerTimeoutMillisWhenSuspend() {
+ return consumerTimeoutMillisWhenSuspend;
+ }
+
+ public void setConsumerTimeoutMillisWhenSuspend(
+ long consumerTimeoutMillisWhenSuspend) {
+ this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
+ }
+
+ public String getErrAcknowledge() {
+ return errAcknowledge;
+ }
+
+ public void setErrAcknowledge(String errAcknowledge) {
+ this.errAcknowledge = errAcknowledge;
+ }
+
+ public long getPullThresholdForAll() {
+ return pullThresholdForAll;
+ }
+
+ public void setPullThresholdForAll(long pullThresholdForAll) {
+ this.pullThresholdForAll = pullThresholdForAll;
+ }
}
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java
index 890d2250..5fc34ed0 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java
@@ -1,11 +1,11 @@
/*
- * Copyright 2013-2018 the original author or authors.
+ * Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * https://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,12 +21,14 @@ import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
/**
- * @author Timur Valiev
+ * rocketMQ specific extended binding properties class that extends from
+ * {@link AbstractExtendedBindingProperties}.
+ *
* @author Jim
*/
@ConfigurationProperties("spring.cloud.stream.rocketmq")
public class RocketMQExtendedBindingProperties extends
- AbstractExtendedBindingProperties {
+ AbstractExtendedBindingProperties {
private static final String DEFAULTS_PREFIX = "spring.cloud.stream.rocketmq.default";
@@ -37,7 +39,7 @@ public class RocketMQExtendedBindingProperties extends
@Override
public Class extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
- return RocketMQBindingProperties.class;
+ return RocketMQSpecificPropertiesProvider.class;
}
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
index ab2a92a3..8f73cf93 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
@@ -1,11 +1,11 @@
/*
- * Copyright 2013-2018 the original author or authors.
+ * Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * https://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,55 +16,39 @@
package com.alibaba.cloud.stream.binder.rocketmq.properties;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-
/**
- * @author Timur Valiev
+ * Extended producer properties for RocketMQ binder.
+ *
* @author Jim
*/
-public class RocketMQProducerProperties {
-
- private Boolean enabled = true;
+public class RocketMQProducerProperties extends RocketMQCommonProperties {
/**
- * Name of producer.
+ * Timeout for sending messages.
*/
- private String group;
-
- /**
- * Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}.
- */
- private Integer maxMessageSize = 1024 * 1024 * 4;
-
- private Boolean transactional = false;
-
- private Boolean sync = false;
-
- private Boolean vipChannelEnabled = true;
-
- /**
- * Millis of send message timeout.
- */
- private int sendMessageTimeout = 3000;
+ private int sendMsgTimeout = 3000;
/**
* Compress message body threshold, namely, message body larger than 4k will be
* compressed on default.
*/
- private int compressMessageBodyThreshold = 1024 * 4;
+ private int compressMsgBodyThreshold = 1024 * 4;
/**
* Maximum number of retry to perform internally before claiming sending failure in
- * synchronous mode. This may potentially cause message duplication which is up to
- * application developers to resolve.
+ * synchronous mode.
+ *
+ *
+ * This may potentially cause message duplication which is up to application
+ * developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
- *
* Maximum number of retry to perform internally before claiming sending failure in
* asynchronous mode.
*
+ *
* This may potentially cause message duplication which is up to application
* developers to resolve.
*/
@@ -73,70 +57,45 @@ public class RocketMQProducerProperties {
/**
* Indicate whether to retry another broker on sending failure internally.
*/
- private boolean retryNextServer = false;
+ private boolean retryAnotherBroker = false;
- public String getGroup() {
- return group;
+ /**
+ * Maximum allowed message size in bytes.
+ */
+ private int maxMessageSize = 1024 * 1024 * 4;
+
+ private String producerType = ProducerType.Normal.name();
+
+ private String sendType = SendType.Sync.name();
+
+ private String sendCallBack;
+
+ private String transactionListener;
+
+ private String messageQueueSelector;
+
+ private String errorMessageStrategy;
+
+ private String sendFailureChannel;
+
+ private String checkForbiddenHook;
+
+ private String sendMessageHook;
+
+ public int getSendMsgTimeout() {
+ return sendMsgTimeout;
}
- public void setGroup(String group) {
- this.group = group;
+ public void setSendMsgTimeout(int sendMsgTimeout) {
+ this.sendMsgTimeout = sendMsgTimeout;
}
- public Boolean getEnabled() {
- return enabled;
+ public int getCompressMsgBodyThreshold() {
+ return compressMsgBodyThreshold;
}
- public void setEnabled(Boolean enabled) {
- this.enabled = enabled;
- }
-
- public Integer getMaxMessageSize() {
- return maxMessageSize;
- }
-
- public void setMaxMessageSize(Integer maxMessageSize) {
- this.maxMessageSize = maxMessageSize;
- }
-
- public Boolean getTransactional() {
- return transactional;
- }
-
- public void setTransactional(Boolean transactional) {
- this.transactional = transactional;
- }
-
- public Boolean getSync() {
- return sync;
- }
-
- public void setSync(Boolean sync) {
- this.sync = sync;
- }
-
- public Boolean getVipChannelEnabled() {
- return vipChannelEnabled;
- }
-
- public void setVipChannelEnabled(Boolean vipChannelEnabled) {
- this.vipChannelEnabled = vipChannelEnabled;
- }
-
- public int getSendMessageTimeout() {
- return sendMessageTimeout;
- }
-
- public void setSendMessageTimeout(int sendMessageTimeout) {
- this.sendMessageTimeout = sendMessageTimeout;
- }
-
- public int getCompressMessageBodyThreshold() {
- return compressMessageBodyThreshold;
- }
-
- public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
- this.compressMessageBodyThreshold = compressMessageBodyThreshold;
+ public void setCompressMsgBodyThreshold(int compressMsgBodyThreshold) {
+ this.compressMsgBodyThreshold = compressMsgBodyThreshold;
}
public int getRetryTimesWhenSendFailed() {
@@ -155,12 +114,108 @@ public class RocketMQProducerProperties {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
- public boolean isRetryNextServer() {
- return retryNextServer;
+ public boolean getRetryAnotherBroker() {
+ return retryAnotherBroker;
}
- public void setRetryNextServer(boolean retryNextServer) {
- this.retryNextServer = retryNextServer;
+ public void setRetryAnotherBroker(boolean retryAnotherBroker) {
+ this.retryAnotherBroker = retryAnotherBroker;
+ }
+
+ public int getMaxMessageSize() {
+ return maxMessageSize;
+ }
+
+ public void setMaxMessageSize(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+
+ public String getProducerType() {
+ return producerType;
+ }
+
+ public void setProducerType(String producerType) {
+ this.producerType = producerType;
+ }
+
+ public String getSendType() {
+ return sendType;
+ }
+
+ public void setSendType(String sendType) {
+ this.sendType = sendType;
+ }
+
+ public String getSendCallBack() {
+ return sendCallBack;
+ }
+
+ public void setSendCallBack(String sendCallBack) {
+ this.sendCallBack = sendCallBack;
+ }
+
+ public String getTransactionListener() {
+ return transactionListener;
+ }
+
+ public void setTransactionListener(String transactionListener) {
+ this.transactionListener = transactionListener;
+ }
+
+ public String getMessageQueueSelector() {
+ return messageQueueSelector;
+ }
+
+ public void setMessageQueueSelector(String messageQueueSelector) {
+ this.messageQueueSelector = messageQueueSelector;
+ }
+
+ public String getErrorMessageStrategy() {
+ return errorMessageStrategy;
+ }
+
+ public void setErrorMessageStrategy(String errorMessageStrategy) {
+ this.errorMessageStrategy = errorMessageStrategy;
+ }
+
+ public String getSendFailureChannel() {
+ return sendFailureChannel;
+ }
+
+ public void setSendFailureChannel(String sendFailureChannel) {
+ this.sendFailureChannel = sendFailureChannel;
+ }
+
+ public String getCheckForbiddenHook() {
+ return checkForbiddenHook;
+ }
+
+ public void setCheckForbiddenHook(String checkForbiddenHook) {
+ this.checkForbiddenHook = checkForbiddenHook;
+ }
+
+ public String getSendMessageHook() {
+ return sendMessageHook;
+ }
+
+ public void setSendMessageHook(String sendMessageHook) {
+ this.sendMessageHook = sendMessageHook;
+ }
+
+ public enum ProducerType {
+ Normal, Trans;
+
+ public boolean equalsName(String name) {
+ return this.name().equalsIgnoreCase(name);
+ }
+ }
+
+ public enum SendType {
+ OneWay, Async, Sync,;
+
+ public boolean equalsName(String name) {
+ return this.name().equalsIgnoreCase(name);
+ }
}
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java
index 694dcdc9..0580fe85 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/provisioning/selector/PartitionMessageQueueSelector.java
@@ -36,7 +36,7 @@ public class PartitionMessageQueueSelector implements MessageQueueSelector {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
- Integer partition = 0;
+ int partition = 0;
try {
partition = Math.abs(
Integer.parseInt(msg.getProperty(BinderHeaders.PARTITION_HEADER)));