From 56336b40c54330c9b31b1c08eef4fbee7edf3367 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Tue, 19 Feb 2019 15:36:44 +0800 Subject: [PATCH] sync rocketmq binder to 1.x branch --- spring-cloud-alibaba-dependencies/pom.xml | 6 +- .../rocketmq-example/pom.xml | 4 - .../examples/MyTransactionCheckListener.java | 18 - .../cloud/examples/MyTransactionExecuter.java | 20 - .../cloud/examples/ReceiveService.java | 5 - .../examples/TransactionListenerImpl.java | 51 +++ .../src/main/resources/application.properties | 10 +- spring-cloud-stream-binder-rocketmq/pom.xml | 21 +- .../rocketmq/RocketMQBinderConstants.java | 35 +- .../RocketMQMessageChannelBinder.java | 153 +++---- .../RocketMQMessageHeaderAccessor.java | 139 ------- .../actuator/RocketMQBinderEndpoint.java | 55 --- .../RocketMQBinderHealthIndicator.java | 52 +-- .../RocketMQBinderAutoConfiguration.java | 18 +- ...nderHealthIndicatorAutoConfiguration.java} | 21 +- ...etMQComponent4BinderAutoConfiguration.java | 120 ++++++ .../rocketmq/consuming/Acknowledgement.java | 94 ----- .../rocketmq/consuming/ConsumersManager.java | 135 ------ .../RocketMQListenerBindingContainer.java | 387 ++++++++++++++++++ .../RocketMQInboundChannelAdapter.java | 298 ++++---------- .../integration/RocketMQMessageHandler.java | 253 ++++++------ .../metrics/ConsumerGroupInstrumentation.java | 33 -- .../metrics/ConsumerInstrumentation.java | 59 --- .../rocketmq/metrics/Instrumentation.java | 62 +-- .../metrics/InstrumentationManager.java | 52 +-- .../metrics/ProducerInstrumentation.java | 59 --- ...RocketMQBinderConfigurationProperties.java | 29 +- .../RocketMQConsumerProperties.java | 125 +++--- .../RocketMQProducerProperties.java | 100 ++++- .../main/resources/META-INF/spring.factories | 2 +- .../RocketMQAutoConfigurationTests.java | 8 +- 31 files changed, 1118 insertions(+), 1306 deletions(-) delete mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java delete mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java delete mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java delete mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java rename spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/{RocketMQBinderEndpointAutoConfiguration.java => RocketMQBinderHealthIndicatorAutoConfiguration.java} (59%) create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java delete mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java delete mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java delete mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java delete mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java delete mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index 1ab8d95d..72c2c369 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -25,7 +25,7 @@ 4.0.1 1.0.5 2.16.0 - 4.3.1 + 2.0.2-SNAPSHOT 2.1.6 1.1.0 1.1.8 @@ -94,8 +94,8 @@ org.apache.rocketmq - rocketmq-client - ${rocketmq.version} + rocketmq-spring-boot-starter + ${rocketmq.starter.version} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml index 6a208125..647c2eac 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml @@ -28,10 +28,6 @@ org.springframework.boot spring-boot-starter-actuator - - io.dropwizard.metrics - metrics-core - diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java deleted file mode 100644 index 65d72662..00000000 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.springframework.cloud.alibaba.cloud.examples; - -import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.client.producer.TransactionCheckListener; -import org.apache.rocketmq.common.message.MessageExt; - -/** - * @author Jim - */ -public class MyTransactionCheckListener implements TransactionCheckListener { - - @Override - public LocalTransactionState checkLocalTransactionState(MessageExt msg) { - System.out.println("TransactionCheckListener: " + new String(msg.getBody())); - return LocalTransactionState.COMMIT_MESSAGE; - } - -} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java deleted file mode 100644 index 752d4e5f..00000000 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.springframework.cloud.alibaba.cloud.examples; - -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; -import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.common.message.Message; - -/** - * @author Jim - */ -public class MyTransactionExecuter implements LocalTransactionExecuter { - @Override - public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { - if ("1".equals(msg.getUserProperty("test"))) { - System.out.println(new String(msg.getBody()) + " rollback"); - return LocalTransactionState.ROLLBACK_MESSAGE; - } - System.out.println(new String(msg.getBody()) + " commit"); - return LocalTransactionState.COMMIT_MESSAGE; - } -} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java index d590aa28..35fc25a6 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java @@ -25,11 +25,6 @@ public class ReceiveService { System.out.println("input3 receive: " + foo); } - @StreamListener("input1") - public void receiveInput1Again(String receiveMsg) { - System.out.println("input1 receive again: " + receiveMsg); - } - @StreamListener("input4") public void receiveTransactionalMsg(String transactionMsg) { System.out.println("input4 receive transaction msg: " + transactionMsg); diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java new file mode 100644 index 00000000..56812aa6 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alibaba.cloud.examples; + +import java.util.HashMap; + +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.messaging.Message; + +/** + * @author Jim + */ +@RocketMQTransactionListener(txProducerGroup = "TransactionTopic", corePoolSize = 5, maximumPoolSize = 10) +class TransactionListenerImpl implements RocketMQLocalTransactionListener { + @Override + public RocketMQLocalTransactionState executeLocalTransaction(Message msg, + Object arg) { + if ("1".equals(((HashMap) msg.getHeaders().get(RocketMQHeaders.PROPERTIES)) + .get("USERS_test"))) { + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " rollback"); + return RocketMQLocalTransactionState.ROLLBACK; + } + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " commit"); + return RocketMQLocalTransactionState.COMMIT; + } + + @Override + public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { + System.out.println("check: " + new String((byte[]) msg.getPayload())); + return RocketMQLocalTransactionState.COMMIT; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties index 54bf902b..753e168d 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties @@ -8,8 +8,6 @@ spring.cloud.stream.bindings.output1.content-type=application/json spring.cloud.stream.bindings.output2.destination=TransactionTopic spring.cloud.stream.bindings.output2.content-type=application/json spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true -spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter -spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain @@ -22,20 +20,18 @@ spring.cloud.stream.bindings.input2.content-type=text/plain spring.cloud.stream.bindings.input2.group=test-group2 spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr -spring.cloud.stream.bindings.input2.consumer.concurrency=20 -spring.cloud.stream.bindings.input2.consumer.maxAttempts=1 +spring.cloud.stream.bindings.input2.consumer.concurrency=5 spring.cloud.stream.bindings.input3.destination=test-topic spring.cloud.stream.bindings.input3.content-type=application/json spring.cloud.stream.bindings.input3.group=test-group3 spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj -spring.cloud.stream.bindings.input3.consumer.concurrency=20 -spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 +spring.cloud.stream.bindings.input3.consumer.concurrency=5 spring.cloud.stream.bindings.input4.destination=TransactionTopic spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.group=transaction-group -spring.cloud.stream.bindings.input4.consumer.concurrency=210 +spring.cloud.stream.bindings.input4.consumer.concurrency=10 spring.application.name=rocketmq-example diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml index f7ba2c89..4c3f5fff 100644 --- a/spring-cloud-stream-binder-rocketmq/pom.xml +++ b/spring-cloud-stream-binder-rocketmq/pom.xml @@ -20,43 +20,32 @@ spring-cloud-stream - - org.apache.rocketmq - rocketmq-client - - - - io.dropwizard.metrics - metrics-core - provided - true - - org.springframework.boot spring-boot-configuration-processor - provided true org.springframework.boot spring-boot - provided true org.springframework.boot spring-boot-autoconfigure - provided true + + org.apache.rocketmq + rocketmq-spring-boot-starter + + org.springframework.boot spring-boot-actuator - provided true diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index a54d2c1c..c57ce9b6 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -21,43 +21,16 @@ package org.springframework.cloud.stream.binder.rocketmq; */ public interface RocketMQBinderConstants { - String ENDPOINT_ID = "rocketmq_binder"; - /** * Header key */ - String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE"; - - String ROCKET_FLAG = "ROCKETMQ_FLAG"; - - String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; - - String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG"; - - String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; + String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG"; /** - * Instrumentation + * Default value */ - String LASTSEND_TIMESTAMP = "lastSend.timestamp"; + String DEFAULT_NAME_SERVER = "127.0.0.1:9876"; - interface Metrics { - interface Producer { - String PREFIX = "scs-rocketmq.producer."; - String TOTAL_SENT = "totalSent"; - String TOTAL_SENT_FAILURES = "totalSentFailures"; - String SENT_PER_SECOND = "sentPerSecond"; - String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond"; - } - - interface Consumer { - String GROUP_PREFIX = "scs-rocketmq.consumerGroup."; - String PREFIX = "scs-rocketmq.consumer."; - String TOTAL_CONSUMED = "totalConsumed"; - String CONSUMED_PER_SECOND = "consumedPerSecond"; - String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures"; - String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond"; - } - } + String DEFAULT_GROUP = "rocketmq_binder_default_group_name"; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 4982ffa1..98164c64 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,16 +16,18 @@ package org.springframework.cloud.stream.binder.rocketmq; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; -import org.apache.rocketmq.client.producer.TransactionCheckListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; -import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; +import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -39,10 +41,10 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.util.ClassUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; /** - * @author Timur Valiev * @author Jim */ public class RocketMQMessageChannelBinder extends @@ -50,21 +52,19 @@ public class RocketMQMessageChannelBinder extends implements ExtendedPropertiesBinder { - private static final Logger logger = LoggerFactory - .getLogger(RocketMQMessageChannelBinder.class); - private final RocketMQExtendedBindingProperties extendedBindingProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final InstrumentationManager instrumentationManager; - private final ConsumersManager consumersManager; - public RocketMQMessageChannelBinder(ConsumersManager consumersManager, + private Set clientConfigId = new HashSet<>(); + private Map topicInUse = new HashMap<>(); + + public RocketMQMessageChannelBinder( RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQTopicProvisioner provisioningProvider, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, InstrumentationManager instrumentationManager) { super(true, null, provisioningProvider); - this.consumersManager = consumersManager; this.extendedBindingProperties = extendedBindingProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.instrumentationManager = instrumentationManager; @@ -75,23 +75,57 @@ public class RocketMQMessageChannelBinder extends ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( - destination.getName(), producerProperties, - rocketBinderConfigurationProperties, instrumentationManager); + + RocketMQTemplate rocketMQTemplate; if (producerProperties.getExtension().getTransactional()) { - // transaction message check LocalTransactionExecuter - messageHandler.setLocalTransactionExecuter( - getClassConfiguration(destination.getName(), - producerProperties.getExtension().getExecuter(), - LocalTransactionExecuter.class)); - // transaction message check TransactionCheckListener - messageHandler.setTransactionCheckListener( - getClassConfiguration(destination.getName(), - producerProperties.getExtension() - .getTransactionCheckListener(), - TransactionCheckListener.class)); + Map rocketMQTemplates = getApplicationContext() + .getParent().getBeansOfType(RocketMQTemplate.class); + if (rocketMQTemplates.size() == 0) { + throw new IllegalStateException( + "there is no RocketMQTemplate in Spring BeanFactory"); + } + else if (rocketMQTemplates.size() > 1) { + throw new IllegalStateException( + "there is more than 1 RocketMQTemplates in Spring BeanFactory"); + } + rocketMQTemplate = rocketMQTemplates.values().iterator().next(); + clientConfigId.add(rocketMQTemplate.getProducer().buildMQClientId()); + } + else { + rocketMQTemplate = new RocketMQTemplate(); + rocketMQTemplate.setObjectMapper(getApplicationContext().getParent() + .getBeansOfType(ObjectMapper.class).values().iterator().next()); + DefaultMQProducer producer = new DefaultMQProducer(destination.getName()); + producer.setNamesrvAddr( + rocketBinderConfigurationProperties.getNamesrvAddr()); + producer.setSendMsgTimeout( + producerProperties.getExtension().getSendMessageTimeout()); + producer.setRetryTimesWhenSendFailed( + producerProperties.getExtension().getRetryTimesWhenSendFailed()); + producer.setRetryTimesWhenSendAsyncFailed(producerProperties + .getExtension().getRetryTimesWhenSendAsyncFailed()); + producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension() + .getCompressMessageBodyThreshold()); + producer.setRetryAnotherBrokerWhenNotStoreOK( + producerProperties.getExtension().isRetryNextServer()); + producer.setMaxMessageSize( + producerProperties.getExtension().getMaxMessageSize()); + producer.setVipChannelEnabled( + producerProperties.getExtension().getVipChannelEnabled()); + rocketMQTemplate.setProducer(producer); + clientConfigId.add(producer.buildMQClientId()); } + RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( + rocketMQTemplate, destination.getName(), + producerProperties.getExtension().getTransactional(), + instrumentationManager); + messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); + messageHandler.setSync(producerProperties.getExtension().getSync()); + + if (errorChannel != null) { + messageHandler.setSendFailureChannel(errorChannel); + } return messageHandler; } else { @@ -107,12 +141,25 @@ public class RocketMQMessageChannelBinder extends throws Exception { if (group == null || "".equals(group)) { throw new RuntimeException( - "'group' must be configured for channel + " + destination.getName()); + "'group must be configured for channel " + destination.getName()); } + RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer( + consumerProperties, this); + listenerContainer.setConsumerGroup(group); + listenerContainer.setTopic(destination.getName()); + listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency()); + listenerContainer.setSuspendCurrentQueueTimeMillis( + consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis()); + listenerContainer.setDelayLevelWhenNextConsume( + consumerProperties.getExtension().getDelayLevelWhenNextConsume()); + listenerContainer + .setNameServer(rocketBinderConfigurationProperties.getNamesrvAddr()); + RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( - consumersManager, consumerProperties, destination.getName(), group, - instrumentationManager); + listenerContainer, consumerProperties, instrumentationManager); + + topicInUse.put(destination.getName(), group); ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, consumerProperties); @@ -140,45 +187,11 @@ public class RocketMQMessageChannelBinder extends return extendedBindingProperties.getExtendedProducerProperties(channelName); } - private T getClassConfiguration(String destName, String className, - Class interfaceClass) { - if (StringUtils.isEmpty(className)) { - throw new RuntimeException("Binding for channel " + destName - + " using transactional message, should set " - + interfaceClass.getSimpleName() + " configuration" - + interfaceClass.getSimpleName() + " should be set, like " - + "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour" - + interfaceClass.getSimpleName() + "'"); - } - else if (StringUtils.isNotEmpty(className)) { - Class fieldClass; - // check class exists - try { - fieldClass = ClassUtils.forName(className, - RocketMQMessageChannelBinder.class.getClassLoader()); - } - catch (ClassNotFoundException e) { - throw new RuntimeException("Binding for channel " + destName - + " using transactional message, but " + className - + " class is not found"); - } - // check interface incompatible - if (!interfaceClass.isAssignableFrom(fieldClass)) { - throw new RuntimeException("Binding for channel " + destName - + " using transactional message, but " + className - + " is incompatible with " + interfaceClass.getSimpleName() - + " interface"); - } - try { - return (T) fieldClass.newInstance(); - } - catch (Exception e) { - throw new RuntimeException("Binding for channel " + destName - + " using transactional message, but " + className - + " instance error", e); - } - } - return null; + public Set getClientConfigId() { + return clientConfigId; } + public Map getTopicInUse() { + return topicInUse; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java deleted file mode 100644 index 23671b60..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq; - -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang3.math.NumberUtils; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; -import org.springframework.integration.support.MutableMessage; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.MessageHeaderAccessor; - -/** - * @author Timur Valiev - * @author Jim - */ -public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor { - - public RocketMQMessageHeaderAccessor() { - super(); - } - - public RocketMQMessageHeaderAccessor(Message message) { - super(message); - } - - public Acknowledgement getAcknowledgement(Message message) { - return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class); - } - - public RocketMQMessageHeaderAccessor withAcknowledgment( - Acknowledgement acknowledgment) { - setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment); - return this; - } - - public String getTags() { - return getMessageHeaders().get(MessageConst.PROPERTY_TAGS) == null ? "" - : (String) getMessageHeaders().get(MessageConst.PROPERTY_TAGS); - } - - public RocketMQMessageHeaderAccessor withTags(String tag) { - setHeader(MessageConst.PROPERTY_TAGS, tag); - return this; - } - - public String getKeys() { - return getMessageHeaders().get(MessageConst.PROPERTY_KEYS) == null ? "" - : (String) getMessageHeaders().get(MessageConst.PROPERTY_KEYS); - } - - public RocketMQMessageHeaderAccessor withKeys(String keys) { - setHeader(MessageConst.PROPERTY_KEYS, keys); - return this; - } - - public MessageExt getRocketMessage() { - return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class); - } - - public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) { - setHeader(ORIGINAL_ROCKET_MESSAGE, message); - return this; - } - - public Integer getDelayTimeLevel() { - return NumberUtils.toInt( - (String) getMessageHeaders().get(MessageConst.PROPERTY_DELAY_TIME_LEVEL), - 0); - } - - public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) { - setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); - return this; - } - - public Integer getFlag() { - return NumberUtils.toInt((String) getMessageHeaders().get(ROCKET_FLAG), 0); - } - - public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) { - setHeader(ROCKET_FLAG, delayTimeLevel); - return this; - } - - public Object getTransactionalArg() { - return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG); - } - - public Object withTransactionalArg(Object arg) { - setHeader(ROCKET_TRANSACTIONAL_ARG, arg); - return this; - } - - public SendResult getSendResult() { - return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); - } - - public static void putSendResult(MutableMessage message, SendResult sendResult) { - message.getHeaders().put(ROCKET_SEND_RESULT, sendResult); - } - - public Map getUserProperties() { - Map result = new HashMap<>(); - for (Map.Entry entry : this.toMap().entrySet()) { - if (entry.getValue() instanceof String - && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) - && !entry.getKey().equals(MessageHeaders.CONTENT_TYPE)) { - result.put(entry.getKey(), (String) entry.getValue()); - } - } - return result; - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java deleted file mode 100644 index baf8e700..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.actuator; - -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID; - -import java.util.HashMap; -import java.util.Map; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.actuate.endpoint.AbstractEndpoint; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; - -/** - * @author Timur Valiev - * @author Jim - */ -public class RocketMQBinderEndpoint extends AbstractEndpoint> { - - @Autowired(required = false) - private InstrumentationManager instrumentationManager; - - public RocketMQBinderEndpoint() { - super(ENDPOINT_ID); - } - - @Override - public Map invoke() { - Map result = new HashMap<>(); - if (instrumentationManager != null) { - result.put("metrics", instrumentationManager.getMetricRegistry()); - result.put("runtime", instrumentationManager.getRuntime()); - } - else { - result.put("warning", - "please add metrics-core dependency, we use it for metrics"); - } - return result; - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java index cb2c2efe..719107da 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java @@ -28,45 +28,37 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM */ public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { - @Autowired(required = false) + @Autowired private InstrumentationManager instrumentationManager; @Override protected void doHealthCheck(Health.Builder builder) throws Exception { int upCount = 0, outOfServiceCount = 0; - if (instrumentationManager != null) { - for (Instrumentation instrumentation : instrumentationManager - .getHealthInstrumentations()) { - if (instrumentation.isUp()) { - upCount++; - } - else if (instrumentation.isOutOfService()) { - upCount++; - } + for (Instrumentation instrumentation : instrumentationManager + .getHealthInstrumentations()) { + if (instrumentation.isUp()) { + upCount++; } - if (upCount == instrumentationManager.getHealthInstrumentations().size()) { - builder.up(); - return; - } - else if (outOfServiceCount == instrumentationManager - .getHealthInstrumentations().size()) { - builder.outOfService(); - return; - } - builder.down(); - - for (Instrumentation instrumentation : instrumentationManager - .getHealthInstrumentations()) { - if (!instrumentation.isStarted()) { - builder.withException(instrumentation.getStartException()); - } + else if (instrumentation.isOutOfService()) { + upCount++; } } - else { - builder.down(); - builder.withDetail("warning", - "please add metrics-core dependency, we use it for metrics"); + if (upCount == instrumentationManager.getHealthInstrumentations().size()) { + builder.up(); + return; } + else if (outOfServiceCount == instrumentationManager.getHealthInstrumentations() + .size()) { + builder.outOfService(); + return; + } + builder.down(); + for (Instrumentation instrumentation : instrumentationManager + .getHealthInstrumentations()) { + if (!instrumentation.isStarted()) { + builder.withException(instrumentation.getStartException()); + } + } } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java index 95abaeeb..d2888839 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -16,23 +16,23 @@ package org.springframework.cloud.stream.binder.rocketmq.config; -import org.apache.rocketmq.client.log.ClientLogger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; -import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; /** * @author Timur Valiev * @author Jim */ @Configuration +@Import(RocketMQBinderHealthIndicatorAutoConfiguration.class) @EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class }) public class RocketMQBinderAutoConfiguration { @@ -41,17 +41,12 @@ public class RocketMQBinderAutoConfiguration { private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - @Autowired(required = false) - private InstrumentationManager instrumentationManager; - @Autowired public RocketMQBinderAutoConfiguration( RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { this.extendedBindingProperties = extendedBindingProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, - this.rocketBinderConfigurationProperties.getLogLevel()); } @Bean @@ -62,17 +57,16 @@ public class RocketMQBinderAutoConfiguration { @Bean public RocketMQMessageChannelBinder rocketMessageChannelBinder( RocketMQTopicProvisioner provisioningProvider, - ConsumersManager consumersManager) { + InstrumentationManager instrumentationManager) { RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( - consumersManager, extendedBindingProperties, provisioningProvider, + extendedBindingProperties, provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager); return binder; } @Bean - public ConsumersManager consumersManager() { - return new ConsumersManager(instrumentationManager, - rocketBinderConfigurationProperties); + public InstrumentationManager instrumentationManager() { + return new InstrumentationManager(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java similarity index 59% rename from spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java rename to spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java index 4b602621..3d3e572c 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java @@ -16,13 +16,8 @@ package org.springframework.cloud.stream.binder.rocketmq.config; -import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration; -import org.springframework.boot.actuate.endpoint.Endpoint; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -30,24 +25,12 @@ import org.springframework.context.annotation.Configuration; * @author Jim */ @Configuration -@AutoConfigureAfter(EndpointAutoConfiguration.class) -@ConditionalOnClass(Endpoint.class) -public class RocketMQBinderEndpointAutoConfiguration { - - @Bean - public RocketMQBinderEndpoint rocketBinderEndpoint() { - return new RocketMQBinderEndpoint(); - } +@ConditionalOnClass(name = "org.springframework.boot.actuate.endpoint.AbstractEndpoint") +public class RocketMQBinderHealthIndicatorAutoConfiguration { @Bean public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() { return new RocketMQBinderHealthIndicator(); } - @Bean - @ConditionalOnClass(name = "com.codahale.metrics.Counter") - public InstrumentationManager instrumentationManager() { - return new InstrumentationManager(); - } - } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java new file mode 100644 index 00000000..4b34a104 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.config; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer; +import org.apache.rocketmq.spring.config.RocketMQConfigUtils; +import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor; +import org.apache.rocketmq.spring.config.TransactionHandlerRegistry; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; +import org.springframework.util.Assert; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @author Jim + */ +@Configuration +@AutoConfigureAfter(RocketMQAutoConfiguration.class) +@ConditionalOnMissingBean(DefaultMQProducer.class) +public class RocketMQComponent4BinderAutoConfiguration { + + private final Environment environment; + + public RocketMQComponent4BinderAutoConfiguration(Environment environment) { + this.environment = environment; + } + + @Bean + public DefaultMQProducer defaultMQProducer() { + RocketMQProperties rocketMQProperties = new RocketMQProperties(); + String configNameServer = environment + .getProperty("spring.cloud.stream.rocketmq.binder.namesrv-addr"); + if (StringUtils.isEmpty(configNameServer)) { + rocketMQProperties.setNameServer(RocketMQBinderConstants.DEFAULT_NAME_SERVER); + } + else { + rocketMQProperties.setNameServer(configNameServer); + } + RocketMQProperties.Producer producerConfig = new Producer(); + rocketMQProperties.setProducer(producerConfig); + producerConfig.setGroup(RocketMQBinderConstants.DEFAULT_GROUP); + + String nameServer = rocketMQProperties.getNameServer(); + String groupName = producerConfig.getGroup(); + Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); + Assert.hasText(groupName, "[rocketmq.producer.group] must not be null"); + + DefaultMQProducer producer = new DefaultMQProducer(groupName); + producer.setNamesrvAddr(nameServer); + producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout()); + producer.setRetryTimesWhenSendFailed( + producerConfig.getRetryTimesWhenSendFailed()); + producer.setRetryTimesWhenSendAsyncFailed( + producerConfig.getRetryTimesWhenSendAsyncFailed()); + producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); + producer.setCompressMsgBodyOverHowmuch( + producerConfig.getCompressMessageBodyThreshold()); + producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); + + return producer; + } + + @Bean(destroyMethod = "destroy") + @ConditionalOnBean(DefaultMQProducer.class) + @ConditionalOnMissingBean(RocketMQTemplate.class) + public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, + ObjectMapper rocketMQMessageObjectMapper) { + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); + rocketMQTemplate.setProducer(mqProducer); + rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper); + return rocketMQTemplate; + } + + @Bean + @ConditionalOnBean(RocketMQTemplate.class) + @ConditionalOnMissingBean(TransactionHandlerRegistry.class) + public TransactionHandlerRegistry transactionHandlerRegistry( + RocketMQTemplate template) { + return new TransactionHandlerRegistry(template); + } + + @Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME) + @ConditionalOnBean(TransactionHandlerRegistry.class) + public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor( + TransactionHandlerRegistry transactionHandlerRegistry) { + return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry); + } + + @Bean + @ConditionalOnMissingBean(ObjectMapper.class) + public ObjectMapper rocketMQBinderObjectMapper() { + return new ObjectMapper(); + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java deleted file mode 100644 index 207dbe50..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.consuming; - -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; - -/** - * @author Timur Valiev - * @author Jim - */ -public class Acknowledgement { - - /** - * for {@link ConsumeConcurrentlyContext} using - */ - private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - /** - * Message consume retry strategy
- * -1,no retry,put into DLQ directly
- * 0,broker control retry frequency
- * >0,client control retry frequency - */ - private Integer consumeConcurrentlyDelayLevel = 0; - - /** - * for {@link ConsumeOrderlyContext} using - */ - private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS; - private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L; - - public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) { - this.consumeConcurrentlyStatus = consumeConcurrentlyStatus; - return this; - } - - public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() { - return consumeConcurrentlyStatus; - } - - public ConsumeOrderlyStatus getConsumeOrderlyStatus() { - return consumeOrderlyStatus; - } - - public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) { - this.consumeOrderlyStatus = consumeOrderlyStatus; - return this; - } - - public Integer getConsumeConcurrentlyDelayLevel() { - return consumeConcurrentlyDelayLevel; - } - - public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) { - this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel; - } - - public Long getConsumeOrderlySuspendCurrentQueueTimeMill() { - return consumeOrderlySuspendCurrentQueueTimeMill; - } - - public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) { - this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill; - } - - public static Acknowledgement buildOrderlyInstance() { - Acknowledgement acknowledgement = new Acknowledgement(); - acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS); - return acknowledgement; - } - - public static Acknowledgement buildConcurrentlyInstance() { - Acknowledgement acknowledgement = new Acknowledgement(); - acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS); - return acknowledgement; - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java deleted file mode 100644 index c56f060c..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.consuming; - -import java.util.AbstractMap; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; -import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; -import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; - -/** - * @author Timur Valiev - * @author Jim - */ -public class ConsumersManager { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private InstrumentationManager instrumentationManager; - - private final Map consumerGroups = new HashMap<>(); - private final Map started = new HashMap<>(); - private final Map, ExtendedConsumerProperties> propertiesMap = new HashMap<>(); - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - - public ConsumersManager(InstrumentationManager instrumentationManager, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { - this.instrumentationManager = instrumentationManager; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - } - - public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, - String topic, - ExtendedConsumerProperties consumerProperties) { - propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), - consumerProperties); - if (instrumentationManager != null) { - ConsumerGroupInstrumentation instrumentation = instrumentationManager - .getConsumerGroupInstrumentation(group); - instrumentationManager.addHealthInstrumentation(instrumentation); - } - - if (consumerGroups.containsKey(group)) { - return consumerGroups.get(group); - } - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); - consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - consumerGroups.put(group, consumer); - started.put(group, false); - consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); - consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); - if (consumerProperties.getExtension().getBroadcasting()) { - consumer.setMessageModel(MessageModel.BROADCASTING); - } - logger.info("RocketMQ consuming for SCS group {} created", group); - return consumer; - } - - public synchronized void startConsumers() throws MQClientException { - for (String group : getConsumerGroups()) { - start(group); - } - } - - public synchronized void startConsumer(String group) throws MQClientException { - start(group); - } - - public synchronized void stopConsumer(String group) { - stop(group); - } - - private void stop(String group) { - if (consumerGroups.get(group) != null) { - consumerGroups.get(group).shutdown(); - started.put(group, false); - } - } - - private synchronized void start(String group) throws MQClientException { - if (started.get(group)) { - return; - } - ConsumerGroupInstrumentation groupInstrumentation = null; - if (instrumentationManager != null) { - groupInstrumentation = instrumentationManager - .getConsumerGroupInstrumentation(group); - instrumentationManager.addHealthInstrumentation(groupInstrumentation); - } - try { - consumerGroups.get(group).start(); - started.put(group, true); - if (groupInstrumentation != null) { - groupInstrumentation.markStartedSuccessfully(); - } - } - catch (MQClientException e) { - if (groupInstrumentation != null) { - groupInstrumentation.markStartFailed(e); - } - logger.error("RocketMQ Consumer hasn't been started. Caused by " - + e.getErrorMessage(), e); - throw e; - } - } - - public synchronized Set getConsumerGroups() { - return consumerGroups.keySet(); - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java new file mode 100644 index 00000000..5b6abad9 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -0,0 +1,387 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.List; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; +import org.apache.rocketmq.spring.support.RocketMQListenerContainer; +import org.apache.rocketmq.spring.support.RocketMQUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.context.SmartLifecycle; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQListenerBindingContainer + implements InitializingBean, RocketMQListenerContainer, SmartLifecycle { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQListenerBindingContainer.class); + + private long suspendCurrentQueueTimeMillis = 1000; + + /** + * Message consume retry strategy
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency. + */ + private int delayLevelWhenNextConsume = 0; + + private String nameServer; + + private String consumerGroup; + + private String topic; + + private int consumeThreadMax = 64; + + private String charset = "UTF-8"; + + private RocketMQListener rocketMQListener; + + private DefaultMQPushConsumer consumer; + + private boolean running; + + private final ExtendedConsumerProperties rocketMQConsumerProperties; + + private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder; + + // The following properties came from RocketMQConsumerProperties. + private ConsumeMode consumeMode; + private SelectorType selectorType; + private String selectorExpression; + private MessageModel messageModel; + + public RocketMQListenerBindingContainer( + ExtendedConsumerProperties rocketMQConsumerProperties, + RocketMQMessageChannelBinder rocketMQMessageChannelBinder) { + this.rocketMQConsumerProperties = rocketMQConsumerProperties; + this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder; + this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly() + ? ConsumeMode.ORDERLY + : ConsumeMode.CONCURRENTLY; + if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) { + this.selectorType = SelectorType.TAG; + this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags(); + } + else { + this.selectorType = SelectorType.SQL92; + this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql(); + } + this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting() + ? MessageModel.BROADCASTING + : MessageModel.CLUSTERING; + } + + @Override + public void setupMessageListener(RocketMQListener rocketMQListener) { + this.rocketMQListener = rocketMQListener; + } + + @Override + public void destroy() throws Exception { + this.setRunning(false); + if (consumer != null) { + consumer.shutdown(); + } + log.info("container destroyed, {}", this.toString()); + } + + @Override + public void afterPropertiesSet() throws Exception { + initRocketMQPushConsumer(); + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + @Override + public void start() { + if (this.isRunning()) { + throw new IllegalStateException( + "container already running. " + this.toString()); + } + + try { + consumer.start(); + } + catch (MQClientException e) { + throw new IllegalStateException("Failed to start RocketMQ push consumer", e); + } + this.setRunning(true); + + log.info("running container: {}", this.toString()); + } + + @Override + public void stop() { + if (this.isRunning()) { + if (consumer != null) { + consumer.shutdown(); + } + setRunning(false); + } + } + + @Override + public boolean isRunning() { + return running; + } + + private void setRunning(boolean running) { + this.running = running; + } + + @Override + public int getPhase() { + return Integer.MAX_VALUE; + } + + private void initRocketMQPushConsumer() throws MQClientException { + Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); + Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); + Assert.notNull(nameServer, "Property 'nameServer' is required"); + Assert.notNull(topic, "Property 'topic' is required"); + + consumer = new DefaultMQPushConsumer(consumerGroup); + consumer.setNamesrvAddr(nameServer); + consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); + consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); + + switch (messageModel) { + case BROADCASTING: + consumer.setMessageModel( + org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); + break; + case CLUSTERING: + consumer.setMessageModel( + org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); + break; + default: + throw new IllegalArgumentException("Property 'messageModel' was wrong."); + } + + switch (selectorType) { + case TAG: + consumer.subscribe(topic, selectorExpression); + break; + case SQL92: + consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); + break; + default: + throw new IllegalArgumentException("Property 'selectorType' was wrong."); + } + + switch (consumeMode) { + case ORDERLY: + consumer.setMessageListener(new DefaultMessageListenerOrderly()); + break; + case CONCURRENTLY: + consumer.setMessageListener(new DefaultMessageListenerConcurrently()); + break; + default: + throw new IllegalArgumentException("Property 'consumeMode' was wrong."); + } + + if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { + ((RocketMQPushConsumerLifecycleListener) rocketMQListener) + .prepareStart(consumer); + } + + rocketMQMessageChannelBinder.getClientConfigId().add(consumer.buildMQClientId()); + + } + + public long getSuspendCurrentQueueTimeMillis() { + return suspendCurrentQueueTimeMillis; + } + + public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + } + + public int getDelayLevelWhenNextConsume() { + return delayLevelWhenNextConsume; + } + + public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { + this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + } + + public String getNameServer() { + return nameServer; + } + + public void setNameServer(String nameServer) { + this.nameServer = nameServer; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public int getConsumeThreadMax() { + return consumeThreadMax; + } + + public void setConsumeThreadMax(int consumeThreadMax) { + this.consumeThreadMax = consumeThreadMax; + } + + public String getCharset() { + return charset; + } + + public void setCharset(String charset) { + this.charset = charset; + } + + public RocketMQListener getRocketMQListener() { + return rocketMQListener; + } + + public void setRocketMQListener(RocketMQListener rocketMQListener) { + this.rocketMQListener = rocketMQListener; + } + + public DefaultMQPushConsumer getConsumer() { + return consumer; + } + + public void setConsumer(DefaultMQPushConsumer consumer) { + this.consumer = consumer; + } + + public ExtendedConsumerProperties getRocketMQConsumerProperties() { + return rocketMQConsumerProperties; + } + + public ConsumeMode getConsumeMode() { + return consumeMode; + } + + public SelectorType getSelectorType() { + return selectorType; + } + + public String getSelectorExpression() { + return selectorExpression; + } + + public MessageModel getMessageModel() { + return messageModel; + } + + public class DefaultMessageListenerConcurrently + implements MessageListenerConcurrently { + + @SuppressWarnings("unchecked") + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + rocketMQListener + .onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); + long costTime = System.currentTimeMillis() - now; + log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } + catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + } + + public class DefaultMessageListenerOrderly implements MessageListenerOrderly { + + @SuppressWarnings("unchecked") + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + rocketMQListener + .onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); + long costTime = System.currentTimeMillis() - now; + log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } + catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + context.setSuspendCurrentQueueTimeMillis( + suspendCurrentQueueTimeMillis); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + } + + return ConsumeOrderlyStatus.SUCCESS; + } + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 4558eb95..56250e16 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -16,41 +16,24 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.commons.lang3.ClassUtils; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.MessageSelector; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListener; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.core.RocketMQListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; -import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; -import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; -import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation; +import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; +import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.MessagingException; import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryListener; import org.springframework.retry.support.RetryTemplate; -import org.springframework.util.StringUtils; +import org.springframework.util.Assert; /** * @author Jim @@ -60,105 +43,89 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { private static final Logger logger = LoggerFactory .getLogger(RocketMQInboundChannelAdapter.class); - private ConsumerInstrumentation consumerInstrumentation; - - private InstrumentationManager instrumentationManager; - - private final ExtendedConsumerProperties consumerProperties; - - private final String destination; - - private final String group; - - private final ConsumersManager consumersManager; - private RetryTemplate retryTemplate; private RecoveryCallback recoveryCallback; - public RocketMQInboundChannelAdapter(ConsumersManager consumersManager, + private RocketMQListenerBindingContainer rocketMQListenerContainer; + + private final ExtendedConsumerProperties consumerProperties; + + private final InstrumentationManager instrumentationManager; + + public RocketMQInboundChannelAdapter( + RocketMQListenerBindingContainer rocketMQListenerContainer, ExtendedConsumerProperties consumerProperties, - String destination, String group, InstrumentationManager instrumentationManager) { - this.consumersManager = consumersManager; + this.rocketMQListenerContainer = rocketMQListenerContainer; this.consumerProperties = consumerProperties; - this.destination = destination; - this.group = group; this.instrumentationManager = instrumentationManager; } + @Override + protected void onInit() { + if (consumerProperties == null + || !consumerProperties.getExtension().getEnabled()) { + return; + } + super.onInit(); + if (this.retryTemplate != null) { + Assert.state(getErrorChannel() == null, + "Cannot have an 'errorChannel' property when a 'RetryTemplate' is " + + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to " + + "send an error message when retries are exhausted"); + } + + BindingRocketMQListener listener = new BindingRocketMQListener(); + rocketMQListenerContainer.setRocketMQListener(listener); + + if (retryTemplate != null) { + this.retryTemplate.registerListener(listener); + } + + try { + rocketMQListenerContainer.afterPropertiesSet(); + + } + catch (Exception e) { + logger.error("rocketMQListenerContainer init error: " + e.getMessage(), e); + throw new IllegalArgumentException( + "rocketMQListenerContainer init error: " + e.getMessage(), e); + } + + instrumentationManager.addHealthInstrumentation( + new Instrumentation(rocketMQListenerContainer.getTopic() + + rocketMQListenerContainer.getConsumerGroup())); + } + @Override protected void doStart() { if (consumerProperties == null || !consumerProperties.getExtension().getEnabled()) { return; } - - String tags = consumerProperties.getExtension().getTags(); - Boolean isOrderly = consumerProperties.getExtension().getOrderly(); - - DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, - destination, consumerProperties); - - final CloudStreamMessageListener listener = isOrderly - ? new CloudStreamMessageListenerOrderly() - : new CloudStreamMessageListenerConcurrently(); - - if (retryTemplate != null) { - retryTemplate.registerListener(listener); - } - - Set tagsSet = new HashSet<>(); - if (!StringUtils.isEmpty(tags)) { - for (String tag : tags.split("\\|\\|")) { - tagsSet.add(tag.trim()); - } - } - - if (instrumentationManager != null) { - consumerInstrumentation = instrumentationManager - .getConsumerInstrumentation(destination); - instrumentationManager.addHealthInstrumentation(consumerInstrumentation); - } - try { - if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { - consumer.subscribe(destination, MessageSelector - .bySql(consumerProperties.getExtension().getSql())); - } - else { - consumer.subscribe(destination, - org.apache.commons.lang3.StringUtils.join(tagsSet, " || ")); - } - if (consumerInstrumentation != null) { - consumerInstrumentation.markStartedSuccessfully(); - } + rocketMQListenerContainer.start(); + instrumentationManager + .getHealthInstrumentation(rocketMQListenerContainer.getTopic() + + rocketMQListenerContainer.getConsumerGroup()) + .markStartedSuccessfully(); } - catch (MQClientException e) { - if (consumerInstrumentation != null) { - consumerInstrumentation.markStartFailed(e); - } - logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " - + e.getErrorMessage(), e); - throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); - } - - consumer.registerMessageListener(listener); - - try { - consumersManager.startConsumer(group); - } - catch (MQClientException e) { - logger.error( - "RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), - e); - throw new RuntimeException("RocketMQ Consumer startup failed.", e); + catch (Exception e) { + instrumentationManager + .getHealthInstrumentation(rocketMQListenerContainer.getTopic() + + rocketMQListenerContainer.getConsumerGroup()) + .markStartFailed(e); + logger.error("RocketMQTemplate startup failed, Caused by " + e.getMessage()); + throw new MessagingException(MessageBuilder.withPayload( + "RocketMQTemplate startup failed, Caused by " + e.getMessage()) + .build(), e); } } @Override protected void doStop() { - consumersManager.stopConsumer(group); + rocketMQListenerContainer.stop(); } public void setRetryTemplate(RetryTemplate retryTemplate) { @@ -169,84 +136,28 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { this.recoveryCallback = recoveryCallback; } - protected class CloudStreamMessageListener implements MessageListener, RetryListener { + protected class BindingRocketMQListener + implements RocketMQListener, RetryListener { - Acknowledgement consumeMessage(final List msgs) { + @Override + public void onMessage(final Message message) { boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; - try { - if (enableRetry) { - return RocketMQInboundChannelAdapter.this.retryTemplate - .execute(new RetryCallback() { - @Override - public Acknowledgement doWithRetry(RetryContext context) - throws Exception { - return doSendMsgs(msgs, context); - } - }, new RecoveryCallback() { - @Override - public Acknowledgement recover(RetryContext context) - throws Exception { - RocketMQInboundChannelAdapter.this.recoveryCallback - .recover(context); - if (ClassUtils.isAssignable(this.getClass(), - MessageListenerConcurrently.class)) { - return Acknowledgement - .buildConcurrentlyInstance(); - } - else { - return Acknowledgement.buildOrderlyInstance(); - } - } - }); - } - else { - Acknowledgement result = doSendMsgs(msgs, null); - if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) { - RocketMQInboundChannelAdapter.this.instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumed(); - } - return result; - } + if (enableRetry) { + RocketMQInboundChannelAdapter.this.retryTemplate + .execute(new RetryCallback() { + @Override + public Object doWithRetry(RetryContext context) + throws RuntimeException { + RocketMQInboundChannelAdapter.this.sendMessage(message); + return null; + } + }, (RecoveryCallback) RocketMQInboundChannelAdapter.this.recoveryCallback); } - catch (Exception e) { - logger.error( - "RocketMQ Message hasn't been processed successfully. Caused by ", - e); - if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) { - RocketMQInboundChannelAdapter.this.instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumedFailure(); - } - throw new RuntimeException( - "RocketMQ Message hasn't been processed successfully. Caused by ", - e); + else { + RocketMQInboundChannelAdapter.this.sendMessage(message); } } - private Acknowledgement doSendMsgs(final List msgs, - RetryContext context) { - List acknowledgements = new ArrayList<>(); - for (MessageExt msg : msgs) { - String retryInfo = context == null ? "" - : "retryCount-" + String.valueOf(context.getRetryCount()) + "|"; - logger.debug(retryInfo + "consuming msg:\n" + msg); - logger.debug(retryInfo + "message body:\n" + new String(msg.getBody())); - Acknowledgement acknowledgement = new Acknowledgement(); - Message toChannel = MessageBuilder.withPayload(msg.getBody()) - .setHeaders(new RocketMQMessageHeaderAccessor() - .withAcknowledgment(acknowledgement) - .withTags(msg.getTags()).withKeys(msg.getKeys()) - .withFlag(msg.getFlag()).withRocketMessage(msg)) - .build(); - acknowledgements.add(acknowledgement); - RocketMQInboundChannelAdapter.this.sendMessage(toChannel); - } - return acknowledgements.get(0); - } - @Override public boolean open(RetryContext context, RetryCallback callback) { @@ -256,54 +167,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { @Override public void close(RetryContext context, RetryCallback callback, Throwable throwable) { - if (RocketMQInboundChannelAdapter.this.instrumentationManager == null) { - return; - } - if (throwable != null) { - RocketMQInboundChannelAdapter.this.instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumedFailure(); - } - else { - RocketMQInboundChannelAdapter.this.instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumed(); - } } @Override public void onError(RetryContext context, RetryCallback callback, Throwable throwable) { + } } - protected class CloudStreamMessageListenerConcurrently - extends CloudStreamMessageListener implements MessageListenerConcurrently { - - @Override - public ConsumeConcurrentlyStatus consumeMessage(final List msgs, - ConsumeConcurrentlyContext context) { - Acknowledgement acknowledgement = consumeMessage(msgs); - context.setDelayLevelWhenNextConsume( - acknowledgement.getConsumeConcurrentlyDelayLevel()); - return acknowledgement.getConsumeConcurrentlyStatus(); - } - } - - protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener - implements MessageListenerOrderly { - - @Override - public ConsumeOrderlyStatus consumeMessage(List msgs, - ConsumeOrderlyContext context) { - Acknowledgement acknowledgement = consumeMessage(msgs); - context.setSuspendCurrentQueueTimeMillis( - (acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill())); - return acknowledgement.getConsumeOrderlyStatus(); - } - - } - -} +} \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 331df01b..578e746f 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -16,110 +16,83 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; -import java.util.Map; - -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.client.producer.TransactionCheckListener; -import org.apache.rocketmq.client.producer.TransactionMQProducer; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; +import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation; -import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; -import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import org.springframework.context.Lifecycle; import org.springframework.integration.handler.AbstractMessageHandler; -import org.springframework.integration.support.MutableMessage; +import org.springframework.integration.support.DefaultErrorMessageStrategy; +import org.springframework.integration.support.ErrorMessageStrategy; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * @author Jim */ public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle { - private DefaultMQProducer producer; + private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); - private ProducerInstrumentation producerInstrumentation; + private MessageChannel sendFailureChannel; - private InstrumentationManager instrumentationManager; + private final RocketMQTemplate rocketMQTemplate; - private LocalTransactionExecuter localTransactionExecuter; - - private TransactionCheckListener transactionCheckListener; - - private final ExtendedProducerProperties producerProperties; + private final Boolean transactional; private final String destination; - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final InstrumentationManager instrumentationManager; + + private boolean sync = false; private volatile boolean running = false; - public RocketMQMessageHandler(String destination, - ExtendedProducerProperties producerProperties, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, - InstrumentationManager instrumentationManager) { + public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination, + Boolean transactional, InstrumentationManager instrumentationManager) { + this.rocketMQTemplate = rocketMQTemplate; this.destination = destination; - this.producerProperties = producerProperties; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.transactional = transactional; this.instrumentationManager = instrumentationManager; } @Override public void start() { - if (producerProperties.getExtension().getTransactional()) { - producer = new TransactionMQProducer(destination); - if (transactionCheckListener != null) { - ((TransactionMQProducer) producer) - .setTransactionCheckListener(transactionCheckListener); + if (!transactional) { + instrumentationManager + .addHealthInstrumentation(new Instrumentation(destination)); + try { + rocketMQTemplate.afterPropertiesSet(); + instrumentationManager.getHealthInstrumentation(destination) + .markStartedSuccessfully(); } - } - else { - producer = new DefaultMQProducer(destination); - } - - if (instrumentationManager != null) { - producerInstrumentation = instrumentationManager - .getProducerInstrumentation(destination); - instrumentationManager.addHealthInstrumentation(producerInstrumentation); - } - - producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - - if (producerProperties.getExtension().getMaxMessageSize() > 0) { - producer.setMaxMessageSize( - producerProperties.getExtension().getMaxMessageSize()); - } - - try { - producer.start(); - if (producerInstrumentation != null) { - producerInstrumentation.markStartedSuccessfully(); + catch (Exception e) { + instrumentationManager.getHealthInstrumentation(destination) + .markStartFailed(e); + logger.error( + "RocketMQTemplate startup failed, Caused by " + e.getMessage()); + throw new MessagingException(MessageBuilder.withPayload( + "RocketMQTemplate startup failed, Caused by " + e.getMessage()) + .build(), e); } } - catch (MQClientException e) { - if (producerInstrumentation != null) { - producerInstrumentation.markStartFailed(e); - } - logger.error( - "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); - throw new MessagingException(e.getMessage(), e); - } running = true; } @Override public void stop() { - if (producer != null) { - producer.shutdown(); + if (!transactional) { + rocketMQTemplate.destroy(); } running = false; } @@ -130,76 +103,116 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li } @Override - protected void handleMessageInternal(org.springframework.messaging.Message message) - throws Exception { + protected void handleMessageInternal( + final org.springframework.messaging.Message message) throws Exception { try { - Message toSend; - if (message.getPayload() instanceof byte[]) { - toSend = new Message(destination, (byte[]) message.getPayload()); + StringBuilder topicWithTags = new StringBuilder(destination); + String tags = null; + if (message.getHeaders().get(RocketMQHeaders.TAGS) != null) { + tags = message.getHeaders().get(RocketMQHeaders.TAGS).toString(); } - else if (message.getPayload() instanceof String) { - toSend = new Message(destination, - ((String) message.getPayload()).getBytes()); + if (!StringUtils.isEmpty(tags)) { + topicWithTags = topicWithTags.append(":").append(tags); + } + SendResult sendRes = null; + if (transactional) { + sendRes = rocketMQTemplate.sendMessageInTransaction(destination, + topicWithTags.toString(), message, message.getHeaders() + .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG)); } else { - throw new UnsupportedOperationException("Payload class isn't supported: " - + message.getPayload().getClass()); - } - RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor( - message); - headerAccessor.setLeaveMutable(true); - toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel()); - toSend.setTags(headerAccessor.getTags()); - toSend.setKeys(headerAccessor.getKeys()); - toSend.setFlag(headerAccessor.getFlag()); - for (Map.Entry entry : headerAccessor.getUserProperties() - .entrySet()) { - toSend.putUserProperty(entry.getKey(), entry.getValue()); - } + int delayLevel = 0; + try { + Object delayLevelObj = message.getHeaders() + .get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); + if (delayLevelObj instanceof Number) { + delayLevel = ((Number) delayLevelObj).intValue(); + } + else if (delayLevelObj instanceof String) { + delayLevel = Integer.parseInt((String) delayLevelObj); + } + } + catch (Exception e) { + // ignore + } + if (sync) { + sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, + rocketMQTemplate.getProducer().getSendMsgTimeout(), + delayLevel); + } + else { + rocketMQTemplate.asyncSend(topicWithTags.toString(), message, + new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { - SendResult sendRes; - if (producerProperties.getExtension().getTransactional()) { - sendRes = producer.sendMessageInTransaction(toSend, - localTransactionExecuter, headerAccessor.getTransactionalArg()); - } - else { - sendRes = producer.send(toSend); - } + } - if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { - throw new MQClientException("message hasn't been sent", null); + @Override + public void onException(Throwable e) { + if (getSendFailureChannel() != null) { + getSendFailureChannel().send( + RocketMQMessageHandler.this.errorMessageStrategy + .buildErrorMessage( + new MessagingException( + message, e), + null)); + } + } + }); + } } - if (message instanceof MutableMessage) { - RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message, - sendRes); - } - if (instrumentationManager != null) { - instrumentationManager.getRuntime().put( - RocketMQBinderConstants.LASTSEND_TIMESTAMP, - System.currentTimeMillis()); - producerInstrumentation.markSent(); + if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { + if (getSendFailureChannel() != null) { + this.getSendFailureChannel().send(message); + } + else { + throw new MessagingException(message, + new MQClientException("message hasn't been sent", null)); + } } } - catch (MQClientException | RemotingException | MQBrokerException - | InterruptedException | UnsupportedOperationException e) { - if (producerInstrumentation != null) { - producerInstrumentation.markSentFailure(); - } + catch (Exception e) { logger.error( "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); - throw new MessagingException(e.getMessage(), e); + if (getSendFailureChannel() != null) { + getSendFailureChannel().send(this.errorMessageStrategy + .buildErrorMessage(new MessagingException(message, e), null)); + } + else { + throw new MessagingException(message, e); + } } } - public void setLocalTransactionExecuter( - LocalTransactionExecuter localTransactionExecuter) { - this.localTransactionExecuter = localTransactionExecuter; + /** + * Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent + * to this channel with a payload of a {@link MessagingException} with the failed + * message and cause. + * @param sendFailureChannel the failure channel. + * @since 0.2.2 + */ + public void setSendFailureChannel(MessageChannel sendFailureChannel) { + this.sendFailureChannel = sendFailureChannel; } - public void setTransactionCheckListener( - TransactionCheckListener transactionCheckListener) { - this.transactionCheckListener = transactionCheckListener; + /** + * Set the error message strategy implementation to use when sending error messages + * after send failures. Cannot be null. + * @param errorMessageStrategy the implementation. + * @since 0.2.2 + */ + public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) { + Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null"); + this.errorMessageStrategy = errorMessageStrategy; } + public MessageChannel getSendFailureChannel() { + return sendFailureChannel; + } + + public void setSync(boolean sync) { + this.sync = sync; + } } \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java deleted file mode 100644 index 6e4386d0..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.metrics; - -import com.codahale.metrics.MetricRegistry; - -/** - * @author Timur Valiev - * @author Jim - */ -public class ConsumerGroupInstrumentation extends Instrumentation { - private MetricRegistry metricRegistry; - - public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) { - super(name); - this.metricRegistry = metricRegistry; - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java deleted file mode 100644 index f624b237..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.metrics; - -import static com.codahale.metrics.MetricRegistry.name; - -import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; - -/** - * @author juven.xuxb - * @author Jim - */ -public class ConsumerInstrumentation extends Instrumentation { - - private final Counter totalConsumed; - private final Counter totalConsumedFailures; - private final Meter consumedPerSecond; - private final Meter consumedFailuresPerSecond; - - public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) { - super(baseMetricName); - this.totalConsumed = registry - .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED)); - this.consumedPerSecond = registry - .meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND)); - this.totalConsumedFailures = registry - .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES)); - this.consumedFailuresPerSecond = registry - .meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND)); - } - - public void markConsumed() { - totalConsumed.inc(); - consumedPerSecond.mark(); - } - - public void markConsumedFailure() { - totalConsumedFailures.inc(); - consumedFailuresPerSecond.mark(); - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java index c169ba83..885a183f 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java @@ -23,44 +23,44 @@ import java.util.concurrent.atomic.AtomicBoolean; * @author Jim */ public class Instrumentation { - private final String name; - protected final AtomicBoolean started = new AtomicBoolean(false); - protected Exception startException = null; + private final String name; + protected final AtomicBoolean started = new AtomicBoolean(false); + protected Exception startException = null; - Instrumentation(String name) { - this.name = name; - } + public Instrumentation(String name) { + this.name = name; + } - public boolean isDown() { - return startException != null; - } + public boolean isDown() { + return startException != null; + } - public boolean isUp() { - return started.get(); - } + public boolean isUp() { + return started.get(); + } - public boolean isOutOfService() { - return !started.get() && startException == null; - } + public boolean isOutOfService() { + return !started.get() && startException == null; + } - public void markStartedSuccessfully() { - started.set(true); - } + public void markStartedSuccessfully() { + started.set(true); + } - public void markStartFailed(Exception e) { - started.set(false); - startException = e; - } + public void markStartFailed(Exception e) { + started.set(false); + startException = e; + } - public String getName() { - return name; - } + public String getName() { + return name; + } - public boolean isStarted() { - return started.get(); - } + public boolean isStarted() { + return started.get(); + } - public Exception getStartException() { - return startException; - } + public Exception getStartException() { + return startException; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java index a593af91..db4e5dbb 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java @@ -20,57 +20,18 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; - -import com.codahale.metrics.MetricRegistry; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer; +import java.util.concurrent.ConcurrentHashMap; /** * @author Timur Valiev * @author Jim */ public class InstrumentationManager { - private final MetricRegistry metricRegistry = new MetricRegistry(); - private final Map runtime = new HashMap<>(); - private final Map producerInstrumentations = new HashMap<>(); - private final Map consumeInstrumentations = new HashMap<>(); - private final Map consumerGroupsInstrumentations = new HashMap<>(); + + private final Map runtime = new ConcurrentHashMap<>(); private final Map healthInstrumentations = new HashMap<>(); - public ProducerInstrumentation getProducerInstrumentation(String destination) { - String key = Producer.PREFIX + destination; - ProducerInstrumentation producerInstrumentation = producerInstrumentations - .get(key); - if (producerInstrumentation == null) { - producerInstrumentations.put(key, - new ProducerInstrumentation(metricRegistry, key)); - } - return producerInstrumentations.get(key); - } - - public ConsumerInstrumentation getConsumerInstrumentation(String destination) { - String key = Consumer.PREFIX + destination; - ConsumerInstrumentation consumerInstrumentation = consumeInstrumentations - .get(key); - if (consumerInstrumentation == null) { - consumeInstrumentations.put(key, - new ConsumerInstrumentation(metricRegistry, key)); - } - return consumeInstrumentations.get(key); - } - - public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) { - String key = Consumer.GROUP_PREFIX + group; - ConsumerGroupInstrumentation consumerGroupInstrumentation = consumerGroupsInstrumentations - .get(key); - if (consumerGroupInstrumentation == null) { - consumerGroupsInstrumentations.put(key, - new ConsumerGroupInstrumentation(metricRegistry, key)); - } - return consumerGroupsInstrumentations.get(key); - } - public Set getHealthInstrumentations() { return new HashSet<>(healthInstrumentations.values()); } @@ -79,11 +40,12 @@ public class InstrumentationManager { healthInstrumentations.put(instrumentation.getName(), instrumentation); } + public Instrumentation getHealthInstrumentation(String key) { + return healthInstrumentations.get(key); + } + public Map getRuntime() { return runtime; } - public MetricRegistry getMetricRegistry() { - return metricRegistry; - } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java deleted file mode 100644 index 1ede7802..00000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.metrics; - -import static com.codahale.metrics.MetricRegistry.name; - -import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; - -/** - * @author juven.xuxb - * @author Jim - */ -public class ProducerInstrumentation extends Instrumentation { - - private final Counter totalSent; - private final Counter totalSentFailures; - private final Meter sentPerSecond; - private final Meter sentFailuresPerSecond; - - public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) { - super(baseMetricName); - - this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT)); - this.totalSentFailures = registry - .counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES)); - this.sentPerSecond = registry - .meter(name(baseMetricName, Producer.SENT_PER_SECOND)); - this.sentFailuresPerSecond = registry - .meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND)); - } - - public void markSent() { - totalSent.inc(); - sentPerSecond.mark(); - } - - public void markSentFailure() { - totalSentFailures.inc(); - sentFailuresPerSecond.mark(); - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index f2c028ec..b6d0e9f8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -17,6 +17,7 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; /** * @author Timur Valiev @@ -25,24 +26,24 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") public class RocketMQBinderConfigurationProperties { - private String namesrvAddr = "127.0.0.1:9876"; + private String namesrvAddr = RocketMQBinderConstants.DEFAULT_NAME_SERVER; - private String logLevel = "ERROR"; + private String logLevel = "ERROR"; - public String getNamesrvAddr() { - return namesrvAddr; - } + public String getNamesrvAddr() { + return namesrvAddr; + } - public void setNamesrvAddr(String namesrvAddr) { - this.namesrvAddr = namesrvAddr; - } + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } - public String getLogLevel() { - return logLevel; - } + public String getLogLevel() { + return logLevel; + } - public void setLogLevel(String logLevel) { - this.logLevel = logLevel; - } + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 7f757586..6cfe9b84 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -28,68 +28,93 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; */ public class RocketMQConsumerProperties { - /** - * using '||' to split tag - * {@link MQPushConsumer#subscribe(String, String)} - */ - private String tags; + /** + * using '||' to split tag {@link MQPushConsumer#subscribe(String, String)} + */ + private String tags; - /** - * {@link MQPushConsumer#subscribe(String, MessageSelector)} - * {@link MessageSelector#bySql(String)} - */ - private String sql; + /** + * {@link MQPushConsumer#subscribe(String, MessageSelector)} + * {@link MessageSelector#bySql(String)} + */ + private String sql; - /** - * {@link MessageModel#BROADCASTING} - */ - private Boolean broadcasting = false; + /** + * {@link MessageModel#BROADCASTING} + */ + private Boolean broadcasting = false; - /** - * if orderly is true, using {@link MessageListenerOrderly} - * else if orderly if false, using {@link MessageListenerConcurrently} - */ - private Boolean orderly = false; + /** + * if orderly is true, using {@link MessageListenerOrderly} else if orderly if false, + * using {@link MessageListenerConcurrently} + */ + private Boolean orderly = false; - private Boolean enabled = true; + /** + * for concurrently listener. message consume retry strategy + */ + private int delayLevelWhenNextConsume = 0; - public String getTags() { - return tags; - } + /** + * for orderly listener. next retry delay time + */ + private long suspendCurrentQueueTimeMillis = 1000; - public void setTags(String tags) { - this.tags = tags; - } + private Boolean enabled = true; - public String getSql() { - return sql; - } + public String getTags() { + return tags; + } - public void setSql(String sql) { - this.sql = sql; - } + public void setTags(String tags) { + this.tags = tags; + } - public Boolean getOrderly() { - return orderly; - } + public String getSql() { + return sql; + } - public void setOrderly(Boolean orderly) { - this.orderly = orderly; - } + public void setSql(String sql) { + this.sql = sql; + } - public Boolean getEnabled() { - return enabled; - } + public Boolean getOrderly() { + return orderly; + } - public void setEnabled(Boolean enabled) { - this.enabled = enabled; - } + public void setOrderly(Boolean orderly) { + this.orderly = orderly; + } - public Boolean getBroadcasting() { - return broadcasting; - } + public Boolean getEnabled() { + return enabled; + } - public void setBroadcasting(Boolean broadcasting) { - this.broadcasting = broadcasting; - } + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + public Boolean getBroadcasting() { + return broadcasting; + } + + public void setBroadcasting(Boolean broadcasting) { + this.broadcasting = broadcasting; + } + + public int getDelayLevelWhenNextConsume() { + return delayLevelWhenNextConsume; + } + + public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { + this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + } + + public long getSuspendCurrentQueueTimeMillis() { + return suspendCurrentQueueTimeMillis; + } + + public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java index 1a05ad50..60af4129 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -17,8 +17,6 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; -import org.apache.rocketmq.client.producer.TransactionCheckListener; /** * @author Timur Valiev @@ -31,19 +29,46 @@ public class RocketMQProducerProperties { /** * Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize} */ - private Integer maxMessageSize = 0; + private Integer maxMessageSize = 1024 * 1024 * 4; private Boolean transactional = false; - /** - * full class name of {@link LocalTransactionExecuter} - */ - private String executer; + private Boolean sync = false; + + private Boolean vipChannelEnabled = true; /** - * full class name of {@link TransactionCheckListener} + * Millis of send message timeout. */ - private String transactionCheckListener; + private int sendMessageTimeout = 3000; + + /** + * Compress message body threshold, namely, message body larger than 4k will be + * compressed on default. + */ + private int compressMessageBodyThreshold = 1024 * 4; + + /** + * Maximum number of retry to perform internally before claiming sending failure in + * synchronous mode. This may potentially cause message duplication which is up to + * application developers to resolve. + */ + private int retryTimesWhenSendFailed = 2; + + /** + *

+ * Maximum number of retry to perform internally before claiming sending failure in + * asynchronous mode. + *

+ * This may potentially cause message duplication which is up to application + * developers to resolve. + */ + private int retryTimesWhenSendAsyncFailed = 2; + + /** + * Indicate whether to retry another broker on sending failure internally. + */ + private boolean retryNextServer = false; public Boolean getEnabled() { return enabled; @@ -69,19 +94,60 @@ public class RocketMQProducerProperties { this.transactional = transactional; } - public String getExecuter() { - return executer; + public Boolean getSync() { + return sync; } - public void setExecuter(String executer) { - this.executer = executer; + public void setSync(Boolean sync) { + this.sync = sync; } - public String getTransactionCheckListener() { - return transactionCheckListener; + public Boolean getVipChannelEnabled() { + return vipChannelEnabled; } - public void setTransactionCheckListener(String transactionCheckListener) { - this.transactionCheckListener = transactionCheckListener; + public void setVipChannelEnabled(Boolean vipChannelEnabled) { + this.vipChannelEnabled = vipChannelEnabled; } + + public int getSendMessageTimeout() { + return sendMessageTimeout; + } + + public void setSendMessageTimeout(int sendMessageTimeout) { + this.sendMessageTimeout = sendMessageTimeout; + } + + public int getCompressMessageBodyThreshold() { + return compressMessageBodyThreshold; + } + + public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) { + this.compressMessageBodyThreshold = compressMessageBodyThreshold; + } + + public int getRetryTimesWhenSendFailed() { + return retryTimesWhenSendFailed; + } + + public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { + this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; + } + + public int getRetryTimesWhenSendAsyncFailed() { + return retryTimesWhenSendAsyncFailed; + } + + public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) { + this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; + } + + public boolean isRetryNextServer() { + return retryNextServer; + } + + public void setRetryNextServer(boolean retryNextServer) { + this.retryNextServer = retryNextServer; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories index 43b85129..89a1a8f5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories @@ -1,2 +1,2 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration +org.springframework.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java index 3328bb64..155902c0 100644 --- a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java +++ b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -22,7 +22,6 @@ import org.junit.Before; import org.junit.Test; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration; -import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -36,10 +35,9 @@ public class RocketMQAutoConfigurationTests { @Before public void setUp() throws Exception { - this.context = new SpringApplicationBuilder( - RocketMQBinderEndpointAutoConfiguration.class, - RocketMQBinderAutoConfiguration.class).web(false).run( - "--spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876", + this.context = new SpringApplicationBuilder(RocketMQBinderAutoConfiguration.class) + .web(false) + .run("--spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876", "--spring.cloud.stream.bindings.output.destination=TopicOrderTest", "--spring.cloud.stream.bindings.output.content-type=application/json", "--spring.cloud.stream.bindings.input1.destination=TopicOrderTest",