1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Merge pull request #129 from fangjian0423/1.x

Refactor binder and update examples for 1.x branch
This commit is contained in:
xiaojing 2018-11-29 14:14:14 +08:00 committed by GitHub
commit 51e9ae552a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 416 additions and 335 deletions

View File

@ -25,7 +25,6 @@
<alicloud.context.version>1.0.0</alicloud.context.version> <alicloud.context.version>1.0.0</alicloud.context.version>
<aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version> <aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version>
<rocketmq.version>4.3.1</rocketmq.version> <rocketmq.version>4.3.1</rocketmq.version>
<metrics.core>3.2.6</metrics.core>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@ -221,13 +220,6 @@
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<!-- Third dependencies -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.core}</version>
</dependency>
<!-- Testing Dependencies --> <!-- Testing Dependencies -->

View File

@ -28,6 +28,10 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -6,13 +6,21 @@
[RocketMQ](https://rocketmq.apache.org/) 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。 [RocketMQ](https://rocketmq.apache.org/) 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
在说明RocketMQ的示例之前我们先了解一下 Spring Cloud Stream 中的Binder和Binding概念 在说明RocketMQ的示例之前我们先了解一下 Spring Cloud Stream。
Binder: 跟外部消息中间件集成的组件用来创建Binding各消息中间件都有自己的Binder实现。 这是官方对Spring Cloud Stream的一段介绍
Spring Cloud Stream是一个用于构建基于消息的微服务应用框架。它基于SpringBoot来创建具有生产级别的单机Spring应用并且使用 `Spring Integration` 与Broker进行连接。
Spring Cloud Stream提供了消息中间件配置的统一抽象推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有2个概念Binder 和 Binding。
* Binder: 跟外部消息中间件集成的组件用来创建Binding各消息中间件都有自己的Binder实现。
比如 `Kafka` 的实现 `KafkaMessageChannelBinder` `RabbitMQ` 的实现 `RabbitMessageChannelBinder` 以及 `RocketMQ` 的实现 `RocketMQMessageChannelBinder` 比如 `Kafka` 的实现 `KafkaMessageChannelBinder` `RabbitMQ` 的实现 `RabbitMessageChannelBinder` 以及 `RocketMQ` 的实现 `RocketMQMessageChannelBinder`
Binding: 包括Input Binding和Output Binding。 * Binding: 包括Input Binding和Output Binding。
Binding在消息中间件与应用程序提供的Provider和Consumer之间提供了一个桥梁实现了开发者只需使用应用程序的Provider或Consumer生产或消费数据即可屏蔽了开发者与底层消息中间件的接触。 Binding在消息中间件与应用程序提供的Provider和Consumer之间提供了一个桥梁实现了开发者只需使用应用程序的Provider或Consumer生产或消费数据即可屏蔽了开发者与底层消息中间件的接触。
@ -110,8 +118,9 @@ server.port=28081
使用2个input binding订阅数据。 使用2个input binding订阅数据。
input1: 订阅topic为test-topic的消息顺序消费所有消息(顺序消费的前提是所有消息都在一个MessageQueue中) * input1订阅topic为test-topic的消息顺序消费所有消息(顺序消费的前提是所有消息都在一个MessageQueue中)
input2: 订阅topic为test-topic的消息异步消费tags为tagStr的消息Consumer端线程池个数为20
* input2订阅topic为test-topic的消息异步消费tags为tagStr的消息Consumer端线程池个数为20
配置信息如下: 配置信息如下:
@ -249,6 +258,14 @@ Spring Boot 1.x 可以通过访问 http://127.0.0.1:28081/rocketmq_binder 来查
} }
``` ```
注意要想查看统计数据需要在pom里加上 [metrics-core依赖](https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core) 。如若不加endpoint将会显示warning信息而不会显示统计信息
```json
{
"warning": "please add metrics-core dependency, we use it for metrics"
}
```
## More ## More
RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。 RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

View File

