diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index 428906b7..356418ff 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -25,7 +25,6 @@ 1.0.0 2.16.0 4.3.1 - 3.2.6 @@ -221,13 +220,6 @@ ${project.version} - - - io.dropwizard.metrics - metrics-core - ${metrics.core} - - diff --git a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml index e8800187..9e33978a 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml @@ -28,6 +28,10 @@ org.springframework.boot spring-boot-starter-actuator + + io.dropwizard.metrics + metrics-core + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md index b00bfbc3..1156b81f 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md +++ b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md @@ -6,13 +6,21 @@ [RocketMQ](https://rocketmq.apache.org/) 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。 -在说明RocketMQ的示例之前,我们先了解一下 Spring Cloud Stream 中的Binder和Binding概念。 +在说明RocketMQ的示例之前,我们先了解一下 Spring Cloud Stream。 -Binder: 跟外部消息中间件集成的组件,用来创建Binding,各消息中间件都有自己的Binder实现。 +这是官方对Spring Cloud Stream的一段介绍: + +Spring Cloud Stream是一个用于构建基于消息的微服务应用框架。它基于SpringBoot来创建具有生产级别的单机Spring应用,并且使用 `Spring Integration` 与Broker进行连接。 + +Spring Cloud Stream提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。 + +Spring Cloud Stream 内部有2个概念:Binder 和 Binding。 + +* Binder: 跟外部消息中间件集成的组件,用来创建Binding,各消息中间件都有自己的Binder实现。 比如 `Kafka` 的实现 `KafkaMessageChannelBinder` ,`RabbitMQ` 的实现 `RabbitMessageChannelBinder` 以及 `RocketMQ` 的实现 `RocketMQMessageChannelBinder` 。 -Binding: 包括Input Binding和Output Binding。 +* Binding: 包括Input Binding和Output Binding。 Binding在消息中间件与应用程序提供的Provider和Consumer之间提供了一个桥梁,实现了开发者只需使用应用程序的Provider或Consumer生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。 @@ -110,8 +118,9 @@ server.port=28081 使用2个input binding订阅数据。 -input1: 订阅topic为test-topic的消息,顺序消费所有消息(顺序消费的前提是所有消息都在一个MessageQueue中) -input2: 订阅topic为test-topic的消息,异步消费tags为tagStr的消息,Consumer端线程池个数为20 +* input1:订阅topic为test-topic的消息,顺序消费所有消息(顺序消费的前提是所有消息都在一个MessageQueue中) + +* input2:订阅topic为test-topic的消息,异步消费tags为tagStr的消息,Consumer端线程池个数为20 配置信息如下: @@ -249,6 +258,14 @@ Spring Boot 1.x 可以通过访问 http://127.0.0.1:28081/rocketmq_binder 来查 } ``` +注意:要想查看统计数据需要在pom里加上 [metrics-core依赖](https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core) 。如若不加,endpoint将会显示warning信息而不会显示统计信息: + +```json +{ + "warning": "please add metrics-core dependency, we use it for metrics" +} +``` + ## More RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties index 80ea2d99..0ba6a54e 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties @@ -27,5 +27,4 @@ spring.cloud.stream.bindings.input3.consumer.concurrency=20 spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 server.port=28081 - -management.endpoints.web.exposure.include=* \ No newline at end of file +management.security.enabled=false \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml index 4b534d4b..7bf086c4 100644 --- a/spring-cloud-stream-binder-rocketmq/pom.xml +++ b/spring-cloud-stream-binder-rocketmq/pom.xml @@ -21,13 +21,15 @@ - io.dropwizard.metrics - metrics-core + org.apache.rocketmq + rocketmq-client - org.apache.rocketmq - rocketmq-client + io.dropwizard.metrics + metrics-core + provided + true diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index cb35b5a6..9c8b5221 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -21,6 +21,8 @@ package org.springframework.cloud.stream.binder.rocketmq; */ public interface RocketMQBinderConstants { + String ENDPOINT_ID = "rocketmq_binder"; + /** * Header key */ @@ -33,10 +35,27 @@ public interface RocketMQBinderConstants { String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; /** - * Instrumentation key + * Instrumentation */ String LASTSEND_TIMESTAMP = "lastSend.timestamp"; - String ENDPOINT_ID = "rocketmq_binder"; + interface Metrics { + interface Producer { + String PREFIX = "scs-rocketmq.producer."; + String TOTAL_SENT = "totalSent"; + String TOTAL_SENT_FAILURES = "totalSentFailures"; + String SENT_PER_SECOND = "sentPerSecond"; + String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond"; + } + + interface Consumer { + String GROUP_PREFIX = "scs-rocketmq.consumerGroup."; + String PREFIX = "scs-rocketmq.consumer."; + String TOTAL_CONSUMED = "totalConsumed"; + String CONSUMED_PER_SECOND = "consumedPerSecond"; + String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures"; + String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond"; + } + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 4bed86af..03009b43 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -42,76 +42,82 @@ import org.springframework.messaging.MessageHandler; * @author Jim */ public class RocketMQMessageChannelBinder extends - AbstractMessageChannelBinder, - ExtendedProducerProperties, RocketMQTopicProvisioner> - implements ExtendedPropertiesBinder { + AbstractMessageChannelBinder, ExtendedProducerProperties, RocketMQTopicProvisioner> + implements + ExtendedPropertiesBinder { - private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class); + private static final Logger logger = LoggerFactory + .getLogger(RocketMQMessageChannelBinder.class); - private final RocketMQExtendedBindingProperties extendedBindingProperties; - private final RocketMQTopicProvisioner rocketTopicProvisioner; - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - private final InstrumentationManager instrumentationManager; - private final ConsumersManager consumersManager; + private final RocketMQExtendedBindingProperties extendedBindingProperties; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final InstrumentationManager instrumentationManager; + private final ConsumersManager consumersManager; - public RocketMQMessageChannelBinder(ConsumersManager consumersManager, - RocketMQExtendedBindingProperties extendedBindingProperties, - RocketMQTopicProvisioner provisioningProvider, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, - InstrumentationManager instrumentationManager) { - super(true, null, provisioningProvider); - this.consumersManager = consumersManager; - this.extendedBindingProperties = extendedBindingProperties; - this.rocketTopicProvisioner = provisioningProvider; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - this.instrumentationManager = instrumentationManager; - } + public RocketMQMessageChannelBinder(ConsumersManager consumersManager, + RocketMQExtendedBindingProperties extendedBindingProperties, + RocketMQTopicProvisioner provisioningProvider, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + InstrumentationManager instrumentationManager) { + super(true, null, provisioningProvider); + this.consumersManager = consumersManager; + this.extendedBindingProperties = extendedBindingProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.instrumentationManager = instrumentationManager; + } - @Override - protected MessageHandler createProducerMessageHandler(ProducerDestination destination, - ExtendedProducerProperties - producerProperties, - MessageChannel errorChannel) throws Exception { - if (producerProperties.getExtension().getEnabled()) { - return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(), - rocketBinderConfigurationProperties, instrumentationManager); - } else { - throw new RuntimeException( - "Binding for channel " + destination.getName() + "has been disabled, message can't be delivered"); - } - } + @Override + protected MessageHandler createProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties producerProperties, + MessageChannel errorChannel) throws Exception { + if (producerProperties.getExtension().getEnabled()) { + return new RocketMQMessageHandler(destination.getName(), + producerProperties.getExtension(), + rocketBinderConfigurationProperties, instrumentationManager); + } + else { + throw new RuntimeException("Binding for channel " + destination.getName() + + " has been disabled, message can't be delivered"); + } + } - @Override - protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, - ExtendedConsumerProperties - consumerProperties) - throws Exception { - if (group == null || "".equals(group)) { - throw new RuntimeException("'group' must be configured for channel + " + destination.getName()); - } + @Override + protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, + String group, + ExtendedConsumerProperties consumerProperties) + throws Exception { + if (group == null || "".equals(group)) { + throw new RuntimeException( + "'group' must be configured for channel + " + destination.getName()); + } - RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager, - consumerProperties, destination.getName(), group, instrumentationManager); + RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( + consumersManager, consumerProperties, destination.getName(), group, + instrumentationManager); - ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, - consumerProperties); - if (consumerProperties.getMaxAttempts() > 1) { - rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties)); - rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer()); - } else { - rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel()); - } + ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, + group, consumerProperties); + if (consumerProperties.getMaxAttempts() > 1) { + rocketInboundChannelAdapter + .setRetryTemplate(buildRetryTemplate(consumerProperties)); + rocketInboundChannelAdapter + .setRecoveryCallback(errorInfrastructure.getRecoverer()); + } + else { + rocketInboundChannelAdapter + .setErrorChannel(errorInfrastructure.getErrorChannel()); + } - return rocketInboundChannelAdapter; - } + return rocketInboundChannelAdapter; + } - @Override - public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { - return extendedBindingProperties.getExtendedConsumerProperties(channelName); - } + @Override + public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { + return extendedBindingProperties.getExtendedConsumerProperties(channelName); + } - @Override - public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { - return extendedBindingProperties.getExtendedProducerProperties(channelName); - } + @Override + public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { + return extendedBindingProperties.getExtendedProducerProperties(channelName); + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java index 756924be..baf8e700 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java @@ -20,11 +20,10 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.endpoint.AbstractEndpoint; - -import com.codahale.metrics.MetricRegistry; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; /** * @author Timur Valiev @@ -32,8 +31,8 @@ import com.codahale.metrics.MetricRegistry; */ public class RocketMQBinderEndpoint extends AbstractEndpoint> { - private MetricRegistry metricRegistry = new MetricRegistry(); - private Map runtime = new ConcurrentHashMap<>(); + @Autowired(required = false) + private InstrumentationManager instrumentationManager; public RocketMQBinderEndpoint() { super(ENDPOINT_ID); @@ -42,17 +41,15 @@ public class RocketMQBinderEndpoint extends AbstractEndpoint @Override public Map invoke() { Map result = new HashMap<>(); - result.put("metrics", metricRegistry().getMetrics()); - result.put("runtime", runtime()); + if (instrumentationManager != null) { + result.put("metrics", instrumentationManager.getMetricRegistry()); + result.put("runtime", instrumentationManager.getRuntime()); + } + else { + result.put("warning", + "please add metrics-core dependency, we use it for metrics"); + } return result; } - public MetricRegistry metricRegistry() { - return metricRegistry; - } - - public Map runtime() { - return runtime; - } - } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java index f23463ac..cb2c2efe 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java @@ -16,6 +16,7 @@ package org.springframework.cloud.stream.binder.rocketmq.actuator; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; @@ -27,40 +28,45 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM */ public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { - private final InstrumentationManager instrumentationManager; - - public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) { - this.instrumentationManager = instrumentationManager; - } + @Autowired(required = false) + private InstrumentationManager instrumentationManager; @Override protected void doHealthCheck(Health.Builder builder) throws Exception { int upCount = 0, outOfServiceCount = 0; - for (Instrumentation instrumentation : instrumentationManager - .getHealthInstrumentations()) { - if (instrumentation.isUp()) { - upCount++; + if (instrumentationManager != null) { + for (Instrumentation instrumentation : instrumentationManager + .getHealthInstrumentations()) { + if (instrumentation.isUp()) { + upCount++; + } + else if (instrumentation.isOutOfService()) { + upCount++; + } } - else if (instrumentation.isOutOfService()) { - upCount++; + if (upCount == instrumentationManager.getHealthInstrumentations().size()) { + builder.up(); + return; } - } - if (upCount == instrumentationManager.getHealthInstrumentations().size()) { - builder.up(); - return; - } - else if (outOfServiceCount == instrumentationManager.getHealthInstrumentations() - .size()) { - builder.outOfService(); - return; - } - builder.down(); + else if (outOfServiceCount == instrumentationManager + .getHealthInstrumentations().size()) { + builder.outOfService(); + return; + } + builder.down(); - for (Instrumentation instrumentation : instrumentationManager - .getHealthInstrumentations()) { - if (!instrumentation.isStarted()) { - builder.withException(instrumentation.getStartException()); + for (Instrumentation instrumentation : instrumentationManager + .getHealthInstrumentations()) { + if (!instrumentation.isStarted()) { + builder.withException(instrumentation.getStartException()); + } } } + else { + builder.down(); + builder.withDetail("warning", + "please add metrics-core dependency, we use it for metrics"); + } + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java index f2bb68cf..95abaeeb 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -33,38 +33,46 @@ import org.springframework.context.annotation.Configuration; * @author Jim */ @Configuration -@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class}) +@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class, + RocketMQExtendedBindingProperties.class }) public class RocketMQBinderAutoConfiguration { - private final RocketMQExtendedBindingProperties extendedBindingProperties; + private final RocketMQExtendedBindingProperties extendedBindingProperties; - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - @Autowired - public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { - this.extendedBindingProperties = extendedBindingProperties; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel()); - } + @Autowired(required = false) + private InstrumentationManager instrumentationManager; - @Bean - public RocketMQTopicProvisioner provisioningProvider() { - return new RocketMQTopicProvisioner(); - } + @Autowired + public RocketMQBinderAutoConfiguration( + RocketMQExtendedBindingProperties extendedBindingProperties, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { + this.extendedBindingProperties = extendedBindingProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, + this.rocketBinderConfigurationProperties.getLogLevel()); + } - @Bean - public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider, - InstrumentationManager instrumentationManager, - ConsumersManager consumersManager) { - RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties, - provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager); - return binder; - } + @Bean + public RocketMQTopicProvisioner provisioningProvider() { + return new RocketMQTopicProvisioner(); + } - @Bean - public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) { - return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties); - } + @Bean + public RocketMQMessageChannelBinder rocketMessageChannelBinder( + RocketMQTopicProvisioner provisioningProvider, + ConsumersManager consumersManager) { + RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( + consumersManager, extendedBindingProperties, provisioningProvider, + rocketBinderConfigurationProperties, instrumentationManager); + return binder; + } + + @Bean + public ConsumersManager consumersManager() { + return new ConsumersManager(instrumentationManager, + rocketBinderConfigurationProperties); + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java index a5386509..4b602621 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java @@ -17,7 +17,9 @@ package org.springframework.cloud.stream.binder.rocketmq.config; import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration; +import org.springframework.boot.actuate.endpoint.Endpoint; import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -29,6 +31,7 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @AutoConfigureAfter(EndpointAutoConfiguration.class) +@ConditionalOnClass(Endpoint.class) public class RocketMQBinderEndpointAutoConfiguration { @Bean @@ -37,16 +40,14 @@ public class RocketMQBinderEndpointAutoConfiguration { } @Bean - public RocketMQBinderHealthIndicator rocketBinderHealthIndicator( - InstrumentationManager instrumentationManager) { - return new RocketMQBinderHealthIndicator(instrumentationManager); + public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() { + return new RocketMQBinderHealthIndicator(); } @Bean - public InstrumentationManager instrumentationManager( - RocketMQBinderEndpoint rocketBinderEndpoint) { - return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), - rocketBinderEndpoint.runtime()); + @ConditionalOnClass(name = "com.codahale.metrics.Counter") + public InstrumentationManager instrumentationManager() { + return new InstrumentationManager(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java index 6b9445f9..c56f060c 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java @@ -38,85 +38,98 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsu */ public class ConsumersManager { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Map consumerGroups = new HashMap<>(); - private final Map started = new HashMap<>(); - private final Map, ExtendedConsumerProperties> propertiesMap - = new HashMap<>(); - private final InstrumentationManager instrumentationManager; - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private InstrumentationManager instrumentationManager; - public ConsumersManager(InstrumentationManager instrumentationManager, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { - this.instrumentationManager = instrumentationManager; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - } + private final Map consumerGroups = new HashMap<>(); + private final Map started = new HashMap<>(); + private final Map, ExtendedConsumerProperties> propertiesMap = new HashMap<>(); + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic, - ExtendedConsumerProperties consumerProperties) { - propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties); - ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group); - instrumentationManager.addHealthInstrumentation(instrumentation); + public ConsumersManager(InstrumentationManager instrumentationManager, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { + this.instrumentationManager = instrumentationManager; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + } - if (consumerGroups.containsKey(group)) { - return consumerGroups.get(group); - } + public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, + String topic, + ExtendedConsumerProperties consumerProperties) { + propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), + consumerProperties); + if (instrumentationManager != null) { + ConsumerGroupInstrumentation instrumentation = instrumentationManager + .getConsumerGroupInstrumentation(group); + instrumentationManager.addHealthInstrumentation(instrumentation); + } - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); - consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - consumerGroups.put(group, consumer); - started.put(group, false); - consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); - consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); - if (consumerProperties.getExtension().getBroadcasting()) { - consumer.setMessageModel(MessageModel.BROADCASTING); - } - logger.info("RocketMQ consuming for SCS group {} created", group); - return consumer; - } + if (consumerGroups.containsKey(group)) { + return consumerGroups.get(group); + } - public synchronized void startConsumers() throws MQClientException { - for (String group : getConsumerGroups()) { - start(group); - } - } + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); + consumerGroups.put(group, consumer); + started.put(group, false); + consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); + consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); + if (consumerProperties.getExtension().getBroadcasting()) { + consumer.setMessageModel(MessageModel.BROADCASTING); + } + logger.info("RocketMQ consuming for SCS group {} created", group); + return consumer; + } - public synchronized void startConsumer(String group) throws MQClientException { - start(group); - } + public synchronized void startConsumers() throws MQClientException { + for (String group : getConsumerGroups()) { + start(group); + } + } - public synchronized void stopConsumer(String group) { - stop(group); - } + public synchronized void startConsumer(String group) throws MQClientException { + start(group); + } - private void stop(String group) { - if (consumerGroups.get(group) != null) { - consumerGroups.get(group).shutdown(); - started.put(group, false); - } - } + public synchronized void stopConsumer(String group) { + stop(group); + } - private synchronized void start(String group) throws MQClientException { - if (started.get(group)) { - return; - } - ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation( - group); - instrumentationManager.addHealthInstrumentation(groupInstrumentation); - try { - consumerGroups.get(group).start(); - started.put(group, true); - groupInstrumentation.markStartedSuccessfully(); - } catch (MQClientException e) { - groupInstrumentation.markStartFailed(e); - logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e); - throw e; - } - } + private void stop(String group) { + if (consumerGroups.get(group) != null) { + consumerGroups.get(group).shutdown(); + started.put(group, false); + } + } - public synchronized Set getConsumerGroups() { - return consumerGroups.keySet(); - } + private synchronized void start(String group) throws MQClientException { + if (started.get(group)) { + return; + } + ConsumerGroupInstrumentation groupInstrumentation = null; + if (instrumentationManager != null) { + groupInstrumentation = instrumentationManager + .getConsumerGroupInstrumentation(group); + instrumentationManager.addHealthInstrumentation(groupInstrumentation); + } + try { + consumerGroups.get(group).start(); + started.put(group, true); + if (groupInstrumentation != null) { + groupInstrumentation.markStartedSuccessfully(); + } + } + catch (MQClientException e) { + if (groupInstrumentation != null) { + groupInstrumentation.markStartFailed(e); + } + logger.error("RocketMQ Consumer hasn't been started. Caused by " + + e.getErrorMessage(), e); + throw e; + } + } + + public synchronized Set getConsumerGroups() { + return consumerGroups.keySet(); + } } - diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 8c884da8..4558eb95 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -62,14 +62,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { private ConsumerInstrumentation consumerInstrumentation; + private InstrumentationManager instrumentationManager; + private final ExtendedConsumerProperties consumerProperties; private final String destination; private final String group; - private final InstrumentationManager instrumentationManager; - private final ConsumersManager consumersManager; private RetryTemplate retryTemplate; @@ -89,21 +89,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { @Override protected void doStart() { - if (!consumerProperties.getExtension().getEnabled()) { + if (consumerProperties == null + || !consumerProperties.getExtension().getEnabled()) { return; } - String tags = consumerProperties == null ? null - : consumerProperties.getExtension().getTags(); - Boolean isOrderly = consumerProperties == null ? false - : consumerProperties.getExtension().getOrderly(); + String tags = consumerProperties.getExtension().getTags(); + Boolean isOrderly = consumerProperties.getExtension().getOrderly(); DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties); final CloudStreamMessageListener listener = isOrderly - ? new CloudStreamMessageListenerOrderly(instrumentationManager) - : new CloudStreamMessageListenerConcurrently(instrumentationManager); + ? new CloudStreamMessageListenerOrderly() + : new CloudStreamMessageListenerConcurrently(); if (retryTemplate != null) { retryTemplate.registerListener(listener); @@ -116,9 +115,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } } - consumerInstrumentation = instrumentationManager - .getConsumerInstrumentation(destination); - instrumentationManager.addHealthInstrumentation(consumerInstrumentation); + if (instrumentationManager != null) { + consumerInstrumentation = instrumentationManager + .getConsumerInstrumentation(destination); + instrumentationManager.addHealthInstrumentation(consumerInstrumentation); + } try { if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { @@ -129,10 +130,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { consumer.subscribe(destination, org.apache.commons.lang3.StringUtils.join(tagsSet, " || ")); } - consumerInstrumentation.markStartedSuccessfully(); + if (consumerInstrumentation != null) { + consumerInstrumentation.markStartedSuccessfully(); + } } catch (MQClientException e) { - consumerInstrumentation.markStartFailed(e); + if (consumerInstrumentation != null) { + consumerInstrumentation.markStartFailed(e); + } logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e); throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); @@ -166,19 +171,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListener implements MessageListener, RetryListener { - private final InstrumentationManager instrumentationManager; - - CloudStreamMessageListener(InstrumentationManager instrumentationManager) { - this.instrumentationManager = instrumentationManager; - } - Acknowledgement consumeMessage(final List msgs) { boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; try { if (enableRetry) { - return RocketMQInboundChannelAdapter.this.retryTemplate.execute( - // (RetryCallback) - new RetryCallback() { + return RocketMQInboundChannelAdapter.this.retryTemplate + .execute(new RetryCallback() { @Override public Acknowledgement doWithRetry(RetryContext context) throws Exception { @@ -203,10 +201,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } else { Acknowledgement result = doSendMsgs(msgs, null); - instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumed(); + if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) { + RocketMQInboundChannelAdapter.this.instrumentationManager + .getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumed(); + } return result; } } @@ -214,10 +214,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { logger.error( "RocketMQ Message hasn't been processed successfully. Caused by ", e); - instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumedFailure(); + if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) { + RocketMQInboundChannelAdapter.this.instrumentationManager + .getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumedFailure(); + } throw new RuntimeException( "RocketMQ Message hasn't been processed successfully. Caused by ", e); @@ -254,14 +256,17 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { @Override public void close(RetryContext context, RetryCallback callback, Throwable throwable) { + if (RocketMQInboundChannelAdapter.this.instrumentationManager == null) { + return; + } if (throwable != null) { - instrumentationManager + RocketMQInboundChannelAdapter.this.instrumentationManager .getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.destination) .markConsumedFailure(); } else { - instrumentationManager + RocketMQInboundChannelAdapter.this.instrumentationManager .getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.destination) .markConsumed(); @@ -277,11 +282,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements MessageListenerConcurrently { - public CloudStreamMessageListenerConcurrently( - InstrumentationManager instrumentationManager) { - super(instrumentationManager); - } - @Override public ConsumeConcurrentlyStatus consumeMessage(final List msgs, ConsumeConcurrentlyContext context) { @@ -295,11 +295,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly { - public CloudStreamMessageListenerOrderly( - InstrumentationManager instrumentationManager) { - super(instrumentationManager); - } - @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 88273e8c..0ab7386c 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -45,14 +45,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private ProducerInstrumentation producerInstrumentation; + private InstrumentationManager instrumentationManager; + private final RocketMQProducerProperties producerProperties; private final String destination; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - private final InstrumentationManager instrumentationManager; - protected volatile boolean running = false; public RocketMQMessageHandler(String destination, @@ -69,9 +69,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li public void start() { producer = new DefaultMQProducer(destination); - producerInstrumentation = instrumentationManager - .getProducerInstrumentation(destination); - instrumentationManager.addHealthInstrumentation(producerInstrumentation); + if (instrumentationManager != null) { + producerInstrumentation = instrumentationManager + .getProducerInstrumentation(destination); + instrumentationManager.addHealthInstrumentation(producerInstrumentation); + } producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); @@ -81,10 +83,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li try { producer.start(); - producerInstrumentation.markStartedSuccessfully(); + if (producerInstrumentation != null) { + producerInstrumentation.markStartedSuccessfully(); + } } catch (MQClientException e) { - producerInstrumentation.markStartFailed(e); + if (producerInstrumentation != null) { + producerInstrumentation.markStartFailed(e); + } logger.error( "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); throw new MessagingException(e.getMessage(), e); @@ -142,14 +148,18 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message, sendRes); } - instrumentationManager.getRuntime().put( - RocketMQBinderConstants.LASTSEND_TIMESTAMP, - System.currentTimeMillis()); - producerInstrumentation.markSent(); + if (instrumentationManager != null) { + instrumentationManager.getRuntime().put( + RocketMQBinderConstants.LASTSEND_TIMESTAMP, + System.currentTimeMillis()); + producerInstrumentation.markSent(); + } } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException | UnsupportedOperationException e) { - producerInstrumentation.markSentFailure(); + if (producerInstrumentation != null) { + producerInstrumentation.markSentFailure(); + } logger.error( "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); throw new MessagingException(e.getMessage(), e); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java index e773729f..f624b237 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java @@ -16,38 +16,44 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics; +import static com.codahale.metrics.MetricRegistry.name; + +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer; + import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import static com.codahale.metrics.MetricRegistry.name; - /** * @author juven.xuxb * @author Jim */ public class ConsumerInstrumentation extends Instrumentation { - private final Counter totalConsumed; - private final Counter totalConsumedFailures; - private final Meter consumedPerSecond; - private final Meter consumedFailuresPerSecond; + private final Counter totalConsumed; + private final Counter totalConsumedFailures; + private final Meter consumedPerSecond; + private final Meter consumedFailuresPerSecond; - public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) { - super(baseMetricName); - this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed")); - this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond")); - this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures")); - this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond")); - } + public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) { + super(baseMetricName); + this.totalConsumed = registry + .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED)); + this.consumedPerSecond = registry + .meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND)); + this.totalConsumedFailures = registry + .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES)); + this.consumedFailuresPerSecond = registry + .meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND)); + } - public void markConsumed() { - totalConsumed.inc(); - consumedPerSecond.mark(); - } + public void markConsumed() { + totalConsumed.inc(); + consumedPerSecond.mark(); + } - public void markConsumedFailure() { - totalConsumedFailures.inc(); - consumedFailuresPerSecond.mark(); - } + public void markConsumedFailure() { + totalConsumedFailures.inc(); + consumedFailuresPerSecond.mark(); + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java index dea84859..a593af91 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java @@ -22,28 +22,24 @@ import java.util.Map; import java.util.Set; import com.codahale.metrics.MetricRegistry; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer; /** * @author Timur Valiev * @author Jim */ public class InstrumentationManager { - private final MetricRegistry metricRegistry; - private final Map runtime; + private final MetricRegistry metricRegistry = new MetricRegistry(); + private final Map runtime = new HashMap<>(); private final Map producerInstrumentations = new HashMap<>(); private final Map consumeInstrumentations = new HashMap<>(); private final Map consumerGroupsInstrumentations = new HashMap<>(); private final Map healthInstrumentations = new HashMap<>(); - public InstrumentationManager(MetricRegistry metricRegistry, - Map runtime) { - this.metricRegistry = metricRegistry; - this.runtime = runtime; - } - public ProducerInstrumentation getProducerInstrumentation(String destination) { - String key = "scs-rocketmq.producer." + destination; + String key = Producer.PREFIX + destination; ProducerInstrumentation producerInstrumentation = producerInstrumentations .get(key); if (producerInstrumentation == null) { @@ -54,7 +50,7 @@ public class InstrumentationManager { } public ConsumerInstrumentation getConsumerInstrumentation(String destination) { - String key = "scs-rocketmq.consumer." + destination; + String key = Consumer.PREFIX + destination; ConsumerInstrumentation consumerInstrumentation = consumeInstrumentations .get(key); if (consumerInstrumentation == null) { @@ -65,7 +61,7 @@ public class InstrumentationManager { } public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) { - String key = "scs-rocketmq.consumerGroup." + group; + String key = Consumer.GROUP_PREFIX + group; ConsumerGroupInstrumentation consumerGroupInstrumentation = consumerGroupsInstrumentations .get(key); if (consumerGroupInstrumentation == null) { @@ -86,4 +82,8 @@ public class InstrumentationManager { public Map getRuntime() { return runtime; } + + public MetricRegistry getMetricRegistry() { + return metricRegistry; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java index 6fc6daf9..1ede7802 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java @@ -16,38 +16,44 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics; +import static com.codahale.metrics.MetricRegistry.name; + +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer; + import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import static com.codahale.metrics.MetricRegistry.name; - /** * @author juven.xuxb * @author Jim */ public class ProducerInstrumentation extends Instrumentation { - private final Counter totalSent; - private final Counter totalSentFailures; - private final Meter sentPerSecond; - private final Meter sentFailuresPerSecond; + private final Counter totalSent; + private final Counter totalSentFailures; + private final Meter sentPerSecond; + private final Meter sentFailuresPerSecond; - public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) { - super(baseMetricName); - this.totalSent = registry.counter(name(baseMetricName, "totalSent")); - this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures")); - this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond")); - this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond")); - } + public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) { + super(baseMetricName); - public void markSent() { - totalSent.inc(); - sentPerSecond.mark(); - } + this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT)); + this.totalSentFailures = registry + .counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES)); + this.sentPerSecond = registry + .meter(name(baseMetricName, Producer.SENT_PER_SECOND)); + this.sentFailuresPerSecond = registry + .meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND)); + } - public void markSentFailure() { - totalSentFailures.inc(); - sentFailuresPerSecond.mark(); - } + public void markSent() { + totalSent.inc(); + sentPerSecond.mark(); + } + + public void markSentFailure() { + totalSentFailures.inc(); + sentFailuresPerSecond.mark(); + } }