1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Merge branch 'master' into patch-2

This commit is contained in:
xiaojing
2018-11-29 11:04:49 +08:00
committed by GitHub
21 changed files with 248 additions and 193 deletions

View File

@@ -100,7 +100,6 @@
<module>spring-cloud-alicloud-oss</module> <module>spring-cloud-alicloud-oss</module>
<module>spring-cloud-alicloud-acm</module> <module>spring-cloud-alicloud-acm</module>
<module>spring-cloud-alicloud-ans</module> <module>spring-cloud-alicloud-ans</module>
<module>spring-cloud-starter-bus-rocketmq</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>

View File

@@ -12,7 +12,7 @@ Spring Cloud Alibaba Nacos Config 是 Config Server 和 Client 的替代方案
1、启动Nacos Server。启动方式可见 https://nacos.io/zh-cn/docs/quick-start.html[Nacos 官网] 1、启动Nacos Server。启动方式可见 https://nacos.io/zh-cn/docs/quick-start.html[Nacos 官网]
2、启动好Nacos之后在Nacos添加如下的配置。注意dataid是以 properties为扩展名默认的文件扩展名方式。如下所示 2、启动好Nacos之后在Nacos添加如下的配置
[source,subs="normal"] [source,subs="normal"]
---- ----
@@ -20,15 +20,17 @@ Data ID: nacos-config.properties
Group : DEFAULT_GROUP Group : DEFAULT_GROUP
配置格式: TEXT 配置格式: Properties
配置内容: user.name: nacos-config-properties 配置内容: user.name=nacos-config-properties
user.age: 90 user.age=90
---- ----
NOTE: 注意dataid是以 properties(默认的文件扩展名方式)为扩展名。
==== 客户端使用方式 ==== 客户端使用方式
为了能够在应用程序使用Nacos实现应用的外部化配置在构建应用的同时添加一个Spring Boot Starter org.springframework.cloud:spring-cloud-starter-alibaba-nacos-config。以下是一个基础的maven 依赖配置: 为了能够在应用程序使用 Nacos实现应用的外部化配置在构建应用的同时添加一个Spring Boot Starter org.springframework.cloud:spring-cloud-starter-alibaba-nacos-config。以下是一个基本的 maven 依赖配置:
[source,xml] [source,xml]
---- ----
@@ -95,7 +97,7 @@ public class ProviderApplication {
} }
---- ----
spring-cloud-starter-alibaba-nacos-config 对于 Nacos 服务端的基础配置没有默认值,因此在运行此Example 之前, 必须使用 bootstrap.properties 配置文件来配置Nacos Server地址例如 在运行此 Example 之前, 必须使用 bootstrap.properties 配置文件来配置Nacos Server 地址,例如:
.bootstrap.properties .bootstrap.properties
[source,properties] [source,properties]
@@ -108,7 +110,7 @@ NOTE: 注意当你使用域名的方式来访问 Nacos 时,`spring.cloud.nacos
例如 Nacos 的域名为abc.com.nacos监听的端口为 80则 `spring.cloud.nacos.config.server-addr=abc.com.nacos:80`。 例如 Nacos 的域名为abc.com.nacos监听的端口为 80则 `spring.cloud.nacos.config.server-addr=abc.com.nacos:80`。
注意 80 端口不能省略。 注意 80 端口不能省略。
启动这个Example可以在控制台看到打印出的值正是在Nacos上预先配置好的值。 启动这个 Example可以看到如下输出结果:
[source,subs="normal"] [source,subs="normal"]
---- ----
@@ -120,7 +122,7 @@ user name :nacos-config-properties; age: 90
=== 基于 dataid 为 yaml 的文件扩展名配置方式 === 基于 dataid 为 yaml 的文件扩展名配置方式
spring-cloud-starter-alibaba-nacos-config 对于 yaml 格式也是完美支持。这个时候只需要完成以下两步: spring-cloud-starter-alibaba-nacos-config 对于 yaml 格式也是完美支持。这个时候只需要完成以下两步:
1、在应用的 bootstrap.properties 配置文件中显示的声明 dataid 文件扩展名。如下所示 1、在应用的 bootstrap.properties 配置文件中显示的声明 dataid 文件扩展名。如下所示
@@ -144,7 +146,7 @@ Group : DEFAULT_GROUP
user.age: 68 user.age: 68
---- ----
这两步完成后,重启测试程序,可以在控制台看到输出是以dataid为 nacos-config.yaml 配置的值 这两步完成后,重启测试程序,可以看到如下输出结果
[source,subs="normal"] [source,subs="normal"]
---- ----
@@ -165,10 +167,9 @@ public class ProviderApplication {
public static void main(String[] args) { public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(ProviderApplication.class, args); ConfigurableApplicationContext applicationContext = SpringApplication.run(ProviderApplication.class, args);
while(true) { while(true) {
//当动态配置刷新时,会更新到 Enviroment中因此这里每隔一秒中从Enviroment中获取配置
String userName = applicationContext.getEnvironment().getProperty("user.name"); String userName = applicationContext.getEnvironment().getProperty("user.name");
String userAge = applicationContext.getEnvironment().getProperty("user.age"); String userAge = applicationContext.getEnvironment().getProperty("user.age");
//获取当前部署的环境
String currentEnv = applicationContext.getEnvironment().getProperty("current.env");
System.err.println("user name :" + userName + "; age: " + userAge); System.err.println("user name :" + userName + "; age: " + userAge);
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
} }
@@ -195,13 +196,15 @@ NOTE: 你可以通过配置 `spring.cloud.nacos.config.refresh.enabled=false`
=== 可支持profile粒度的配置 === 可支持profile粒度的配置
spring-cloud-starter-alibaba-nacos-config 在加载配置的时候,不仅仅加载了以 dataid 为 `${spring.application.name}.${file-extension:properties}` 为前缀的基础配置还加载了dataid为 `${spring.application.name}-${profile}.${file-extension:properties}` 的基础配置。在日常开发中如果遇到多套环境下的不同配置,可以打开 Spring 自带的配置功能 spring-cloud-starter-alibaba-nacos-config 在加载配置的时候,不仅仅加载了以 dataid 为 `${spring.application.name}.${file-extension:properties}` 为前缀的基础配置还加载了dataid为 `${spring.application.name}-${profile}.${file-extension:properties}` 的基础配置。在日常开发中如果遇到多套环境下的不同配置,可以通过Spring 提供的 `${spring.profiles.active}` 这个配置项来配置
[source,properties] [source,properties]
---- ----
spring.profiles.active=develop spring.profiles.active=develop
---- ----
NOTE: ${spring.profiles.active} 当通过配置文件来指定时必须放在 bootstrap.properties 文件中。
Nacos 上新增一个dataid为nacos-config-develop.yaml的基础配置如下所示 Nacos 上新增一个dataid为nacos-config-develop.yaml的基础配置如下所示
[source,subs="normal"] [source,subs="normal"]
@@ -250,7 +253,7 @@ in develop-evn enviroment; user name :nacos-config-yaml-update; age: 68
spring.profiles.active=product spring.profiles.active=product
---- ----
同时生产环境上Nacos需要添加对应 dataid 的基础配置。例如,在生成环境下的 Naocs 添加了dataid为nacos-config-product.yaml的配置 同时生产环境上 Nacos 需要添加对应 dataid 的基础配置。例如,在生成环境下的 Naocs 添加了dataid为nacos-config-product.yaml的配置
[source,subs="normal"] [source,subs="normal"]
---- ----
@@ -271,6 +274,7 @@ in product-env enviroment; user name :nacos-config-yaml-update; age: 68
2018-11-02 15:42:14.628 INFO 33024 --- [Thread-11] ConfigServletWebServerApplicationContext : Closing org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@6aa8e115: startup date [Fri Nov 02 15:42:03 CST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@19bb07ed 2018-11-02 15:42:14.628 INFO 33024 --- [Thread-11] ConfigServletWebServerApplicationContext : Closing org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@6aa8e115: startup date [Fri Nov 02 15:42:03 CST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@19bb07ed
---- ----
NOTE: 此案例中我们通过 `spring.profiles.active=****` 的方式写死在配置文件中,而在真正的项目实施过程中这个变量的值是需要不同环境而有不同的值。这个时候通常的做法是通过 `-Dspring.profiles.active=****` 参数指定其配置来达到环境间灵活的切换。 NOTE: 此案例中我们通过 `spring.profiles.active=****` 的方式写死在配置文件中,而在真正的项目实施过程中这个变量的值是需要不同环境而有不同的值。这个时候通常的做法是通过 `-Dspring.profiles.active=****` 参数指定其配置来达到环境间灵活的切换。
=== 支持自定义 namespace 的配置 === 支持自定义 namespace 的配置

View File

@@ -31,6 +31,12 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -122,12 +122,12 @@ spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=application/json spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1 spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
spring.cloud.stream.bindings.input2.destination=test-topic spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=application/json spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2 spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
@@ -195,7 +195,7 @@ Spring Boot 应用支持通过 Endpoint 来暴露相关信息RocketMQ Stream
* Spring Boot 1.x 中添加配置 `management.security.enabled=false` * Spring Boot 1.x 中添加配置 `management.security.enabled=false`
* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*` * Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`
Spring Boot 1.x 可以通过访问 http://127.0.0.1:18083/rocketmq-binder 来查看 RocketMQ Binder Endpoint 的信息。Spring Boot 2.x 可以通过访问 http://127.0.0.1:28081/acutator/rocketmq-binder 来访问。 Spring Boot 1.x 可以通过访问 http://127.0.0.1:18083/rocketmq_binder 来查看 RocketMQ Binder Endpoint 的信息。Spring Boot 2.x 可以通过访问 http://127.0.0.1:28081/actuator/rocketmq-binder 来访问。
这里会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。 这里会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。
@@ -249,6 +249,14 @@ Spring Boot 1.x 可以通过访问 http://127.0.0.1:18083/rocketmq-binder 来查
} }
``` ```
注意要想查看统计数据需要在pom里加上 [metrics-core依赖](https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core) 。如若不加endpoint将会显示warning信息而不会显示统计信息
```json
{
"warning": "please add metrics-core dependency, we use it to metrics"
}
```
## More ## More
RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。 RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

View File

@@ -6,14 +6,14 @@ package org.springframework.cloud.alibaba.cloud.examples;
public class Foo { public class Foo {
private int id; private int id;
private String tag; private String bar;
public Foo() { public Foo() {
} }
public Foo(int id, String tag) { public Foo(int id, String bar) {
this.id = id; this.id = id;
this.tag = tag; this.bar = bar;
} }
public int getId() { public int getId() {
@@ -24,16 +24,16 @@ public class Foo {
this.id = id; this.id = id;
} }
public String getTag() { public String getBar() {
return tag; return bar;
} }
public void setTag(String tag) { public void setBar(String bar) {
this.tag = tag; this.bar = bar;
} }
@Override @Override
public String toString() { public String toString() {
return "Foo{" + "id=" + id + ", tag='" + tag + '\'' + '}'; return "Foo{" + "id=" + id + ", bar='" + bar + '\'' + '}';
} }
} }

View File

@@ -6,13 +6,13 @@ spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=application/json spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1 spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
spring.cloud.stream.bindings.input1.consumer.maxAttempts=1 spring.cloud.stream.bindings.input1.consumer.maxAttempts=1
spring.cloud.stream.bindings.input2.destination=test-topic spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=application/json spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2 spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr

View File

@@ -22,13 +22,16 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.dropwizard.metrics</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>metrics-core</artifactId> <artifactId>rocketmq-client</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.rocketmq</groupId> <groupId>io.dropwizard.metrics</groupId>
<artifactId>rocketmq-client</artifactId> <artifactId>metrics-core</artifactId>
<version>4.0.3</version>
<scope>provided</scope>
<optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>

View File

@@ -5,6 +5,8 @@ package org.springframework.cloud.stream.binder.rocketmq;
*/ */
public interface RocketMQBinderConstants { public interface RocketMQBinderConstants {
String ENDPOINT_ID = "rocketmq-binder";
/** /**
* Header key * Header key
*/ */
@@ -17,10 +19,27 @@ public interface RocketMQBinderConstants {
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/** /**
* Instrumentation key * Instrumentation
*/ */
String LASTSEND_TIMESTAMP = "lastSend.timestamp"; String LASTSEND_TIMESTAMP = "lastSend.timestamp";
String ENDPOINT_ID = "rocketmq-binder"; interface Metrics {
interface Producer {
String PREFIX = "scs-rocketmq.producer.";
String TOTAL_SENT = "totalSent";
String TOTAL_SENT_FAILURES = "totalSentFailures";
String SENT_PER_SECOND = "sentPerSecond";
String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond";
}
interface Consumer {
String GROUP_PREFIX = "scs-rocketmq.consumerGroup.";
String PREFIX = "scs-rocketmq.consumer.";
String TOTAL_CONSUMED = "totalConsumed";
String CONSUMED_PER_SECOND = "consumedPerSecond";
String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures";
String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond";
}
}
} }

View File

@@ -34,7 +34,6 @@ public class RocketMQMessageChannelBinder extends
.getLogger(RocketMQMessageChannelBinder.class); .getLogger(RocketMQMessageChannelBinder.class);
private final RocketMQExtendedBindingProperties extendedBindingProperties; private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQTopicProvisioner rocketTopicProvisioner;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager; private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager; private final ConsumersManager consumersManager;
@@ -47,7 +46,6 @@ public class RocketMQMessageChannelBinder extends
super(null, provisioningProvider); super(null, provisioningProvider);
this.consumersManager = consumersManager; this.consumersManager = consumersManager;
this.extendedBindingProperties = extendedBindingProperties; this.extendedBindingProperties = extendedBindingProperties;
this.rocketTopicProvisioner = provisioningProvider;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
} }
@@ -74,7 +72,7 @@ public class RocketMQMessageChannelBinder extends
throws Exception { throws Exception {
if (group == null || "".equals(group)) { if (group == null || "".equals(group)) {
throw new RuntimeException( throw new RuntimeException(
"'group' must be configured for channel + " + destination.getName()); "'group must be configured for channel + " + destination.getName());
} }
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(

View File

@@ -4,12 +4,11 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint; import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation; import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.codahale.metrics.MetricRegistry;
/** /**
* @author Timur Valiev * @author Timur Valiev
@@ -18,23 +17,22 @@ import com.codahale.metrics.MetricRegistry;
@Endpoint(id = ENDPOINT_ID) @Endpoint(id = ENDPOINT_ID)
public class RocketMQBinderEndpoint { public class RocketMQBinderEndpoint {
private MetricRegistry metricRegistry = new MetricRegistry(); @Autowired(required = false)
private Map<String, Object> runtime = new ConcurrentHashMap<>(); private InstrumentationManager instrumentationManager;
@ReadOperation @ReadOperation
public Map<String, Object> invoke() { public Map<String, Object> invoke() {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
result.put("metrics", metricRegistry().getMetrics()); if (instrumentationManager != null) {
result.put("runtime", runtime()); result.put("metrics",
instrumentationManager.getMetricRegistry().getMetrics());
result.put("runtime", instrumentationManager.getRuntime());
}
else {
result.put("warning",
"please add metrics-core dependency, we use it for metrics");
}
return result; return result;
} }
public MetricRegistry metricRegistry() {
return metricRegistry;
}
public Map<String, Object> runtime() {
return runtime;
}
} }

View File

@@ -1,5 +1,6 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator; package org.springframework.cloud.stream.binder.rocketmq.actuator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
@@ -11,14 +12,12 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM
*/ */
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
private final InstrumentationManager instrumentationManager; @Autowired(required = false)
private InstrumentationManager instrumentationManager;
public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
@Override @Override
protected void doHealthCheck(Health.Builder builder) throws Exception { protected void doHealthCheck(Health.Builder builder) throws Exception {
if (instrumentationManager != null) {
if (instrumentationManager.getHealthInstrumentations().stream() if (instrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isUp)) { .allMatch(Instrumentation::isUp)) {
builder.up(); builder.up();
@@ -35,4 +34,11 @@ public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
.forEach(instrumentation1 -> builder .forEach(instrumentation1 -> builder
.withException(instrumentation1.getStartException())); .withException(instrumentation1.getStartException()));
} }
else {
builder.down();
builder.withDetail("warning",
"please add metrics-core dependency, we use it for metrics");
}
}
} }

View File

@@ -25,6 +25,9 @@ public class RocketMQBinderAutoConfiguration {
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Autowired(required = false)
private InstrumentationManager instrumentationManager;
@Autowired @Autowired
public RocketMQBinderAutoConfiguration( public RocketMQBinderAutoConfiguration(
RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQExtendedBindingProperties extendedBindingProperties,
@@ -43,7 +46,6 @@ public class RocketMQBinderAutoConfiguration {
@Bean @Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder( public RocketMQMessageChannelBinder rocketMessageChannelBinder(
RocketMQTopicProvisioner provisioningProvider, RocketMQTopicProvisioner provisioningProvider,
InstrumentationManager instrumentationManager,
ConsumersManager consumersManager) { ConsumersManager consumersManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
consumersManager, extendedBindingProperties, provisioningProvider, consumersManager, extendedBindingProperties, provisioningProvider,
@@ -52,8 +54,7 @@ public class RocketMQBinderAutoConfiguration {
} }
@Bean @Bean
public ConsumersManager consumersManager( public ConsumersManager consumersManager() {
InstrumentationManager instrumentationManager) {
return new ConsumersManager(instrumentationManager, return new ConsumersManager(instrumentationManager,
rocketBinderConfigurationProperties); rocketBinderConfigurationProperties);
} }

View File

@@ -1,7 +1,9 @@
package org.springframework.cloud.stream.binder.rocketmq.config; package org.springframework.cloud.stream.binder.rocketmq.config;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration; import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@@ -13,6 +15,7 @@ import org.springframework.context.annotation.Configuration;
*/ */
@Configuration @Configuration
@AutoConfigureAfter(EndpointAutoConfiguration.class) @AutoConfigureAfter(EndpointAutoConfiguration.class)
@ConditionalOnClass(Endpoint.class)
public class RocketMQBinderEndpointAutoConfiguration { public class RocketMQBinderEndpointAutoConfiguration {
@Bean @Bean
@@ -21,16 +24,14 @@ public class RocketMQBinderEndpointAutoConfiguration {
} }
@Bean @Bean
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator( public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() {
InstrumentationManager instrumentationManager) { return new RocketMQBinderHealthIndicator();
return new RocketMQBinderHealthIndicator(instrumentationManager);
} }
@Bean @Bean
public InstrumentationManager instrumentationManager( @ConditionalOnClass(name = "com.codahale.metrics.Counter")
RocketMQBinderEndpoint rocketBinderEndpoint) { public InstrumentationManager instrumentationManager() {
return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), return new InstrumentationManager();
rocketBinderEndpoint.runtime());
} }
} }

View File

@@ -3,6 +3,7 @@ package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -27,9 +28,10 @@ public class ConsumersManager {
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>(); private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
private final Map<String, Boolean> started = new HashMap<>(); private final Map<String, Boolean> started = new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap = new HashMap<>(); private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap = new HashMap<>();
private final InstrumentationManager instrumentationManager;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private InstrumentationManager instrumentationManager;
public ConsumersManager(InstrumentationManager instrumentationManager, public ConsumersManager(InstrumentationManager instrumentationManager,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
@@ -41,9 +43,12 @@ public class ConsumersManager {
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) { ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic),
consumerProperties); consumerProperties);
ConsumerGroupInstrumentation instrumentation = instrumentationManager
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
ConsumerGroupInstrumentation instrumentation = manager
.getConsumerGroupInstrumentation(group); .getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(instrumentation); instrumentationManager.addHealthInstrumentation(instrumentation);
});
if (consumerGroups.containsKey(group)) { if (consumerGroups.containsKey(group)) {
return consumerGroups.get(group); return consumerGroups.get(group);
@@ -87,16 +92,23 @@ public class ConsumersManager {
if (started.get(group)) { if (started.get(group)) {
return; return;
} }
ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager
ConsumerGroupInstrumentation groupInstrumentation = null;
if (Optional.ofNullable(instrumentationManager).isPresent()) {
groupInstrumentation = instrumentationManager
.getConsumerGroupInstrumentation(group); .getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(groupInstrumentation); instrumentationManager.addHealthInstrumentation(groupInstrumentation);
}
try { try {
consumerGroups.get(group).start(); consumerGroups.get(group).start();
started.put(group, true); started.put(group, true);
groupInstrumentation.markStartedSuccessfully(); Optional.ofNullable(groupInstrumentation)
.ifPresent(g -> g.markStartedSuccessfully());
} }
catch (MQClientException e) { catch (MQClientException e) {
groupInstrumentation.markStartFailed(e); Optional.ofNullable(groupInstrumentation)
.ifPresent(g -> g.markStartFailed(e));
logger.error("RocketMQ Consumer hasn't been started. Caused by " logger.error("RocketMQ Consumer hasn't been started. Caused by "
+ e.getErrorMessage(), e); + e.getErrorMessage(), e);
throw e; throw e;

View File

@@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -48,20 +49,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private ConsumerInstrumentation consumerInstrumentation; private ConsumerInstrumentation consumerInstrumentation;
private InstrumentationManager instrumentationManager;
private RetryTemplate retryTemplate;
private RecoveryCallback<? extends Object> recoveryCallback;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties; private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final String destination; private final String destination;
private final String group; private final String group;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager; private final ConsumersManager consumersManager;
private RetryTemplate retryTemplate;
private RecoveryCallback<? extends Object> recoveryCallback;
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager, public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties, ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
String destination, String group, String destination, String group,
@@ -75,21 +76,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
@Override @Override
protected void doStart() { protected void doStart() {
if (!consumerProperties.getExtension().getEnabled()) { if (consumerProperties == null
|| !consumerProperties.getExtension().getEnabled()) {
return; return;
} }
String tags = consumerProperties == null ? null String tags = consumerProperties.getExtension().getTags();
: consumerProperties.getExtension().getTags(); Boolean isOrderly = consumerProperties.getExtension().getOrderly();
Boolean isOrderly = consumerProperties == null ? false
: consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
destination, consumerProperties); destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly final CloudStreamMessageListener listener = isOrderly
? new CloudStreamMessageListenerOrderly(instrumentationManager) ? new CloudStreamMessageListenerOrderly()
: new CloudStreamMessageListenerConcurrently(instrumentationManager); : new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) { if (retryTemplate != null) {
retryTemplate.registerListener(listener); retryTemplate.registerListener(listener);
@@ -99,9 +99,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
: Arrays.stream(tags.split("\\|\\|")).map(String::trim) : Arrays.stream(tags.split("\\|\\|")).map(String::trim)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
consumerInstrumentation = instrumentationManager Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
.getConsumerInstrumentation(destination); consumerInstrumentation = manager.getConsumerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(consumerInstrumentation); manager.addHealthInstrumentation(consumerInstrumentation);
});
try { try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
@@ -111,10 +112,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
else { else {
consumer.subscribe(destination, String.join(" || ", tagsSet)); consumer.subscribe(destination, String.join(" || ", tagsSet));
} }
consumerInstrumentation.markStartedSuccessfully(); Optional.ofNullable(consumerInstrumentation)
.ifPresent(c -> c.markStartedSuccessfully());
} }
catch (MQClientException e) { catch (MQClientException e) {
consumerInstrumentation.markStartFailed(e); Optional.ofNullable(consumerInstrumentation)
.ifPresent(c -> c.markStartFailed(e));
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
+ e.getErrorMessage(), e); + e.getErrorMessage(), e);
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
@@ -148,12 +151,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListener implements MessageListener, RetryListener { protected class CloudStreamMessageListener implements MessageListener, RetryListener {
private final InstrumentationManager instrumentationManager;
CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
Acknowledgement consumeMessage(final List<MessageExt> msgs) { Acknowledgement consumeMessage(final List<MessageExt> msgs) {
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
try { try {
@@ -180,23 +177,29 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
} }
else { else {
Acknowledgement result = doSendMsgs(msgs, null); Acknowledgement result = doSendMsgs(msgs, null);
instrumentationManager Optional.ofNullable(
.getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.instrumentationManager)
.ifPresent(manager -> {
manager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination) RocketMQInboundChannelAdapter.this.destination)
.markConsumed(); .markConsumed();
});
return result; return result;
} }
} }
catch (Exception e) { catch (Exception e) {
logger.error( logger.error(
"Rocket Message hasn't been processed successfully. Caused by ", "RocketMQ Message hasn't been processed successfully. Caused by ",
e); e);
instrumentationManager Optional.ofNullable(
.getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.instrumentationManager)
.ifPresent(manager -> {
manager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination) RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure(); .markConsumedFailure();
});
throw new RuntimeException( throw new RuntimeException(
"Rocket Message hasn't been processed successfully. Caused by ", "RocketMQ Message hasn't been processed successfully. Caused by ",
e); e);
} }
} }
@@ -232,16 +235,22 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public <T, E extends Throwable> void close(RetryContext context, public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) { RetryCallback<T, E> callback, Throwable throwable) {
if (throwable != null) { if (throwable != null) {
instrumentationManager Optional.ofNullable(
.getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.instrumentationManager)
.ifPresent(manager -> {
manager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination) RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure(); .markConsumedFailure();
});
} }
else { else {
instrumentationManager Optional.ofNullable(
.getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.instrumentationManager)
.ifPresent(manager -> {
manager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination) RocketMQInboundChannelAdapter.this.destination)
.markConsumed(); .markConsumed();
});
} }
} }
@@ -254,11 +263,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListenerConcurrently protected class CloudStreamMessageListenerConcurrently
extends CloudStreamMessageListener implements MessageListenerConcurrently { extends CloudStreamMessageListener implements MessageListenerConcurrently {
public CloudStreamMessageListenerConcurrently(
InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) { ConsumeConcurrentlyContext context) {
@@ -272,11 +276,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
implements MessageListenerOrderly { implements MessageListenerOrderly {
public CloudStreamMessageListenerOrderly(
InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override @Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) { ConsumeOrderlyContext context) {

View File

@@ -2,6 +2,7 @@ package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.time.Instant; import java.time.Instant;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
@@ -30,14 +31,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private ProducerInstrumentation producerInstrumentation; private ProducerInstrumentation producerInstrumentation;
private InstrumentationManager instrumentationManager;
private final RocketMQProducerProperties producerProperties; private final RocketMQProducerProperties producerProperties;
private final String destination; private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
protected volatile boolean running = false; protected volatile boolean running = false;
public RocketMQMessageHandler(String destination, public RocketMQMessageHandler(String destination,
@@ -54,9 +55,10 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public void start() { public void start() {
producer = new DefaultMQProducer(destination); producer = new DefaultMQProducer(destination);
producerInstrumentation = instrumentationManager Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
.getProducerInstrumentation(destination); producerInstrumentation = manager.getProducerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(producerInstrumentation); manager.addHealthInstrumentation(producerInstrumentation);
});
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
@@ -66,10 +68,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
try { try {
producer.start(); producer.start();
producerInstrumentation.markStartedSuccessfully(); Optional.ofNullable(producerInstrumentation)
.ifPresent(p -> p.markStartedSuccessfully());
} }
catch (MQClientException e) { catch (MQClientException e) {
producerInstrumentation.markStartFailed(e); Optional.ofNullable(producerInstrumentation)
.ifPresent(p -> p.markStartFailed(e));
logger.error( logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); "RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e); throw new MessagingException(e.getMessage(), e);
@@ -127,14 +131,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message, RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
sendRes); sendRes);
} }
instrumentationManager.getRuntime().put( Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
RocketMQBinderConstants.LASTSEND_TIMESTAMP, manager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
Instant.now().toEpochMilli()); Instant.now().toEpochMilli());
producerInstrumentation.markSent(); });
Optional.ofNullable(producerInstrumentation).ifPresent(p -> p.markSent());
} }
catch (MQClientException | RemotingException | MQBrokerException catch (MQClientException | RemotingException | MQBrokerException
| InterruptedException | UnsupportedOperationException e) { | InterruptedException | UnsupportedOperationException e) {
producerInstrumentation.markSentFailure(); Optional.ofNullable(producerInstrumentation)
.ifPresent(p -> p.markSentFailure());
logger.error( logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); "RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e); throw new MessagingException(e.getMessage(), e);

View File

@@ -1,7 +1,5 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics; package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.concurrent.atomic.AtomicBoolean;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
/** /**
@@ -11,24 +9,9 @@ import com.codahale.metrics.MetricRegistry;
public class ConsumerGroupInstrumentation extends Instrumentation { public class ConsumerGroupInstrumentation extends Instrumentation {
private MetricRegistry metricRegistry; private MetricRegistry metricRegistry;
private AtomicBoolean delayedStart = new AtomicBoolean(false);
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) { public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
super(name); super(name);
this.metricRegistry = metricRegistry; this.metricRegistry = metricRegistry;
} }
public void markDelayedStart() {
delayedStart.set(true);
}
@Override
public boolean isUp() {
return started.get() || delayedStart.get();
}
@Override
public boolean isOutOfService() {
return !started.get() && startException == null && !delayedStart.get();
}
} }

View File

@@ -2,6 +2,8 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
@@ -19,13 +21,15 @@ public class ConsumerInstrumentation extends Instrumentation {
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) { public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName); super(baseMetricName);
this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
this.totalConsumed = registry
.counter(name(baseMetricName, Consumer.TOTAL_CONSUMED));
this.consumedPerSecond = registry this.consumedPerSecond = registry
.meter(name(baseMetricName, "consumedPerSecond")); .meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND));
this.totalConsumedFailures = registry this.totalConsumedFailures = registry
.counter(name(baseMetricName, "totalConsumedFailures")); .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES));
this.consumedFailuresPerSecond = registry this.consumedFailuresPerSecond = registry
.meter(name(baseMetricName, "consumedFailuresPerSecond")); .meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND));
} }
public void markConsumed() { public void markConsumed() {

View File

@@ -3,8 +3,12 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
/** /**
@@ -12,36 +16,32 @@ import com.codahale.metrics.MetricRegistry;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class InstrumentationManager { public class InstrumentationManager {
private final MetricRegistry metricRegistry;
private final Map<String, Object> runtime; private final MetricRegistry metricRegistry = new MetricRegistry();
private final Map<String, Object> runtime = new ConcurrentHashMap<>();
private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>(); private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>();
private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>(); private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>();
private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>(); private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>();
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>(); private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
public InstrumentationManager(MetricRegistry metricRegistry,
Map<String, Object> runtime) {
this.metricRegistry = metricRegistry;
this.runtime = runtime;
}
public ProducerInstrumentation getProducerInstrumentation(String destination) { public ProducerInstrumentation getProducerInstrumentation(String destination) {
String key = "scs-rocketmq.producer." + destination; String key = Producer.PREFIX + destination;
producerInstrumentations.putIfAbsent(key, producerInstrumentations.putIfAbsent(key,
new ProducerInstrumentation(metricRegistry, key)); new ProducerInstrumentation(metricRegistry, key));
return producerInstrumentations.get(key); return producerInstrumentations.get(key);
} }
public ConsumerInstrumentation getConsumerInstrumentation(String destination) { public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
String key = "scs-rocketmq.consumer." + destination; String key = Consumer.PREFIX + destination;
consumeInstrumentations.putIfAbsent(key, consumeInstrumentations.putIfAbsent(key,
new ConsumerInstrumentation(metricRegistry, key)); new ConsumerInstrumentation(metricRegistry, key));
return consumeInstrumentations.get(key); return consumeInstrumentations.get(key);
} }
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) { public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
String key = "scs-rocketmq.consumerGroup." + group; String key = Consumer.GROUP_PREFIX + group;
consumerGroupsInstrumentations.putIfAbsent(key, consumerGroupsInstrumentations.putIfAbsent(key,
new ConsumerGroupInstrumentation(metricRegistry, key)); new ConsumerGroupInstrumentation(metricRegistry, key));
return consumerGroupsInstrumentations.get(key); return consumerGroupsInstrumentations.get(key);
@@ -59,4 +59,8 @@ public class InstrumentationManager {
public Map<String, Object> getRuntime() { public Map<String, Object> getRuntime() {
return runtime; return runtime;
} }
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}
} }

View File

@@ -2,6 +2,8 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
@@ -19,12 +21,14 @@ public class ProducerInstrumentation extends Instrumentation {
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) { public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName); super(baseMetricName);
this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT));
this.totalSentFailures = registry this.totalSentFailures = registry
.counter(name(baseMetricName, "totalSentFailures")); .counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES));
this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond")); this.sentPerSecond = registry
.meter(name(baseMetricName, Producer.SENT_PER_SECOND));
this.sentFailuresPerSecond = registry this.sentFailuresPerSecond = registry
.meter(name(baseMetricName, "sentFailuresPerSecond")); .meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND));
} }
public void markSent() { public void markSent() {

View File

@@ -9,7 +9,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties { public class RocketMQBinderConfigurationProperties {
private String namesrvAddr; private String namesrvAddr = "127.0.0.1:9876";
private String logLevel = "ERROR"; private String logLevel = "ERROR";