@ -27,5 +27,4 @@ spring.cloud.stream.bindings.input3.consumer.concurrency=20
spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 spring.cloud.stream.bindings.input3.consumer.maxAttempts=1
server.port=28081 server.port=28081
management.security.enabled=false
management.endpoints.web.exposure.include=*

View File

@ -21,13 +21,15 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.dropwizard.metrics</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>metrics-core</artifactId> <artifactId>rocketmq-client</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.rocketmq</groupId> <groupId>io.dropwizard.metrics</groupId>
<artifactId>rocketmq-client</artifactId> <artifactId>metrics-core</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -21,6 +21,8 @@ package org.springframework.cloud.stream.binder.rocketmq;
*/ */
public interface RocketMQBinderConstants { public interface RocketMQBinderConstants {
String ENDPOINT_ID = "rocketmq_binder";
/** /**
* Header key * Header key
*/ */
@ -33,10 +35,27 @@ public interface RocketMQBinderConstants {
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/** /**
* Instrumentation key * Instrumentation
*/ */
String LASTSEND_TIMESTAMP = "lastSend.timestamp"; String LASTSEND_TIMESTAMP = "lastSend.timestamp";
String ENDPOINT_ID = "rocketmq_binder"; interface Metrics {
interface Producer {
String PREFIX = "scs-rocketmq.producer.";
String TOTAL_SENT = "totalSent";
String TOTAL_SENT_FAILURES = "totalSentFailures";
String SENT_PER_SECOND = "sentPerSecond";
String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond";
}
interface Consumer {
String GROUP_PREFIX = "scs-rocketmq.consumerGroup.";
String PREFIX = "scs-rocketmq.consumer.";
String TOTAL_CONSUMED = "totalConsumed";
String CONSUMED_PER_SECOND = "consumedPerSecond";
String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures";
String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond";
}
}
} }

View File

@ -42,76 +42,82 @@ import org.springframework.messaging.MessageHandler;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class RocketMQMessageChannelBinder extends public class RocketMQMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>, AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner> implements
implements ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> { ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class); private static final Logger logger = LoggerFactory
.getLogger(RocketMQMessageChannelBinder.class);
private final RocketMQExtendedBindingProperties extendedBindingProperties; private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQTopicProvisioner rocketTopicProvisioner; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final InstrumentationManager instrumentationManager;
private final InstrumentationManager instrumentationManager; private final ConsumersManager consumersManager;
private final ConsumersManager consumersManager;
public RocketMQMessageChannelBinder(ConsumersManager consumersManager, public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQTopicProvisioner provisioningProvider, RocketMQTopicProvisioner provisioningProvider,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) { InstrumentationManager instrumentationManager) {
super(true, null, provisioningProvider); super(true, null, provisioningProvider);
this.consumersManager = consumersManager; this.consumersManager = consumersManager;
this.extendedBindingProperties = extendedBindingProperties; this.extendedBindingProperties = extendedBindingProperties;
this.rocketTopicProvisioner = provisioningProvider; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.instrumentationManager = instrumentationManager;
this.instrumentationManager = instrumentationManager; }
}
@Override @Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination, protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties> ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
producerProperties, MessageChannel errorChannel) throws Exception {
MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) {
if (producerProperties.getExtension().getEnabled()) { return new RocketMQMessageHandler(destination.getName(),
return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(), producerProperties.getExtension(),
rocketBinderConfigurationProperties, instrumentationManager); rocketBinderConfigurationProperties, instrumentationManager);
} else { }
throw new RuntimeException( else {
"Binding for channel " + destination.getName() + "has been disabled, message can't be delivered"); throw new RuntimeException("Binding for channel " + destination.getName()
} + " has been disabled, message can't be delivered");
} }
}
@Override @Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
ExtendedConsumerProperties<RocketMQConsumerProperties> String group,
consumerProperties) ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties)
throws Exception { throws Exception {
if (group == null || "".equals(group)) { if (group == null || "".equals(group)) {
throw new RuntimeException("'group' must be configured for channel + " + destination.getName()); throw new RuntimeException(
} "'group' must be configured for channel + " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager, RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
consumerProperties, destination.getName(), group, instrumentationManager); consumersManager, consumerProperties, destination.getName(), group,
instrumentationManager);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
consumerProperties); group, consumerProperties);
if (consumerProperties.getMaxAttempts() > 1) { if (consumerProperties.getMaxAttempts() > 1) {
rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties)); rocketInboundChannelAdapter
rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer()); .setRetryTemplate(buildRetryTemplate(consumerProperties));
} else { rocketInboundChannelAdapter
rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel()); .setRecoveryCallback(errorInfrastructure.getRecoverer());
} }
else {
rocketInboundChannelAdapter
.setErrorChannel(errorInfrastructure.getErrorChannel());
}
return rocketInboundChannelAdapter; return rocketInboundChannelAdapter;
} }
@Override @Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName); return extendedBindingProperties.getExtendedConsumerProperties(channelName);
} }
@Override @Override
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName); return extendedBindingProperties.getExtendedProducerProperties(channelName);
} }
} }

View File

@ -20,11 +20,10 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.AbstractEndpoint; import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.codahale.metrics.MetricRegistry;
/** /**
* @author Timur Valiev * @author Timur Valiev
@ -32,8 +31,8 @@ import com.codahale.metrics.MetricRegistry;
*/ */
public class RocketMQBinderEndpoint extends AbstractEndpoint<Map<String, Object>> { public class RocketMQBinderEndpoint extends AbstractEndpoint<Map<String, Object>> {
private MetricRegistry metricRegistry = new MetricRegistry(); @Autowired(required = false)
private Map<String, Object> runtime = new ConcurrentHashMap<>(); private InstrumentationManager instrumentationManager;
public RocketMQBinderEndpoint() { public RocketMQBinderEndpoint() {
super(ENDPOINT_ID); super(ENDPOINT_ID);
@ -42,17 +41,15 @@ public class RocketMQBinderEndpoint extends AbstractEndpoint<Map<String, Object>
@Override @Override
public Map<String, Object> invoke() { public Map<String, Object> invoke() {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
result.put("metrics", metricRegistry().getMetrics()); if (instrumentationManager != null) {
result.put("runtime", runtime()); result.put("metrics", instrumentationManager.getMetricRegistry());
result.put("runtime", instrumentationManager.getRuntime());
}
else {
result.put("warning",
"please add metrics-core dependency, we use it for metrics");
}
return result; return result;
} }
public MetricRegistry metricRegistry() {
return metricRegistry;
}
public Map<String, Object> runtime() {
return runtime;
}
} }

View File

@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator; package org.springframework.cloud.stream.binder.rocketmq.actuator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
@ -27,40 +28,45 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM
*/ */
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
private final InstrumentationManager instrumentationManager; @Autowired(required = false)
private InstrumentationManager instrumentationManager;
public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
@Override @Override
protected void doHealthCheck(Health.Builder builder) throws Exception { protected void doHealthCheck(Health.Builder builder) throws Exception {
int upCount = 0, outOfServiceCount = 0; int upCount = 0, outOfServiceCount = 0;
for (Instrumentation instrumentation : instrumentationManager if (instrumentationManager != null) {
.getHealthInstrumentations()) { for (Instrumentation instrumentation : instrumentationManager
if (instrumentation.isUp()) { .getHealthInstrumentations()) {
upCount++; if (instrumentation.isUp()) {
upCount++;
}
else if (instrumentation.isOutOfService()) {
upCount++;
}
} }
else if (instrumentation.isOutOfService()) { if (upCount == instrumentationManager.getHealthInstrumentations().size()) {
upCount++; builder.up();
return;
} }
} else if (outOfServiceCount == instrumentationManager
if (upCount == instrumentationManager.getHealthInstrumentations().size()) { .getHealthInstrumentations().size()) {
builder.up(); builder.outOfService();
return; return;
} }
else if (outOfServiceCount == instrumentationManager.getHealthInstrumentations() builder.down();
.size()) {
builder.outOfService();
return;
}
builder.down();
for (Instrumentation instrumentation : instrumentationManager for (Instrumentation instrumentation : instrumentationManager
.getHealthInstrumentations()) { .getHealthInstrumentations()) {
if (!instrumentation.isStarted()) { if (!instrumentation.isStarted()) {
builder.withException(instrumentation.getStartException()); builder.withException(instrumentation.getStartException());
}
} }
} }
else {
builder.down();
builder.withDetail("warning",
"please add metrics-core dependency, we use it for metrics");
}
} }
} }

View File

@ -33,38 +33,46 @@ import org.springframework.context.annotation.Configuration;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@Configuration @Configuration
@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class}) @EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
RocketMQExtendedBindingProperties.class })
public class RocketMQBinderAutoConfiguration { public class RocketMQBinderAutoConfiguration {
private final RocketMQExtendedBindingProperties extendedBindingProperties; private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Autowired @Autowired(required = false)
public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties, private InstrumentationManager instrumentationManager;
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel());
}
@Bean @Autowired
public RocketMQTopicProvisioner provisioningProvider() { public RocketMQBinderAutoConfiguration(
return new RocketMQTopicProvisioner(); RocketMQExtendedBindingProperties extendedBindingProperties,
} RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL,
this.rocketBinderConfigurationProperties.getLogLevel());
}
@Bean @Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider, public RocketMQTopicProvisioner provisioningProvider() {
InstrumentationManager instrumentationManager, return new RocketMQTopicProvisioner();
ConsumersManager consumersManager) { }
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties,
provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager);
return binder;
}
@Bean @Bean
public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) { public RocketMQMessageChannelBinder rocketMessageChannelBinder(
return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties); RocketMQTopicProvisioner provisioningProvider,
} ConsumersManager consumersManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
consumersManager, extendedBindingProperties, provisioningProvider,
rocketBinderConfigurationProperties, instrumentationManager);
return binder;
}
@Bean
public ConsumersManager consumersManager() {
return new ConsumersManager(instrumentationManager,
rocketBinderConfigurationProperties);
}
} }

View File

@ -17,7 +17,9 @@
package org.springframework.cloud.stream.binder.rocketmq.config; package org.springframework.cloud.stream.binder.rocketmq.config;
import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration; import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration;
import org.springframework.boot.actuate.endpoint.Endpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@ -29,6 +31,7 @@ import org.springframework.context.annotation.Configuration;
*/ */
@Configuration @Configuration
@AutoConfigureAfter(EndpointAutoConfiguration.class) @AutoConfigureAfter(EndpointAutoConfiguration.class)
@ConditionalOnClass(Endpoint.class)
public class RocketMQBinderEndpointAutoConfiguration { public class RocketMQBinderEndpointAutoConfiguration {
@Bean @Bean
@ -37,16 +40,14 @@ public class RocketMQBinderEndpointAutoConfiguration {
} }
@Bean @Bean
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator( public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() {
InstrumentationManager instrumentationManager) { return new RocketMQBinderHealthIndicator();
return new RocketMQBinderHealthIndicator(instrumentationManager);
} }
@Bean @Bean
public InstrumentationManager instrumentationManager( @ConditionalOnClass(name = "com.codahale.metrics.Counter")
RocketMQBinderEndpoint rocketBinderEndpoint) { public InstrumentationManager instrumentationManager() {
return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), return new InstrumentationManager();
rocketBinderEndpoint.runtime());
} }
} }

View File

@ -38,85 +38,98 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsu
*/ */
public class ConsumersManager { public class ConsumersManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>(); private InstrumentationManager instrumentationManager;
private final Map<String, Boolean> started = new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap
= new HashMap<>();
private final InstrumentationManager instrumentationManager;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
public ConsumersManager(InstrumentationManager instrumentationManager, private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { private final Map<String, Boolean> started = new HashMap<>();
this.instrumentationManager = instrumentationManager; private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap = new HashMap<>();
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
}
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic, public ConsumersManager(InstrumentationManager instrumentationManager,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) { RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties); this.instrumentationManager = instrumentationManager;
ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group); this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
instrumentationManager.addHealthInstrumentation(instrumentation); }
if (consumerGroups.containsKey(group)) { public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group,
return consumerGroups.get(group); String topic,
} ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic),
consumerProperties);
if (instrumentationManager != null) {
ConsumerGroupInstrumentation instrumentation = instrumentationManager
.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(instrumentation);
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); if (consumerGroups.containsKey(group)) {
consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); return consumerGroups.get(group);
consumerGroups.put(group, consumer); }
started.put(group, false);
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
if (consumerProperties.getExtension().getBroadcasting()) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
logger.info("RocketMQ consuming for SCS group {} created", group);
return consumer;
}
public synchronized void startConsumers() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
for (String group : getConsumerGroups()) { consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
start(group); consumerGroups.put(group, consumer);
} started.put(group, false);
} consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
if (consumerProperties.getExtension().getBroadcasting()) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
logger.info("RocketMQ consuming for SCS group {} created", group);
return consumer;
}
public synchronized void startConsumer(String group) throws MQClientException { public synchronized void startConsumers() throws MQClientException {
start(group); for (String group : getConsumerGroups()) {
} start(group);
}
}
public synchronized void stopConsumer(String group) { public synchronized void startConsumer(String group) throws MQClientException {
stop(group); start(group);
} }
private void stop(String group) { public synchronized void stopConsumer(String group) {
if (consumerGroups.get(group) != null) { stop(group);
consumerGroups.get(group).shutdown(); }
started.put(group, false);
}
}
private synchronized void start(String group) throws MQClientException { private void stop(String group) {
if (started.get(group)) { if (consumerGroups.get(group) != null) {
return; consumerGroups.get(group).shutdown();
} started.put(group, false);
ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation( }
group); }
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
try {
consumerGroups.get(group).start();
started.put(group, true);
groupInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
groupInstrumentation.markStartFailed(e);
logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
throw e;
}
}
public synchronized Set<String> getConsumerGroups() { private synchronized void start(String group) throws MQClientException {
return consumerGroups.keySet(); if (started.get(group)) {
} return;
}
ConsumerGroupInstrumentation groupInstrumentation = null;
if (instrumentationManager != null) {
groupInstrumentation = instrumentationManager
.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
}
try {
consumerGroups.get(group).start();
started.put(group, true);
if (groupInstrumentation != null) {
groupInstrumentation.markStartedSuccessfully();
}
}
catch (MQClientException e) {
if (groupInstrumentation != null) {
groupInstrumentation.markStartFailed(e);
}
logger.error("RocketMQ Consumer hasn't been started. Caused by "
+ e.getErrorMessage(), e);
throw e;
}
}
public synchronized Set<String> getConsumerGroups() {
return consumerGroups.keySet();
}
} }

View File

@ -62,14 +62,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private ConsumerInstrumentation consumerInstrumentation; private ConsumerInstrumentation consumerInstrumentation;
private InstrumentationManager instrumentationManager;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties; private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final String destination; private final String destination;
private final String group; private final String group;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager; private final ConsumersManager consumersManager;
private RetryTemplate retryTemplate; private RetryTemplate retryTemplate;
@ -89,21 +89,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
@Override @Override
protected void doStart() { protected void doStart() {
if (!consumerProperties.getExtension().getEnabled()) { if (consumerProperties == null
|| !consumerProperties.getExtension().getEnabled()) {
return; return;
} }
String tags = consumerProperties == null ? null String tags = consumerProperties.getExtension().getTags();
: consumerProperties.getExtension().getTags(); Boolean isOrderly = consumerProperties.getExtension().getOrderly();
Boolean isOrderly = consumerProperties == null ? false
: consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
destination, consumerProperties); destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly final CloudStreamMessageListener listener = isOrderly
? new CloudStreamMessageListenerOrderly(instrumentationManager) ? new CloudStreamMessageListenerOrderly()
: new CloudStreamMessageListenerConcurrently(instrumentationManager); : new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) { if (retryTemplate != null) {
retryTemplate.registerListener(listener); retryTemplate.registerListener(listener);
@ -116,9 +115,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
} }
} }
consumerInstrumentation = instrumentationManager if (instrumentationManager != null) {
.getConsumerInstrumentation(destination); consumerInstrumentation = instrumentationManager
instrumentationManager.addHealthInstrumentation(consumerInstrumentation); .getConsumerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
}
try { try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
@ -129,10 +130,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
consumer.subscribe(destination, consumer.subscribe(destination,
org.apache.commons.lang3.StringUtils.join(tagsSet, " || ")); org.apache.commons.lang3.StringUtils.join(tagsSet, " || "));
} }
consumerInstrumentation.markStartedSuccessfully(); if (consumerInstrumentation != null) {
consumerInstrumentation.markStartedSuccessfully();
}
} }
catch (MQClientException e) { catch (MQClientException e) {
consumerInstrumentation.markStartFailed(e); if (consumerInstrumentation != null) {
consumerInstrumentation.markStartFailed(e);
}
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
+ e.getErrorMessage(), e); + e.getErrorMessage(), e);
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
@ -166,19 +171,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListener implements MessageListener, RetryListener { protected class CloudStreamMessageListener implements MessageListener, RetryListener {
private final InstrumentationManager instrumentationManager;
CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
Acknowledgement consumeMessage(final List<MessageExt> msgs) { Acknowledgement consumeMessage(final List<MessageExt> msgs) {
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
try { try {
if (enableRetry) { if (enableRetry) {
return RocketMQInboundChannelAdapter.this.retryTemplate.execute( return RocketMQInboundChannelAdapter.this.retryTemplate
// (RetryCallback<Acknowledgement, Exception>) .execute(new RetryCallback<Acknowledgement, Exception>() {
new RetryCallback<Acknowledgement, Exception>() {
@Override @Override
public Acknowledgement doWithRetry(RetryContext context) public Acknowledgement doWithRetry(RetryContext context)
throws Exception { throws Exception {
@ -203,10 +201,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
} }
else { else {
Acknowledgement result = doSendMsgs(msgs, null); Acknowledgement result = doSendMsgs(msgs, null);
instrumentationManager if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) {
.getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.instrumentationManager
RocketMQInboundChannelAdapter.this.destination) .getConsumerInstrumentation(
.markConsumed(); RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
}
return result; return result;
} }
} }
@ -214,10 +214,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
logger.error( logger.error(
"RocketMQ Message hasn't been processed successfully. Caused by ", "RocketMQ Message hasn't been processed successfully. Caused by ",
e); e);
instrumentationManager if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) {
.getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.instrumentationManager
RocketMQInboundChannelAdapter.this.destination) .getConsumerInstrumentation(
.markConsumedFailure(); RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
}
throw new RuntimeException( throw new RuntimeException(
"RocketMQ Message hasn't been processed successfully. Caused by ", "RocketMQ Message hasn't been processed successfully. Caused by ",
e); e);
@ -254,14 +256,17 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
@Override @Override
public <T, E extends Throwable> void close(RetryContext context, public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) { RetryCallback<T, E> callback, Throwable throwable) {
if (RocketMQInboundChannelAdapter.this.instrumentationManager == null) {
return;
}
if (throwable != null) { if (throwable != null) {
instrumentationManager RocketMQInboundChannelAdapter.this.instrumentationManager
.getConsumerInstrumentation( .getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination) RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure(); .markConsumedFailure();
} }
else { else {
instrumentationManager RocketMQInboundChannelAdapter.this.instrumentationManager
.getConsumerInstrumentation( .getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination) RocketMQInboundChannelAdapter.this.destination)
.markConsumed(); .markConsumed();
@ -277,11 +282,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListenerConcurrently protected class CloudStreamMessageListenerConcurrently
extends CloudStreamMessageListener implements MessageListenerConcurrently { extends CloudStreamMessageListener implements MessageListenerConcurrently {
public CloudStreamMessageListenerConcurrently(
InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) { ConsumeConcurrentlyContext context) {
@ -295,11 +295,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
implements MessageListenerOrderly { implements MessageListenerOrderly {
public CloudStreamMessageListenerOrderly(
InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override @Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) { ConsumeOrderlyContext context) {

View File

@ -45,14 +45,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private ProducerInstrumentation producerInstrumentation; private ProducerInstrumentation producerInstrumentation;
private InstrumentationManager instrumentationManager;
private final RocketMQProducerProperties producerProperties; private final RocketMQProducerProperties producerProperties;
private final String destination; private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
protected volatile boolean running = false; protected volatile boolean running = false;
public RocketMQMessageHandler(String destination, public RocketMQMessageHandler(String destination,
@ -69,9 +69,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public void start() { public void start() {
producer = new DefaultMQProducer(destination); producer = new DefaultMQProducer(destination);
producerInstrumentation = instrumentationManager if (instrumentationManager != null) {
.getProducerInstrumentation(destination); producerInstrumentation = instrumentationManager
instrumentationManager.addHealthInstrumentation(producerInstrumentation); .getProducerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(producerInstrumentation);
}
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
@ -81,10 +83,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
try { try {
producer.start(); producer.start();
producerInstrumentation.markStartedSuccessfully(); if (producerInstrumentation != null) {
producerInstrumentation.markStartedSuccessfully();
}
} }
catch (MQClientException e) { catch (MQClientException e) {
producerInstrumentation.markStartFailed(e); if (producerInstrumentation != null) {
producerInstrumentation.markStartFailed(e);
}
logger.error( logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); "RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e); throw new MessagingException(e.getMessage(), e);
@ -142,14 +148,18 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message, RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
sendRes); sendRes);
} }
instrumentationManager.getRuntime().put( if (instrumentationManager != null) {
RocketMQBinderConstants.LASTSEND_TIMESTAMP, instrumentationManager.getRuntime().put(
System.currentTimeMillis()); RocketMQBinderConstants.LASTSEND_TIMESTAMP,
producerInstrumentation.markSent(); System.currentTimeMillis());
producerInstrumentation.markSent();
}
} }
catch (MQClientException | RemotingException | MQBrokerException catch (MQClientException | RemotingException | MQBrokerException
| InterruptedException | UnsupportedOperationException e) { | InterruptedException | UnsupportedOperationException e) {
producerInstrumentation.markSentFailure(); if (producerInstrumentation != null) {
producerInstrumentation.markSentFailure();
}
logger.error( logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); "RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e); throw new MessagingException(e.getMessage(), e);

View File

@ -16,38 +16,44 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics; package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
/** /**
* @author juven.xuxb * @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class ConsumerInstrumentation extends Instrumentation { public class ConsumerInstrumentation extends Instrumentation {
private final Counter totalConsumed; private final Counter totalConsumed;
private final Counter totalConsumedFailures; private final Counter totalConsumedFailures;
private final Meter consumedPerSecond; private final Meter consumedPerSecond;
private final Meter consumedFailuresPerSecond; private final Meter consumedFailuresPerSecond;
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) { public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName); super(baseMetricName);
this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed")); this.totalConsumed = registry
this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond")); .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED));
this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures")); this.consumedPerSecond = registry
this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond")); .meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND));
} this.totalConsumedFailures = registry
.counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES));
this.consumedFailuresPerSecond = registry
.meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND));
}
public void markConsumed() { public void markConsumed() {
totalConsumed.inc(); totalConsumed.inc();
consumedPerSecond.mark(); consumedPerSecond.mark();
} }
public void markConsumedFailure() { public void markConsumedFailure() {
totalConsumedFailures.inc(); totalConsumedFailures.inc();
consumedFailuresPerSecond.mark(); consumedFailuresPerSecond.mark();
} }
} }

View File

@ -22,28 +22,24 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class InstrumentationManager { public class InstrumentationManager {
private final MetricRegistry metricRegistry; private final MetricRegistry metricRegistry = new MetricRegistry();
private final Map<String, Object> runtime; private final Map<String, Object> runtime = new HashMap<>();
private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>(); private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>();
private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>(); private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>();
private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>(); private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>();
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>(); private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
public InstrumentationManager(MetricRegistry metricRegistry,
Map<String, Object> runtime) {
this.metricRegistry = metricRegistry;
this.runtime = runtime;
}
public ProducerInstrumentation getProducerInstrumentation(String destination) { public ProducerInstrumentation getProducerInstrumentation(String destination) {
String key = "scs-rocketmq.producer." + destination; String key = Producer.PREFIX + destination;
ProducerInstrumentation producerInstrumentation = producerInstrumentations ProducerInstrumentation producerInstrumentation = producerInstrumentations
.get(key); .get(key);
if (producerInstrumentation == null) { if (producerInstrumentation == null) {
@ -54,7 +50,7 @@ public class InstrumentationManager {
} }
public ConsumerInstrumentation getConsumerInstrumentation(String destination) { public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
String key = "scs-rocketmq.consumer." + destination; String key = Consumer.PREFIX + destination;
ConsumerInstrumentation consumerInstrumentation = consumeInstrumentations ConsumerInstrumentation consumerInstrumentation = consumeInstrumentations
.get(key); .get(key);
if (consumerInstrumentation == null) { if (consumerInstrumentation == null) {
@ -65,7 +61,7 @@ public class InstrumentationManager {
} }
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) { public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
String key = "scs-rocketmq.consumerGroup." + group; String key = Consumer.GROUP_PREFIX + group;
ConsumerGroupInstrumentation consumerGroupInstrumentation = consumerGroupsInstrumentations ConsumerGroupInstrumentation consumerGroupInstrumentation = consumerGroupsInstrumentations
.get(key); .get(key);
if (consumerGroupInstrumentation == null) { if (consumerGroupInstrumentation == null) {
@ -86,4 +82,8 @@ public class InstrumentationManager {
public Map<String, Object> getRuntime() { public Map<String, Object> getRuntime() {
return runtime; return runtime;
} }
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}
} }

View File

@ -16,38 +16,44 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics; package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
/** /**
* @author juven.xuxb * @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class ProducerInstrumentation extends Instrumentation { public class ProducerInstrumentation extends Instrumentation {
private final Counter totalSent; private final Counter totalSent;
private final Counter totalSentFailures; private final Counter totalSentFailures;
private final Meter sentPerSecond; private final Meter sentPerSecond;
private final Meter sentFailuresPerSecond; private final Meter sentFailuresPerSecond;
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) { public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName); super(baseMetricName);
this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures"));
this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond"));
}
public void markSent() { this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT));
totalSent.inc(); this.totalSentFailures = registry
sentPerSecond.mark(); .counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES));
} this.sentPerSecond = registry
.meter(name(baseMetricName, Producer.SENT_PER_SECOND));
this.sentFailuresPerSecond = registry
.meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND));
}
public void markSentFailure() { public void markSent() {
totalSentFailures.inc(); totalSent.inc();
sentFailuresPerSecond.mark(); sentPerSecond.mark();
} }
public void markSentFailure() {
totalSentFailures.inc();
sentFailuresPerSecond.mark();
}
} }