1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Merge pull request #420 from fangjian0423/binder-dev

RocketMQ Binder integrates RocketMQ Spring
This commit is contained in:
Xiaolong Zuo
2019-03-12 15:24:39 +08:00
committed by GitHub
44 changed files with 1596 additions and 1616 deletions

View File

@@ -27,7 +27,7 @@
<aliyun.sdk.version>4.0.1</aliyun.sdk.version> <aliyun.sdk.version>4.0.1</aliyun.sdk.version>
<alicloud.context.version>1.0.5</alicloud.context.version> <alicloud.context.version>1.0.5</alicloud.context.version>
<aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version> <aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version>
<rocketmq.version>4.3.1</rocketmq.version> <rocketmq.starter.version>2.0.2</rocketmq.starter.version>
<schedulerX.client.version>2.1.6</schedulerX.client.version> <schedulerX.client.version>2.1.6</schedulerX.client.version>
<dubbo.version>2.6.5</dubbo.version> <dubbo.version>2.6.5</dubbo.version>
<dubbo-spring-boot.version>0.2.1.RELEASE</dubbo-spring-boot.version> <dubbo-spring-boot.version>0.2.1.RELEASE</dubbo-spring-boot.version>
@@ -107,8 +107,8 @@
<!-- Apache RocketMQ --> <!-- Apache RocketMQ -->
<dependency> <dependency>
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId> <artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version> <version>${rocketmq.starter.version}</version>
</dependency> </dependency>
<!-- Sentinel --> <!-- Sentinel -->

View File

@@ -112,139 +112,165 @@ messageChannel.send(MessageBuilder.withPayload("simple msg").build());
这段代码所有的消息类都是 `spring-messaging` 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。 这段代码所有的消息类都是 `spring-messaging` 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。
**Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。** **Spring Cloud Stream 底层基于这段代码去做了各种抽象。**
### Spring Cloud Alibaba RocketMQ Binder 实现原理
.RocketMQ Binder处理流程
image::https://cdn.nlark.com/lark/0/2018/png/64647/1543560843558-24525bf4-1d0e-4e10-be5f-bdde7127f6e6.png[]
RocketMQ Binder 的核心主要就是这3个类`RocketMQMessageChannelBinder``RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 ### 如何使用 Spring Cloud Alibaba RocketMQ Binder ###
`RocketMQMessageChannelBinder` 是个标准的 Binder 实现,其内部构建 `RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 如果要在您的项目中引入 RocketMQ Binder需要引入如下 maven 依赖:
`RocketMQMessageHandler` 用于 RocketMQ `Producer` 的启动以及消息的发送,其内部会根据 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类,去创建 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`。 ```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
```
在构造 `org.apache.rocketmq.common.message.Message` 的过程中会根据 `org.springframework.messaging.Message` 的 Header 构造成 `RocketMQMessageHeaderAccessor`。然后再根据 `RocketMQMessageHeaderAccessor` 中的一些属性,比如 tags、keys、flag等属性设置到 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message` 中。 或者可以使用 Spring Cloud Stream RocketMQ Starter
`RocketMQInboundChannelAdapter` 用于 RocketMQ `Consumer` 的启动以及消息的接收。其内部还支持 https://github.com/spring-projects/spring-retry[spring-retry] 的使用。 ```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
在消费消息的时候可以从 Header 中获取 `Acknowledgement` 并进行一些设置。 ### Spring Cloud Alibaba RocketMQ Binder 实现
比如使用 `MessageListenerConcurrently` 进行异步消费的时候,可以设置延迟消费: RocketMQ Binder 的实现依赖于 https://github.com/apache/rocketmq-spring[RocketMQ-Spring] 框架。
RocketMQ-Spring 框架是 RocketMQ 与 Spring Boot 的整合RocketMQ Spring 主要提供了 3 个特性:
1. 使用 `RocketMQTemplate` 用来统一发送消息,包括同步、异步发送消息和事务消息
2. `@RocketMQTransactionListener` 注解用来处理事务消息的监听和回查
3. `@RocketMQMessageListener` 注解用来消费消息
RocketMQ Binder 的核心类 RocketMQMessageChannelBinder 实现了 Spring Cloud Stream 规范,内部构建会 `RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。
`RocketMQMessageHandler` 会基于 Binding 配置构造 `RocketMQTemplate``RocketMQTemplate` 内部会把 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类转换成 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`,然后发送出去。
`RocketMQInboundChannelAdapter` 也会基于 Binding 配置构造 `RocketMQListenerBindingContainer``RocketMQListenerBindingContainer` 内部会启动 RocketMQ `Consumer` 接收消息。
NOTE: 在使用 RocketMQ Binder 的同时也可以配置 rocketmq.** 用于触发 RocketMQ Spring 相关的 AutoConfiguration
`RocketMQHeaders` 中定义了很多 Header 常量,在发送消息的时候可以设置到 Spring Message 的 Header 中,用于触发 RocketMQ 相关的 feature
```java ```java
@StreamListener("input") MessageBuilder builder = MessageBuilder.withPayload(msg)
public void receive(Message message) { .setHeader(RocketMQHeaders.TAGS, "binder")
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); .setHeader(RocketMQHeaders.MESSAGE_ID, "my-msg-id")
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); .setHeader("DELAY", "1");
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER); Message message = builder.build();
acknowledgement.setConsumeConcurrentlyDelayLevel(1); output().send(message);
}
``` ```
比如使用 `MessageListenerOrderly` 进行顺序消费的时候,可以设置延迟消费: ### 配置选项
```java #### RocketMQ Binder Properties
@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}
```
Provider端支持的配置 spring.cloud.stream.rocketmq.binder.name-server::
RocketMQ NameServer 地址。
+
Default: `127.0.0.1:9876`.
spring.cloud.stream.rocketmq.binder.access-key::
阿里云账号 AccessKey。
+
Default: null.
spring.cloud.stream.rocketmq.binder.secret-key::
阿里云账号 SecretKey。
+
Default: null.
spring.cloud.stream.rocketmq.binder.enable-msg-trace::
是否为 Producer 和 Consumer 开启消息轨迹功能
+
Default: `true`.
spring.cloud.stream.rocketmq.binder.customized-trace-topic::
消息轨迹开启后存储的 topic 名称。
+
Default: `RMQ_SYS_TRACE_TOPIC`.
:frame: topbot
[width="60%",options="header"]
|====
^|配置项 ^|含义 ^| 默认值
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效RocketMQ 默认值为4M = 1024 * 1024 * 4)
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`|
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`|
|====
Consumer端支持的配置 #### RocketMQ Consumer Properties
:frame: topbot 下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.` 为前缀的 RocketMQ Consumer 相关的配置。
[width="60%",options="header"]
|====
^|配置项 ^|含义| 默认值
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤订阅所有消息)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容sql的优先级更高)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false
|====
### Endpoint支持 enable::
是否启用 Consumer。
+
默认值: `true`.
tags::
Consumer 基于 TAGS 订阅,多个 tag 以 `||` 分割。
+
默认值: empty.
sql::
Consumer 基于 SQL 订阅。
+
默认值: empty.
broadcasting::
Consumer 是否是广播消费模式。如果想让所有的订阅者都能接收到消息,可以使用广播模式。
+
默认值: `false`.
orderly::
Consumer 是否同步消费消息模式。
+
默认值: `false`.
delayLevelWhenNextConsume::
异步消费消息模式下消费失败重试策略:
* -1,不重复,直接放入死信队列
* 0,broker 控制重试策略
* >0,client 控制重试策略
+
默认值: `0`.
suspendCurrentQueueTimeMillis::
同步消费消息模式下消费失败后再次消费的时间间隔。
+
默认值: `1000`.
在使用Endpoint特性之前需要在 Maven 中添加 `spring-boot-starter-actuator` 依赖,并在配置中允许 Endpoints 的访问。 #### RocketMQ Provider Properties
* Spring Boot 1.x 中添加配置 `management.security.enabled=false`。暴露的 endpoint 路径为 `/rocketmq_binder` 下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.` 为前缀的 RocketMQ Producer 相关的配置。
* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`。暴露的 endpoint 路径为 `/actuator/rocketmq-binder`
Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。 enable::
是否启用 Producer。
```json +
{ 默认值: `true`.
"runtime": { group::
"lastSend.timestamp": 1542786623915 Producer group name。
}, +
"metrics": { 默认值: empty.
"scs-rocketmq.consumer.test-topic.totalConsumed": { maxMessageSize::
"count": 11 消息发送的最大字节数。
}, +
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": { 默认值: `8249344`.
"count": 0 transactional::
}, 是否发送事务消息。
"scs-rocketmq.producer.test-topic.totalSentFailures": { +
"count": 0 默认值: `false`.
}, sync::
"scs-rocketmq.consumer.test-topic.consumedPerSecond": { 是否使用同步得方式发送消息。
"count": 11, +
"fifteenMinuteRate": 0.012163847780107841, 默认值: `false`.
"fiveMinuteRate": 0.03614605351360527, vipChannelEnabled::
"meanRate": 0.3493213353657594, 是否在 Vip Channel 上发送消息。
"oneMinuteRate": 0.17099243039490175 +
}, 默认值: `true`.
"scs-rocketmq.producer.test-topic.totalSent": { sendMessageTimeout::
"count": 5 发送消息的超时时间(毫秒)。
}, +
"scs-rocketmq.producer.test-topic.sentPerSecond": { 默认值: `3000`.
"count": 5, compressMessageBodyThreshold::
"fifteenMinuteRate": 0.005540151995103271, 消息体压缩阀值(当消息体超过 4k 的时候会被压缩)。
"fiveMinuteRate": 0.01652854617838251, +
"meanRate": 0.10697493212602836, 默认值: `4096`.
"oneMinuteRate": 0.07995558537067671 retryTimesWhenSendFailed::
}, 在同步发送消息的模式下,消息发送失败的重试次数。
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": { +
"count": 0, 默认值: `2`.
"fifteenMinuteRate": 0.0, retryTimesWhenSendAsyncFailed::
"fiveMinuteRate": 0.0, 在异步发送消息的模式下,消息发送失败的重试次数。
"meanRate": 0.0, +
"oneMinuteRate": 0.0 默认值: `2`.
}, retryNextServer::
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": { 消息发送失败的情况下是否重试其它的 broker。
"count": 0, +
"fifteenMinuteRate": 0.0, 默认值: `false`.
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}
```
注意要想查看统计数据需要在pom里加上 https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core[metrics-core依赖]。如若不加endpoint 将会显示 warning 信息而不会显示统计信息:
```json
{
"warning": "please add metrics-core dependency, we use it for metrics"
}
```

View File

@@ -114,137 +114,162 @@ All the message types in this code are provided by the `spring-messaging`module.
**The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.** **The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.**
### How to use Spring Cloud Alibaba RocketMQ Binder ###
For using the Spring Cloud Alibaba RocketMQ Binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
```
Alternatively, you can also use the Spring Cloud Stream RocketMQ Starter:
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
### How Spring Cloud Alibaba RocketMQ Binder Works ### How Spring Cloud Alibaba RocketMQ Binder Works
.RocketMQ Binder Workflow The implementation of RocketMQ Binder depend on the https://github.com/apache/rocketmq-spring[RocketMQ-Spring] framework.
image::https://cdn.nlark.com/lark/0/2018/png/64647/1543560843558-24525bf4-1d0e-4e10-be5f-bdde7127f6e6.png[]
RocketMQ Spring framework is an integration of RocketMQ and Spring Boot. It provides three main features:
The core of RocketMQ Binder are the 3 classes below: `RocketMQMessageChannelBinder``RocketMQInboundChannelAdapter` and `RocketMQMessageHandler`. 1. `RocketMQTemplate`: Sending messages, including synchronous, asynchronous, and transactional messages.
2. `@RocketMQTransactionListener`: Listen and check for transaction messages.
3. `@RocketMQMessageListener`: Consume messages.
`RocketMQMessageChannelBinder` is a standard implementation of Binder. It contains `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler` as its internal constructions. `RocketMQMessageChannelBinder` is a standard implementation of Binder, it will build `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler` internally.
`RocketMQMessageHandler` is used to start RocketMQ `Producer` and send messages. It creates message type of RocketMQ `org.apache.rocketmq.common.message.Message` based on the message type of `org.springframework.messaging.Message` in the `spring-messaging` module. `RocketMQMessageHandler` will construct `RocketMQTemplate` based on the Binding configuration. `RocketMQTemplate` will convert the `org.springframework.messaging.Message` message class of `spring-messaging` module to the RocketMQ message class `org.apache.rocketmq.common .message.Message` internally, then send it out.
When constructing `org.apache.rocketmq.common.message.Message`, it constructs `RocketMQMessageHeaderAccessor` based on the header of `org.springframework.messaging.Message`. Then, based on some of the properties in `RocketMQMessageHeaderAccessor` , it sets some of message attributes such as tags, keys, and flag in `org.apache.rocketmq.common.message.Message` of RocketMQ. `RocketMQInboundChannelAdapter` will also construct `RocketMQListenerBindingContainer` based on the Binding configuration, and `RocketMQListenerBindingContainer` will start the RocketMQ `Consumer` to receive the messages.
`RocketMQInboundChannelAdapter` is used to start RocketMQ `Consumer` and receive messages. It also support the usage of https://github.com/spring-projects/spring-retry[spring-retry]. NOTE: RocketMQ Binder Application can also be used to configure rocketmq.** to trigger RocketMQ Spring related AutoConfiguration
You can also obtain `Acknowledgement` from the Header and make some configurations. The headers defined in `RocketMQHeaders`, which can be set to the header of spring message when sending a message to trigger the RocketMQ related feature:
For example, you can set delayed message consumption when `MessageListenerConcurrently` is used for asynchronous message consumption:
```java ```java
@StreamListener("input") MessageBuilder builder = MessageBuilder.withPayload(msg)
public void receive(Message message) { .setHeader(RocketMQHeaders.TAGS, "binder")
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); .setHeader(RocketMQHeaders.MESSAGE_ID, "my-msg-id")
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); .setHeader("DELAY", "1");
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER); Message message = builder.build();
acknowledgement.setConsumeConcurrentlyDelayLevel(1); output().send(message);
}
``` ```
You can also set delayed message consumption when `MessageListenerOrderly` is used for consuming ordered messages. ### Configuration Options
```java #### RocketMQ Binder Properties
@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}
```
Supported Configurations of Provider: spring.cloud.stream.rocketmq.binder.name-server::
The name server of RocketMQ Server.
+
Default: `127.0.0.1:9876`.
spring.cloud.stream.rocketmq.binder.access-key::
The AccessKey of Alibaba Cloud Account.
+
Default: null.
spring.cloud.stream.rocketmq.binder.secret-key::
The SecretKey of Alibaba Cloud Account.
+
Default: null.
spring.cloud.stream.rocketmq.binder.enable-msg-trace::
Enable Message Trace feature for all producers and consumers.
+
Default: `true`.
spring.cloud.stream.rocketmq.binder.customized-trace-topic::
The trace topic for message trace.
+
Default: `RMQ_SYS_TRACE_TOPIC`.
:frame: topbot
[width="60%",options="header"]
|====
^|Configuration ^|Description ^| Default Value
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|Whether to use producer|true
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|Maximum bytes of messages sent|0(Take effect only when its bigger than 0. The default value of RocketMQ is 4M = 1024 * 1024 * 4)
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|Whether to use `TransactionMQProducer` to send transaction messages|false
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|Full class name of the interface implementation class related to `org.apache.rocketmq.client.producer.LocalTransactionExecuter` For example, `org.test.MyExecuter`|
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|Full class name of the interface implementation class related to `org.apache.rocketmq.client.producer.TransactionCheckListener` For example, `org.test.MyTransactionCheckListener`|
|====
Supported Configurations of Consumer: #### RocketMQ Consumer Properties
:frame: topbot The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.`.
[width="60%",options="header"]
|====
^|Configuration ^|Description| Default Value
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|Whether to use consumer|true
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer will only subscribe to messages with these tags Tags are separated by "\|\|" (If not specified, it means the consumer subscribes to all messages)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer subscribes to the messages as requested in the SQL(If tags are also specified, SQL has a higher priority than tags.)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|If the consumer uses the broadcasting mode|false
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|Ordered message consumption or asychronous consumption|false
|====
### Endpoint Support enable::
Enable Consumer Binding.
+
Default: `true`.
tags::
Consumer subscription tags expression, tags split by `||`.
+
Default: empty.
sql::
Consumer subscription sql expression.
+
Default: empty.
broadcasting::
Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
+
Default: `false`.
orderly::
Receiving message concurrently or orderly.
+
Default: `false`.
delayLevelWhenNextConsume::
Message consume retry strategy for concurrently consume:
* -1,no retry,put into DLQ directly
* 0,broker control retry frequency
* >0,client control retry frequency
+
Default: `0`.
suspendCurrentQueueTimeMillis::
Time interval of message consume retry for orderly consume.
+
Default: `1000`.
Before you use the Endpoint feature, please add the `spring-boot-starter-actuator` dependency in Maven, and enable access of Endpoints in your configuration. #### RocketMQ Provider Properties
* Add `management.security.enabled=false`in Spring Boot 1.x. The exposed endpoint path is `/rocketmq_binder` The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.`.
* Add `management.endpoints.web.exposure.include=*`in Spring Boot 2.x. The exposed endpoint path is `/actuator/rocketmq-binder`
Endpoint will collects data about the last message that is sent, the number of successes or failures of message sending, and the number of successes of failures of message consumption. enable::
Enable Producer Binding.
```json +
{ Default: `true`.
"runtime": { group::
"lastSend.timestamp": 1542786623915 Producer group name.
}, +
"metrics": { Default: empty.
"scs-rocketmq.consumer.test-topic.totalConsumed": { maxMessageSize::
"count": 11 Maximum allowed message size in bytes.
}, +
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": { Default: `8249344`.
"count": 0 transactional::
}, Send Transactional Message.
"scs-rocketmq.producer.test-topic.totalSentFailures": { +
"count": 0 Default: `false`.
}, sync::
"scs-rocketmq.consumer.test-topic.consumedPerSecond": { Send message in synchronous mode.
"count": 11, +
"fifteenMinuteRate": 0.012163847780107841, Default: `false`.
"fiveMinuteRate": 0.03614605351360527, vipChannelEnabled::
"meanRate": 0.3493213353657594, Send message with vip channel.
"oneMinuteRate": 0.17099243039490175 +
}, Default: `true`.
"scs-rocketmq.producer.test-topic.totalSent": { sendMessageTimeout::
"count": 5 Millis of send message timeout.
}, +
"scs-rocketmq.producer.test-topic.sentPerSecond": { Default: `3000`.
"count": 5, compressMessageBodyThreshold::
"fifteenMinuteRate": 0.005540151995103271, Compress message body threshold, namely, message body larger than 4k will be compressed on default.
"fiveMinuteRate": 0.01652854617838251, +
"meanRate": 0.10697493212602836, Default: `4096`.
"oneMinuteRate": 0.07995558537067671 retryTimesWhenSendFailed::
}, Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": { +
"count": 0, Default: `2`.
"fifteenMinuteRate": 0.0, retryTimesWhenSendAsyncFailed::
"fiveMinuteRate": 0.0, Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
"meanRate": 0.0, +
"oneMinuteRate": 0.0 Default: `2`.
}, retryNextServer::
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": { Indicate whether to retry another broker on sending failure internally.
"count": 0, +
"fifteenMinuteRate": 0.0, Default: `false`.
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}
```
Note To view statistics, add the https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core[metrics-core dependency] in POM. If not added, the endpoint will return warning instead of statistics:
```json
{
"warning": "please add metrics-core dependency, we use it for metrics"
}
```

View File

@@ -34,7 +34,8 @@
<module>fescar-example/storage-service</module> <module>fescar-example/storage-service</module>
<module>fescar-example/account-service</module> <module>fescar-example/account-service</module>
<module>acm-example/acm-local-example</module> <module>acm-example/acm-local-example</module>
<module>rocketmq-example</module> <module>rocketmq-example/rocketmq-consume-example</module>
<module>rocketmq-example/rocketmq-produce-example</module>
<module>sms-example</module> <module>sms-example</module>
<module>spring-cloud-bus-rocketmq-example</module> <module>spring-cloud-bus-rocketmq-example</module>
<module>schedulerx-example/schedulerx-simple-task-example</module> <module>schedulerx-example/schedulerx-simple-task-example</module>

View File

@@ -6,14 +6,14 @@
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-examples</artifactId> <artifactId>spring-cloud-alibaba-examples</artifactId>
<version>0.2.2.BUILD-SNAPSHOT</version> <version>0.2.2.BUILD-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-example</artifactId> <artifactId>rocketmq-consume-example</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<description>Example demonstrating how to use rocketmq</description> <description>Example demonstrating how to use rocketmq consume</description>
<dependencies> <dependencies>
@@ -32,11 +32,6 @@
<artifactId>spring-boot-starter-actuator</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -25,14 +25,9 @@ public class ReceiveService {
System.out.println("input3 receive: " + foo); System.out.println("input3 receive: " + foo);
} }
@StreamListener("input1") @StreamListener("input4")
public void receiveInput1Again(String receiveMsg) { public void receiveTransactionalMsg(String transactionMsg) {
System.out.println("input1 receive again: " + receiveMsg); System.out.println("input4 receive transaction msg: " + transactionMsg);
} }
@StreamListener("input4")
public void receiveTransactionalMsg(String transactionMsg) {
System.out.println("input4 receive transaction msg: " + transactionMsg);
}
} }

View File

@@ -0,0 +1,36 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQConsumerApplication.MySink;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@SpringBootApplication
@EnableBinding({ MySink.class })
public class RocketMQConsumerApplication {
public interface MySink {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
@Input("input3")
SubscribableChannel input3();
@Input("input4")
SubscribableChannel input4();
}
public static void main(String[] args) {
SpringApplication.run(RocketMQConsumerApplication.class, args);
}
}

View File

@@ -1,21 +1,9 @@
spring.cloud.stream.default-binder=rocketmq spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.output1.destination=test-topic
spring.cloud.stream.bindings.output1.content-type=application/json
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter
spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener
spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1 spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
spring.cloud.stream.bindings.input1.consumer.maxAttempts=1
spring.cloud.stream.bindings.input2.destination=test-topic spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain spring.cloud.stream.bindings.input2.content-type=text/plain
@@ -30,15 +18,15 @@ spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3 spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20 spring.cloud.stream.bindings.input3.consumer.concurrency=20
spring.cloud.stream.bindings.input3.consumer.maxAttempts=1
spring.cloud.stream.bindings.input4.destination=TransactionTopic spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=210 spring.cloud.stream.bindings.input4.consumer.concurrency=5
spring.application.name=rocketmq-example spring.application.name=rocketmq-consume-example
server.port=28081 server.port=28082
management.endpoints.web.exposure.include=* management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

View File

@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-examples</artifactId>
<version>0.2.2.BUILD-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-produce-example</artifactId>
<packaging>jar</packaging>
<description>Example demonstrating how to use rocketmq produce</description>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>${maven-deploy-plugin.version}</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,39 @@
package org.springframework.cloud.alibaba.cloud.examples;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class Foo {
private int id;
private String bar;
public Foo() {
}
public Foo(int id, String bar) {
this.id = id;
this.bar = bar;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getBar() {
return bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo{" + "id=" + id + ", bar='" + bar + '\'' + '}';
}
}

View File

@@ -4,36 +4,18 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; import org.springframework.cloud.alibaba.cloud.examples.RocketMQProduceApplication.MySource;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/** /**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@SpringBootApplication @SpringBootApplication
@EnableBinding({ MySource.class, MySink.class }) @EnableBinding({ MySource.class })
public class RocketMQApplication { public class RocketMQProduceApplication {
public interface MySink {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
@Input("input3")
SubscribableChannel input3();
@Input("input4")
SubscribableChannel input4();
}
public interface MySource { public interface MySource {
@Output("output1") @Output("output1")
@@ -44,7 +26,7 @@ public class RocketMQApplication {
} }
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(RocketMQApplication.class, args); SpringApplication.run(RocketMQProduceApplication.class, args);
} }
@Bean @Bean
@@ -86,13 +68,13 @@ public class RocketMQApplication {
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
// COMMIT_MESSAGE message // COMMIT_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg1", false); senderService.sendTransactionalMsg("transactional-msg1", 1);
// ROLLBACK_MESSAGE message // ROLLBACK_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg2", true); senderService.sendTransactionalMsg("transactional-msg2", 2);
// ROLLBACK_MESSAGE message // ROLLBACK_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg3", true); senderService.sendTransactionalMsg("transactional-msg3", 3);
// COMMIT_MESSAGE message // COMMIT_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg4", false); senderService.sendTransactionalMsg("transactional-msg4", 4);
} }
} }

View File

@@ -4,8 +4,9 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.cloud.alibaba.cloud.examples.RocketMQProduceApplication.MySource;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
@@ -40,12 +41,11 @@ public class SenderService {
source.output1().send(message); source.output1().send(message);
} }
public <T> void sendTransactionalMsg(T msg, boolean error) throws Exception { public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
MessageBuilder builder = MessageBuilder.withPayload(msg) MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
if (error) { builder.setHeader("test", String.valueOf(num));
builder.setHeader("test", "1"); builder.setHeader(RocketMQHeaders.TAGS, "binder");
}
Message message = builder.build(); Message message = builder.build();
source.output2().send(message); source.output2().send(message);
} }

View File

@@ -0,0 +1,54 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alibaba.cloud.examples;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
Object num = msg.getHeaders().get("test");
if ("1".equals(num)) {
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN;
}
else if ("2".equals(num)) {
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " commit");
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("check: " + new String((byte[]) msg.getPayload()));
return RocketMQLocalTransactionState.COMMIT;
}
}

View File

@@ -0,0 +1,20 @@
logging.level.org.springframework.cloud.stream.binder.rocketmq=DEBUG
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.bindings.output1.destination=test-topic
spring.cloud.stream.bindings.output1.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup
spring.application.name=rocketmq-produce-example
server.port=28081
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

View File

@@ -1,18 +0,0 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class MyTransactionCheckListener implements TransactionCheckListener {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("TransactionCheckListener: " + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
}

View File

@@ -1,20 +0,0 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class MyTransactionExecuter implements LocalTransactionExecuter {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
if ("1".equals(msg.getUserProperty("test"))) {
System.out.println(new String(msg.getBody()) + " rollback");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
System.out.println(new String(msg.getBody()) + " commit");
return LocalTransactionState.COMMIT_MESSAGE;
}
}

View File

@@ -21,54 +21,41 @@
<artifactId>spring-cloud-stream</artifactId> <artifactId>spring-cloud-stream</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.0.3</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId> <artifactId>spring-boot-configuration-processor</artifactId>
<scope>provided</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId> <artifactId>spring-boot</artifactId>
<scope>provided</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId> <artifactId>spring-boot-autoconfigure</artifactId>
<scope>provided</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator</artifactId> <artifactId>spring-boot-actuator</artifactId>
<scope>provided</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator-autoconfigure</artifactId> <artifactId>spring-boot-actuator-autoconfigure</artifactId>
<scope>provided</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>

View File

@@ -21,43 +21,16 @@ package org.springframework.cloud.stream.binder.rocketmq;
*/ */
public interface RocketMQBinderConstants { public interface RocketMQBinderConstants {
String ENDPOINT_ID = "rocketmq-binder";
/** /**
* Header key * Header key
*/ */
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE"; String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
String ROCKET_FLAG = "ROCKETMQ_FLAG";
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG";
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/** /**
* Instrumentation * Default value
*/ */
String LASTSEND_TIMESTAMP = "lastSend.timestamp"; String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
interface Metrics { String DEFAULT_GROUP = "rocketmq_binder_default_group_name";
interface Producer {
String PREFIX = "scs-rocketmq.producer.";
String TOTAL_SENT = "totalSent";
String TOTAL_SENT_FAILURES = "totalSentFailures";
String SENT_PER_SECOND = "sentPerSecond";
String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond";
}
interface Consumer {
String GROUP_PREFIX = "scs-rocketmq.consumerGroup.";
String PREFIX = "scs-rocketmq.consumer.";
String TOTAL_CONSUMED = "totalConsumed";
String CONSUMED_PER_SECOND = "consumedPerSecond";
String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures";
String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond";
}
}
} }

View File

@@ -0,0 +1,72 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQBinderUtils {
public static RocketMQBinderConfigurationProperties mergeProperties(
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQProperties rocketMQProperties) {
RocketMQBinderConfigurationProperties result = new RocketMQBinderConfigurationProperties();
if (StringUtils.isEmpty(rocketMQProperties.getNameServer())) {
result.setNameServer(rocketBinderConfigurationProperties.getNameServer());
}
else {
result.setNameServer(rocketMQProperties.getNameServer());
}
if (rocketMQProperties.getProducer() == null
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) {
result.setAccessKey(rocketBinderConfigurationProperties.getAccessKey());
}
else {
result.setAccessKey(rocketMQProperties.getProducer().getAccessKey());
}
if (rocketMQProperties.getProducer() == null
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getSecretKey())) {
result.setSecretKey(rocketBinderConfigurationProperties.getSecretKey());
}
else {
result.setSecretKey(rocketMQProperties.getProducer().getSecretKey());
}
if (rocketMQProperties.getProducer() == null || StringUtils
.isEmpty(rocketMQProperties.getProducer().getCustomizedTraceTopic())) {
result.setCustomizedTraceTopic(
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
}
else {
result.setCustomizedTraceTopic(
rocketMQProperties.getProducer().getCustomizedTraceTopic());
}
if (rocketMQProperties.getProducer() != null
&& rocketMQProperties.getProducer().isEnableMsgTrace()) {
result.setEnableMsgTrace(Boolean.TRUE);
}
else {
result.setEnableMsgTrace(
rocketBinderConfigurationProperties.isEnableMsgTrace());
}
return result;
}
}

View File

@@ -16,17 +16,22 @@
package org.springframework.cloud.stream.binder.rocketmq; package org.springframework.cloud.stream.binder.rocketmq;
import org.apache.commons.lang3.StringUtils; import java.util.HashMap;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import java.util.Map;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.slf4j.Logger; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.slf4j.LoggerFactory; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@@ -40,10 +45,11 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer; import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
/** /**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class RocketMQMessageChannelBinder extends public class RocketMQMessageChannelBinder extends
@@ -51,22 +57,23 @@ public class RocketMQMessageChannelBinder extends
implements implements
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> { ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private static final Logger logger = LoggerFactory
.getLogger(RocketMQMessageChannelBinder.class);
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties();
public RocketMQMessageChannelBinder(ConsumersManager consumersManager, private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
RocketMQTopicProvisioner provisioningProvider, private final RocketMQProperties rocketMQProperties;
private final InstrumentationManager instrumentationManager;
private Map<String, String> topicInUse = new HashMap<>();
public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQProperties rocketMQProperties,
InstrumentationManager instrumentationManager) { InstrumentationManager instrumentationManager) {
super(null, provisioningProvider); super(null, provisioningProvider);
this.consumersManager = consumersManager; this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketMQProperties = rocketMQProperties;
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
} }
@@ -75,22 +82,72 @@ public class RocketMQMessageChannelBinder extends
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties, ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageChannel errorChannel) throws Exception { MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) { if (producerProperties.getExtension().getEnabled()) {
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
destination.getName(), producerProperties, RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
rocketBinderConfigurationProperties, instrumentationManager); .mergeProperties(rocketBinderConfigurationProperties,
rocketMQProperties);
RocketMQTemplate rocketMQTemplate;
if (producerProperties.getExtension().getTransactional()) { if (producerProperties.getExtension().getTransactional()) {
// transaction message check LocalTransactionExecuter Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
messageHandler.setLocalTransactionExecuter( .getBeansOfType(RocketMQTemplate.class);
getClassConfiguration(destination.getName(), if (rocketMQTemplates.size() == 0) {
producerProperties.getExtension().getExecuter(), throw new IllegalStateException(
LocalTransactionExecuter.class)); "there is no RocketMQTemplate in Spring BeanFactory");
// transaction message check TransactionCheckListener }
messageHandler.setTransactionCheckListener( else if (rocketMQTemplates.size() > 1) {
getClassConfiguration(destination.getName(), throw new IllegalStateException(
producerProperties.getExtension() "there is more than 1 RocketMQTemplates in Spring BeanFactory");
.getTransactionCheckListener(), }
TransactionCheckListener.class)); rocketMQTemplate = rocketMQTemplates.values().iterator().next();
} }
else {
rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setObjectMapper(this.getApplicationContext()
.getBeansOfType(ObjectMapper.class).values().iterator().next());
DefaultMQProducer producer;
String ak = mergedProperties.getAccessKey();
String sk = mergedProperties.getSecretKey();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
RPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ak, sk));
producer = new DefaultMQProducer(
producerProperties.getExtension().getGroup(), rpcHook,
mergedProperties.isEnableMsgTrace(),
mergedProperties.getCustomizedTraceTopic());
producer.setVipChannelEnabled(false);
producer.setInstanceName(
RocketMQUtil.getInstanceName(rpcHook, destination.getName()));
}
else {
producer = new DefaultMQProducer(
producerProperties.getExtension().getGroup());
producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled());
}
producer.setNamesrvAddr(mergedProperties.getNameServer());
producer.setSendMsgTimeout(
producerProperties.getExtension().getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(
producerProperties.getExtension().getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerProperties
.getExtension().getRetryTimesWhenSendAsyncFailed());
producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(
producerProperties.getExtension().isRetryNextServer());
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
rocketMQTemplate.setProducer(producer);
}
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
rocketMQTemplate, destination.getName(),
producerProperties.getExtension().getGroup(),
producerProperties.getExtension().getTransactional(),
instrumentationManager);
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setSync(producerProperties.getExtension().getSync());
if (errorChannel != null) { if (errorChannel != null) {
messageHandler.setSendFailureChannel(errorChannel); messageHandler.setSendFailureChannel(errorChannel);
@@ -113,9 +170,22 @@ public class RocketMQMessageChannelBinder extends
"'group must be configured for channel " + destination.getName()); "'group must be configured for channel " + destination.getName());
} }
RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer(
consumerProperties, rocketBinderConfigurationProperties, this);
listenerContainer.setConsumerGroup(group);
listenerContainer.setTopic(destination.getName());
listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
listenerContainer.setSuspendCurrentQueueTimeMillis(
consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());
listenerContainer.setDelayLevelWhenNextConsume(
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
listenerContainer
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
consumersManager, consumerProperties, destination.getName(), group, listenerContainer, consumerProperties, instrumentationManager);
instrumentationManager);
topicInUse.put(destination.getName(), group);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
group, consumerProperties); group, consumerProperties);
@@ -143,6 +213,10 @@ public class RocketMQMessageChannelBinder extends
return extendedBindingProperties.getExtendedProducerProperties(channelName); return extendedBindingProperties.getExtendedProducerProperties(channelName);
} }
public Map<String, String> getTopicInUse() {
return topicInUse;
}
@Override @Override
public String getDefaultsPrefix() { public String getDefaultsPrefix() {
return extendedBindingProperties.getDefaultsPrefix(); return extendedBindingProperties.getDefaultsPrefix();
@@ -153,47 +227,6 @@ public class RocketMQMessageChannelBinder extends
return extendedBindingProperties.getExtendedPropertiesEntryClass(); return extendedBindingProperties.getExtendedPropertiesEntryClass();
} }
private <T> T getClassConfiguration(String destName, String className,
Class<T> interfaceClass) {
if (StringUtils.isEmpty(className)) {
throw new RuntimeException("Binding for channel " + destName
+ " using transactional message, should set "
+ interfaceClass.getSimpleName() + " configuration"
+ interfaceClass.getSimpleName() + " should be set, like "
+ "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour"
+ interfaceClass.getSimpleName() + "'");
}
else if (StringUtils.isNotEmpty(className)) {
Class fieldClass;
// check class exists
try {
fieldClass = ClassUtils.forName(className,
RocketMQMessageChannelBinder.class.getClassLoader());
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Binding for channel " + destName
+ " using transactional message, but " + className
+ " class is not found");
}
// check interface incompatible
if (!interfaceClass.isAssignableFrom(fieldClass)) {
throw new RuntimeException("Binding for channel " + destName
+ " using transactional message, but " + className
+ " is incompatible with " + interfaceClass.getSimpleName()
+ " interface");
}
try {
return (T) fieldClass.newInstance();
}
catch (Exception e) {
throw new RuntimeException("Binding for channel " + destName
+ " using transactional message, but " + className
+ " instance error", e);
}
}
return null;
}
public void setExtendedBindingProperties( public void setExtendedBindingProperties(
RocketMQExtendedBindingProperties extendedBindingProperties) { RocketMQExtendedBindingProperties extendedBindingProperties) {
this.extendedBindingProperties = extendedBindingProperties; this.extendedBindingProperties = extendedBindingProperties;

View File

@@ -1,135 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
import org.springframework.integration.support.MutableMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageHeaderAccessor;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
public RocketMQMessageHeaderAccessor() {
super();
}
public RocketMQMessageHeaderAccessor(Message<?> message) {
super(message);
}
public Acknowledgement getAcknowledgement(Message message) {
return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
}
public RocketMQMessageHeaderAccessor withAcknowledgment(
Acknowledgement acknowledgment) {
setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
return this;
}
public String getTags() {
return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
}
public RocketMQMessageHeaderAccessor withTags(String tag) {
setHeader(MessageConst.PROPERTY_TAGS, tag);
return this;
}
public String getKeys() {
return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
}
public RocketMQMessageHeaderAccessor withKeys(String keys) {
setHeader(MessageConst.PROPERTY_KEYS, keys);
return this;
}
public MessageExt getRocketMessage() {
return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
}
public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
setHeader(ORIGINAL_ROCKET_MESSAGE, message);
return this;
}
public Integer getDelayTimeLevel() {
return (Integer) getMessageHeaders()
.getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
}
public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
return this;
}
public Integer getFlag() {
return (Integer) getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
}
public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
setHeader(ROCKET_FLAG, delayTimeLevel);
return this;
}
public Object getTransactionalArg() {
return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG);
}
public Object withTransactionalArg(Object arg) {
setHeader(ROCKET_TRANSACTIONAL_ARG, arg);
return this;
}
public SendResult getSendResult() {
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
}
public static void putSendResult(MutableMessage message, SendResult sendResult) {
message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
}
public Map<String, String> getUserProperties() {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, Object> entry : this.toMap().entrySet()) {
if (entry.getValue() instanceof String
&& !MessageConst.STRING_HASH_SET.contains(entry.getKey())
&& !entry.getKey().equals(MessageHeaders.CONTENT_TYPE)) {
result.put(entry.getKey(), (String) entry.getValue());
}
}
return result;
}
}

View File

@@ -1,54 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.actuator;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Endpoint(id = ENDPOINT_ID)
public class RocketMQBinderEndpoint {
@Autowired(required = false)
private InstrumentationManager instrumentationManager;
@ReadOperation
public Map<String, Object> invoke() {
Map<String, Object> result = new HashMap<>();
if (instrumentationManager != null) {
result.put("metrics",
instrumentationManager.getMetricRegistry().getMetrics());
result.put("runtime", instrumentationManager.getRuntime());
}
else {
result.put("warning",
"please add metrics-core dependency, we use it for metrics");
}
return result;
}
}

View File

@@ -28,33 +28,25 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM
*/ */
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
@Autowired(required = false) @Autowired
private InstrumentationManager instrumentationManager; private InstrumentationManager instrumentationManager;
@Override @Override
protected void doHealthCheck(Health.Builder builder) throws Exception { protected void doHealthCheck(Health.Builder builder) throws Exception {
if (instrumentationManager != null) { if (instrumentationManager.getHealthInstrumentations().stream()
if (instrumentationManager.getHealthInstrumentations().stream() .allMatch(Instrumentation::isUp)) {
.allMatch(Instrumentation::isUp)) { builder.up();
builder.up(); return;
return;
}
if (instrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isOutOfService)) {
builder.outOfService();
return;
}
builder.down();
instrumentationManager.getHealthInstrumentations().stream()
.filter(instrumentation -> !instrumentation.isStarted())
.forEach(instrumentation1 -> builder
.withException(instrumentation1.getStartException()));
} }
else { if (instrumentationManager.getHealthInstrumentations().stream()
builder.down(); .allMatch(Instrumentation::isOutOfService)) {
builder.withDetail("warning", builder.outOfService();
"please add metrics-core dependency, we use it for metrics"); return;
} }
builder.down();
instrumentationManager.getHealthInstrumentations().stream()
.filter(instrumentation -> !instrumentation.isStarted())
.forEach(instrumentation1 -> builder
.withException(instrumentation1.getStartException()));
} }
} }

View File

@@ -16,23 +16,26 @@
package org.springframework.cloud.stream.binder.rocketmq.config; package org.springframework.cloud.stream.binder.rocketmq.config;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@Configuration @Configuration
@Import({ RocketMQAutoConfiguration.class,
RocketMQBinderHealthIndicatorAutoConfiguration.class })
@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class, @EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
RocketMQExtendedBindingProperties.class }) RocketMQExtendedBindingProperties.class })
public class RocketMQBinderAutoConfiguration { public class RocketMQBinderAutoConfiguration {
@@ -42,7 +45,7 @@ public class RocketMQBinderAutoConfiguration {
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Autowired(required = false) @Autowired(required = false)
private InstrumentationManager instrumentationManager; private RocketMQProperties rocketMQProperties = new RocketMQProperties();
@Autowired @Autowired
public RocketMQBinderAutoConfiguration( public RocketMQBinderAutoConfiguration(
@@ -50,8 +53,6 @@ public class RocketMQBinderAutoConfiguration {
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.extendedBindingProperties = extendedBindingProperties; this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL,
this.rocketBinderConfigurationProperties.getLogLevel());
} }
@Bean @Bean
@@ -62,18 +63,18 @@ public class RocketMQBinderAutoConfiguration {
@Bean @Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder( public RocketMQMessageChannelBinder rocketMessageChannelBinder(
RocketMQTopicProvisioner provisioningProvider, RocketMQTopicProvisioner provisioningProvider,
ConsumersManager consumersManager) { InstrumentationManager instrumentationManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
consumersManager, provisioningProvider, provisioningProvider, extendedBindingProperties,
rocketBinderConfigurationProperties, instrumentationManager); rocketBinderConfigurationProperties, rocketMQProperties,
instrumentationManager);
binder.setExtendedBindingProperties(extendedBindingProperties); binder.setExtendedBindingProperties(extendedBindingProperties);
return binder; return binder;
} }
@Bean @Bean
public ConsumersManager consumersManager() { public InstrumentationManager instrumentationManager() {
return new ConsumersManager(instrumentationManager, return new InstrumentationManager();
rocketBinderConfigurationProperties);
} }
} }

View File

@@ -16,13 +16,9 @@
package org.springframework.cloud.stream.binder.rocketmq.config; package org.springframework.cloud.stream.binder.rocketmq.config;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint; import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@@ -30,24 +26,12 @@ import org.springframework.context.annotation.Configuration;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@Configuration @Configuration
@AutoConfigureAfter(EndpointAutoConfiguration.class)
@ConditionalOnClass(Endpoint.class) @ConditionalOnClass(Endpoint.class)
public class RocketMQBinderEndpointAutoConfiguration { public class RocketMQBinderHealthIndicatorAutoConfiguration {
@Bean
public RocketMQBinderEndpoint rocketBinderEndpoint() {
return new RocketMQBinderEndpoint();
}
@Bean @Bean
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() { public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() {
return new RocketMQBinderHealthIndicator(); return new RocketMQBinderHealthIndicator();
} }
@Bean
@ConditionalOnClass(name = "com.codahale.metrics.Counter")
public InstrumentationManager instrumentationManager() {
return new InstrumentationManager();
}
} }

View File

@@ -0,0 +1,99 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.config;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration
@AutoConfigureAfter(RocketMQAutoConfiguration.class)
@ConditionalOnMissingBean(DefaultMQProducer.class)
public class RocketMQComponent4BinderAutoConfiguration {
private final Environment environment;
public RocketMQComponent4BinderAutoConfiguration(Environment environment) {
this.environment = environment;
}
@Bean
@ConditionalOnMissingBean(DefaultMQProducer.class)
public DefaultMQProducer defaultMQProducer() {
DefaultMQProducer producer;
String configNameServer = environment.resolveRequiredPlaceholders(
"${spring.cloud.stream.rocketmq.binder.name-server:${rocketmq.producer.name-server:}}");
String ak = environment.resolveRequiredPlaceholders(
"${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}");
String sk = environment.resolveRequiredPlaceholders(
"${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}");
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP,
new AclClientRPCHook(new SessionCredentials(ak, sk)));
producer.setVipChannelEnabled(false);
}
else {
producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP);
}
producer.setNamesrvAddr(configNameServer);
return producer;
}
@Bean(destroyMethod = "destroy")
@ConditionalOnMissingBean
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
ObjectMapper objectMapper) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setObjectMapper(objectMapper);
return rocketMQTemplate;
}
@Bean
@ConditionalOnBean(RocketMQTemplate.class)
@ConditionalOnMissingBean(TransactionHandlerRegistry.class)
public TransactionHandlerRegistry transactionHandlerRegistry(
RocketMQTemplate template) {
return new TransactionHandlerRegistry(template);
}
@Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME)
@ConditionalOnBean(TransactionHandlerRegistry.class)
public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(
TransactionHandlerRegistry transactionHandlerRegistry) {
return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
}
}

View File

@@ -1,98 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class Acknowledgement {
/**
* for {@link ConsumeConcurrentlyContext} using
*/
private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
/**
* Message consume retry strategy<br>
* -1,no retry,put into DLQ directly<br>
* 0,broker control retry frequency<br>
* >0,client control retry frequency
*/
private Integer consumeConcurrentlyDelayLevel = 0;
/**
* for {@link ConsumeOrderlyContext} using
*/
private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
public Acknowledgement setConsumeConcurrentlyStatus(
ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
return this;
}
public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() {
return consumeConcurrentlyStatus;
}
public ConsumeOrderlyStatus getConsumeOrderlyStatus() {
return consumeOrderlyStatus;
}
public Acknowledgement setConsumeOrderlyStatus(
ConsumeOrderlyStatus consumeOrderlyStatus) {
this.consumeOrderlyStatus = consumeOrderlyStatus;
return this;
}
public Integer getConsumeConcurrentlyDelayLevel() {
return consumeConcurrentlyDelayLevel;
}
public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) {
this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel;
}
public Long getConsumeOrderlySuspendCurrentQueueTimeMill() {
return consumeOrderlySuspendCurrentQueueTimeMill;
}
public void setConsumeOrderlySuspendCurrentQueueTimeMill(
Long consumeOrderlySuspendCurrentQueueTimeMill) {
this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
}
public static Acknowledgement buildOrderlyInstance() {
Acknowledgement acknowledgement = new Acknowledgement();
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS);
return acknowledgement;
}
public static Acknowledgement buildConcurrentlyInstance() {
Acknowledgement acknowledgement = new Acknowledgement();
acknowledgement
.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
return acknowledgement;
}
}

View File

@@ -1,137 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumersManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
private final Map<String, Boolean> started = new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap = new HashMap<>();
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private InstrumentationManager instrumentationManager;
public ConsumersManager(InstrumentationManager instrumentationManager,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.instrumentationManager = instrumentationManager;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
}
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group,
String topic,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic),
consumerProperties);
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
ConsumerGroupInstrumentation instrumentation = manager
.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(instrumentation);
});
if (consumerGroups.containsKey(group)) {
return consumerGroups.get(group);
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
consumerGroups.put(group, consumer);
started.put(group, false);
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
if (consumerProperties.getExtension().getBroadcasting()) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
logger.info("RocketMQ consuming for SCS group {} created", group);
return consumer;
}
public synchronized void startConsumers() throws MQClientException {
for (String group : getConsumerGroups()) {
start(group);
}
}
public synchronized void startConsumer(String group) throws MQClientException {
start(group);
}
public synchronized void stopConsumer(String group) {
stop(group);
}
private void stop(String group) {
if (consumerGroups.get(group) != null) {
consumerGroups.get(group).shutdown();
started.put(group, false);
}
}
private synchronized void start(String group) throws MQClientException {
if (started.get(group)) {
return;
}
ConsumerGroupInstrumentation groupInstrumentation = null;
if (Optional.ofNullable(instrumentationManager).isPresent()) {
groupInstrumentation = instrumentationManager
.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
}
try {
consumerGroups.get(group).start();
started.put(group, true);
Optional.ofNullable(groupInstrumentation)
.ifPresent(g -> g.markStartedSuccessfully());
}
catch (MQClientException e) {
Optional.ofNullable(groupInstrumentation)
.ifPresent(g -> g.markStartFailed(e));
logger.error("RocketMQ Consumer hasn't been started. Caused by "
+ e.getErrorMessage(), e);
throw e;
}
}
public synchronized Set<String> getConsumerGroups() {
return consumerGroups.keySet();
}
}

View File

@@ -0,0 +1,419 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.List;
import java.util.Objects;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQListenerBindingContainer
implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQListenerBindingContainer.class);
private long suspendCurrentQueueTimeMillis = 1000;
/**
* Message consume retry strategy<br>
* -1,no retry,put into DLQ directly<br>
* 0,broker control retry frequency<br>
* >0,client control retry frequency.
*/
private int delayLevelWhenNextConsume = 0;
private String nameServer;
private String consumerGroup;
private String topic;
private int consumeThreadMax = 64;
private String charset = "UTF-8";
private RocketMQListener rocketMQListener;
private DefaultMQPushConsumer consumer;
private boolean running;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
// The following properties came from RocketMQConsumerProperties.
private ConsumeMode consumeMode;
private SelectorType selectorType;
private String selectorExpression;
private MessageModel messageModel;
public RocketMQListenerBindingContainer(
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
this.rocketMQConsumerProperties = rocketMQConsumerProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
? ConsumeMode.ORDERLY
: ConsumeMode.CONCURRENTLY;
if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
this.selectorType = SelectorType.TAG;
this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
}
else {
this.selectorType = SelectorType.SQL92;
this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
}
this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
? MessageModel.BROADCASTING
: MessageModel.CLUSTERING;
}
@Override
public void setupMessageListener(RocketMQListener<?> rocketMQListener) {
this.rocketMQListener = rocketMQListener;
}
@Override
public void destroy() throws Exception {
this.setRunning(false);
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
log.info("container destroyed, {}", this.toString());
}
@Override
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (this.isRunning()) {
throw new IllegalStateException(
"container already running. " + this.toString());
}
try {
consumer.start();
}
catch (MQClientException e) {
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
log.info("running container: {}", this.toString());
}
@Override
public void stop() {
if (this.isRunning()) {
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
setRunning(false);
}
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
private void initRocketMQPushConsumer() throws MQClientException {
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");
String ak = rocketBinderConfigurationProperties.getAccessKey();
String sk = rocketBinderConfigurationProperties.getSecretKey();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
new AllocateMessageQueueAveragely(),
rocketBinderConfigurationProperties.isEnableMsgTrace(),
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, topic));
consumer.setVipChannelEnabled(false);
}
else {
consumer = new DefaultMQPushConsumer(consumerGroup,
rocketBinderConfigurationProperties.isEnableMsgTrace(),
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
}
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener)
.prepareStart(consumer);
}
}
@Override
public String toString() {
return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
+ '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
+ ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
+ ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
+ messageModel + '}';
}
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public int getDelayLevelWhenNextConsume() {
return delayLevelWhenNextConsume;
}
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
public String getNameServer() {
return nameServer;
}
public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getConsumeThreadMax() {
return consumeThreadMax;
}
public void setConsumeThreadMax(int consumeThreadMax) {
this.consumeThreadMax = consumeThreadMax;
}
public String getCharset() {
return charset;
}
public void setCharset(String charset) {
this.charset = charset;
}
public RocketMQListener getRocketMQListener() {
return rocketMQListener;
}
public void setRocketMQListener(RocketMQListener rocketMQListener) {
this.rocketMQListener = rocketMQListener;
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
}
public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() {
return rocketMQConsumerProperties;
}
public ConsumeMode getConsumeMode() {
return consumeMode;
}
public SelectorType getSelectorType() {
return selectorType;
}
public String getSelectorExpression() {
return selectorExpression;
}
public MessageModel getMessageModel() {
return messageModel;
}
public class DefaultMessageListenerConcurrently
implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
rocketMQListener
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
}
catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
rocketMQListener
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
}
catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setSuspendCurrentQueueTimeMillis(
suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
}

View File

@@ -1,47 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.exception;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* An exception that is the payload of an {@code ErrorMessage} when occurs send failure.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @since 0.2.2
*/
public class RocketMQSendFailureException extends MessagingException {
private final org.apache.rocketmq.common.message.Message rocketmqMsg;
public RocketMQSendFailureException(Message<?> message,
org.apache.rocketmq.common.message.Message rocketmqMsg, Throwable cause) {
super(message, cause);
this.rocketmqMsg = rocketmqMsg;
}
public org.apache.rocketmq.common.message.Message getRocketmqMsg() {
return rocketmqMsg;
}
@Override
public String toString() {
return super.toString() + " [rocketmqMsg=" + this.rocketmqMsg + "]";
}
}

View File

@@ -16,82 +16,49 @@
package org.springframework.cloud.stream.binder.rocketmq.integration; package org.springframework.cloud.stream.binder.rocketmq.integration;
import org.apache.commons.lang3.ClassUtils; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.MessagingException;
import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext; import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener; import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate; import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/** /**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class RocketMQInboundChannelAdapter extends MessageProducerSupport { public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private static final Logger logger = LoggerFactory private static final Logger log = LoggerFactory
.getLogger(RocketMQInboundChannelAdapter.class); .getLogger(RocketMQInboundChannelAdapter.class);
private ConsumerInstrumentation consumerInstrumentation;
private InstrumentationManager instrumentationManager;
private RetryTemplate retryTemplate; private RetryTemplate retryTemplate;
private RecoveryCallback<? extends Object> recoveryCallback; private RecoveryCallback<? extends Object> recoveryCallback;
private DefaultMQPushConsumer consumer; private RocketMQListenerBindingContainer rocketMQListenerContainer;
private CloudStreamMessageListener listener;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties; private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final String destination; private final InstrumentationManager instrumentationManager;
private final String group; public RocketMQInboundChannelAdapter(
RocketMQListenerBindingContainer rocketMQListenerContainer,
private final ConsumersManager consumersManager;
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties, ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
String destination, String group,
InstrumentationManager instrumentationManager) { InstrumentationManager instrumentationManager) {
this.consumersManager = consumersManager; this.rocketMQListenerContainer = rocketMQListenerContainer;
this.consumerProperties = consumerProperties; this.consumerProperties = consumerProperties;
this.destination = destination;
this.group = group;
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
} }
@@ -108,16 +75,27 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to " + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
+ "send an error message when retries are exhausted"); + "send an error message when retries are exhausted");
} }
this.consumer = consumersManager.getOrCreateConsumer(group, destination,
consumerProperties);
Boolean isOrderly = consumerProperties.getExtension().getOrderly(); BindingRocketMQListener listener = new BindingRocketMQListener();
this.listener = isOrderly ? new CloudStreamMessageListenerOrderly() rocketMQListenerContainer.setRocketMQListener(listener);
: new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) { if (retryTemplate != null) {
this.retryTemplate.registerListener(this.listener); this.retryTemplate.registerListener(listener);
} }
try {
rocketMQListenerContainer.afterPropertiesSet();
}
catch (Exception e) {
log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
throw new IllegalArgumentException(
"rocketMQListenerContainer init error: " + e.getMessage(), e);
}
instrumentationManager.addHealthInstrumentation(
new Instrumentation(rocketMQListenerContainer.getTopic()
+ rocketMQListenerContainer.getConsumerGroup()));
} }
@Override @Override
@@ -126,53 +104,28 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|| !consumerProperties.getExtension().getEnabled()) { || !consumerProperties.getExtension().getEnabled()) {
return; return;
} }
String tags = consumerProperties.getExtension().getTags();
Set<String> tagsSet = tags == null ? new HashSet<>()
: Arrays.stream(tags.split("\\|\\|")).map(String::trim)
.collect(Collectors.toSet());
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
consumerInstrumentation = manager.getConsumerInstrumentation(destination);
manager.addHealthInstrumentation(consumerInstrumentation);
});
try { try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { rocketMQListenerContainer.start();
this.consumer.subscribe(destination, MessageSelector instrumentationManager
.bySql(consumerProperties.getExtension().getSql())); .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
} + rocketMQListenerContainer.getConsumerGroup())
else { .markStartedSuccessfully();
this.consumer.subscribe(destination, String.join(" || ", tagsSet));
}
Optional.ofNullable(consumerInstrumentation)
.ifPresent(c -> c.markStartedSuccessfully());
} }
catch (MQClientException e) { catch (Exception e) {
Optional.ofNullable(consumerInstrumentation) instrumentationManager
.ifPresent(c -> c.markStartFailed(e)); .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + rocketMQListenerContainer.getConsumerGroup())
+ e.getErrorMessage(), e); .markStartFailed(e);
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
} throw new MessagingException(MessageBuilder.withPayload(
"RocketMQTemplate startup failed, Caused by " + e.getMessage())
this.consumer.registerMessageListener(this.listener); .build(), e);
try {
consumersManager.startConsumer(group);
}
catch (MQClientException e) {
logger.error(
"RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(),
e);
throw new RuntimeException("RocketMQ Consumer startup failed.", e);
} }
} }
@Override @Override
protected void doStop() { protected void doStop() {
consumersManager.stopConsumer(group); rocketMQListenerContainer.stop();
} }
public void setRetryTemplate(RetryTemplate retryTemplate) { public void setRetryTemplate(RetryTemplate retryTemplate) {
@@ -183,84 +136,21 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
this.recoveryCallback = recoveryCallback; this.recoveryCallback = recoveryCallback;
} }
protected class CloudStreamMessageListener implements MessageListener, RetryListener { protected class BindingRocketMQListener
implements RocketMQListener<Message>, RetryListener {
Acknowledgement consumeMessage(final List<MessageExt> msgs) { @Override
public void onMessage(Message message) {
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
try { if (enableRetry) {
if (enableRetry) { RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
return RocketMQInboundChannelAdapter.this.retryTemplate.execute( RocketMQInboundChannelAdapter.this.sendMessage(message);
(RetryCallback<Acknowledgement, Exception>) context -> doSendMsgs( return null;
msgs, context), }, (RecoveryCallback<Object>) RocketMQInboundChannelAdapter.this.recoveryCallback);
new RecoveryCallback<Acknowledgement>() {
@Override
public Acknowledgement recover(RetryContext context)
throws Exception {
RocketMQInboundChannelAdapter.this.recoveryCallback
.recover(context);
if (ClassUtils.isAssignable(this.getClass(),
MessageListenerConcurrently.class)) {
return Acknowledgement
.buildConcurrentlyInstance();
}
else {
return Acknowledgement.buildOrderlyInstance();
}
}
});
}
else {
Acknowledgement result = doSendMsgs(msgs, null);
Optional.ofNullable(
RocketMQInboundChannelAdapter.this.instrumentationManager)
.ifPresent(manager -> {
manager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
});
return result;
}
} }
catch (Exception e) { else {
logger.error( RocketMQInboundChannelAdapter.this.sendMessage(message);
"RocketMQ Message hasn't been processed successfully. Caused by ",
e);
Optional.ofNullable(
RocketMQInboundChannelAdapter.this.instrumentationManager)
.ifPresent(manager -> {
manager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
});
} }
return null;
}
private Acknowledgement doSendMsgs(final List<MessageExt> msgs,
RetryContext context) {
List<Acknowledgement> acknowledgements = new ArrayList<>();
msgs.forEach(msg -> {
String retryInfo = context == null ? ""
: "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
logger.debug(retryInfo + "consuming msg:\n" + msg);
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
Acknowledgement acknowledgement = new Acknowledgement();
Message<byte[]> toChannel = convertMessagingFromRocketMQMsg(msg,
acknowledgement);
acknowledgements.add(acknowledgement);
RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
});
return acknowledgements.get(0);
}
private Message convertMessagingFromRocketMQMsg(MessageExt msg,
Acknowledgement acknowledgement) {
return MessageBuilder.withPayload(msg.getBody())
.setHeaders(new RocketMQMessageHeaderAccessor()
.withAcknowledgment(acknowledgement).withTags(msg.getTags())
.withKeys(msg.getKeys()).withFlag(msg.getFlag())
.withRocketMessage(msg))
.build();
} }
@Override @Override
@@ -272,69 +162,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
@Override @Override
public <T, E extends Throwable> void close(RetryContext context, public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) { RetryCallback<T, E> callback, Throwable throwable) {
if (throwable != null) {
Optional.ofNullable(
RocketMQInboundChannelAdapter.this.instrumentationManager)
.ifPresent(manager -> {
manager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
});
}
else {
Optional.ofNullable(
RocketMQInboundChannelAdapter.this.instrumentationManager)
.ifPresent(manager -> {
manager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
});
}
} }
@Override @Override
public <T, E extends Throwable> void onError(RetryContext context, public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) { RetryCallback<T, E> callback, Throwable throwable) {
}
}
protected class CloudStreamMessageListenerConcurrently
extends CloudStreamMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
if (acknowledgement != null) {
context.setDelayLevelWhenNextConsume(
acknowledgement.getConsumeConcurrentlyDelayLevel());
return acknowledgement.getConsumeConcurrentlyStatus();
}
else {
context.setDelayLevelWhenNextConsume(consumerProperties.getExtension()
.getError().getDelayLevelWhenNextConsume());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
if (acknowledgement != null) {
context.setSuspendCurrentQueueTimeMillis(
(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
return acknowledgement.getConsumeOrderlyStatus();
}
else {
context.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension()
.getError().getSuspendCurrentQueueTimeMillis());
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
} }
} }

View File

@@ -16,122 +16,93 @@
package org.springframework.cloud.stream.binder.rocketmq.integration; package org.springframework.cloud.stream.binder.rocketmq.integration;
import org.apache.rocketmq.client.exception.MQBrokerException; import java.util.Optional;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.slf4j.Logger;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.exception.RocketMQSendFailureException;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.context.Lifecycle; import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MutableMessage; import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
/** /**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle { public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQMessageHandler.class);
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
private DefaultMQProducer producer;
private ProducerInstrumentation producerInstrumentation;
private InstrumentationManager instrumentationManager;
private LocalTransactionExecuter localTransactionExecuter;
private TransactionCheckListener transactionCheckListener;
private MessageChannel sendFailureChannel; private MessageChannel sendFailureChannel;
private final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties; private final RocketMQTemplate rocketMQTemplate;
private final Boolean transactional;
private final String destination; private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final String groupName;
private final InstrumentationManager instrumentationManager;
private boolean sync = false;
private volatile boolean running = false; private volatile boolean running = false;
public RocketMQMessageHandler(String destination, public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties, String groupName, Boolean transactional,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) { InstrumentationManager instrumentationManager) {
this.rocketMQTemplate = rocketMQTemplate;
this.destination = destination; this.destination = destination;
this.producerProperties = producerProperties; this.groupName = groupName;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.transactional = transactional;
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
} }
@Override @Override
public void start() { public void start() {
if (producerProperties.getExtension().getTransactional()) { if (!transactional) {
producer = new TransactionMQProducer(destination); instrumentationManager
if (transactionCheckListener != null) { .addHealthInstrumentation(new Instrumentation(destination));
((TransactionMQProducer) producer) try {
.setTransactionCheckListener(transactionCheckListener); rocketMQTemplate.afterPropertiesSet();
instrumentationManager.getHealthInstrumentation(destination)
.markStartedSuccessfully();
}
catch (Exception e) {
instrumentationManager.getHealthInstrumentation(destination)
.markStartFailed(e);
log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload(
"RocketMQTemplate startup failed, Caused by " + e.getMessage())
.build(), e);
} }
}
else {
producer = new DefaultMQProducer(destination);
}
producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled());
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
producerInstrumentation = manager.getProducerInstrumentation(destination);
manager.addHealthInstrumentation(producerInstrumentation);
});
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
if (producerProperties.getExtension().getMaxMessageSize() > 0) {
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
}
try {
producer.start();
Optional.ofNullable(producerInstrumentation)
.ifPresent(p -> p.markStartedSuccessfully());
}
catch (MQClientException e) {
Optional.ofNullable(producerInstrumentation)
.ifPresent(p -> p.markStartFailed(e));
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
} }
running = true; running = true;
} }
@Override @Override
public void stop() { public void stop() {
if (producer != null) { if (!transactional) {
producer.shutdown(); rocketMQTemplate.destroy();
} }
running = false; running = false;
} }
@@ -144,100 +115,95 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override @Override
protected void handleMessageInternal(org.springframework.messaging.Message<?> message) protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
throws Exception { throws Exception {
Message toSend = null;
try { try {
if (message.getPayload() instanceof byte[]) { final StringBuilder topicWithTags = new StringBuilder(destination);
toSend = new Message(destination, (byte[]) message.getPayload()); String tags = Optional
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
.toString();
if (!StringUtils.isEmpty(tags)) {
topicWithTags.append(":").append(tags);
} }
else if (message.getPayload() instanceof String) { SendResult sendRes = null;
toSend = new Message(destination, if (transactional) {
((String) message.getPayload()).getBytes()); sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
topicWithTags.toString(), message, message.getHeaders()
.get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
} }
else { else {
throw new UnsupportedOperationException("Payload class isn't supported: " int delayLevel = 0;
+ message.getPayload().getClass()); try {
} Object delayLevelObj = message.getHeaders()
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor( .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
message); if (delayLevelObj instanceof Number) {
headerAccessor.setLeaveMutable(true); delayLevel = ((Number) delayLevelObj).intValue();
toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel()); }
toSend.setTags(headerAccessor.getTags()); else if (delayLevelObj instanceof String) {
toSend.setKeys(headerAccessor.getKeys()); delayLevel = Integer.parseInt((String) delayLevelObj);
toSend.setFlag(headerAccessor.getFlag()); }
for (Map.Entry<String, String> entry : headerAccessor.getUserProperties() }
.entrySet()) { catch (Exception e) {
toSend.putUserProperty(entry.getKey(), entry.getValue()); // ignore
} }
if (sync) {
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel);
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
}
else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.debug("async send to topic " + topicWithTags + " "
+ sendResult);
}
SendResult sendRes; @Override
if (producerProperties.getExtension().getTransactional()) { public void onException(Throwable e) {
sendRes = producer.sendMessageInTransaction(toSend, log.error(
localTransactionExecuter, headerAccessor.getTransactionalArg()); "RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy
.buildErrorMessage(
new MessagingException(
message, e),
null));
}
}
});
}
} }
else { if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
sendRes = producer.send(toSend);
}
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
if (getSendFailureChannel() != null) { if (getSendFailureChannel() != null) {
this.getSendFailureChannel().send(message); this.getSendFailureChannel().send(message);
} }
else { else {
throw new RocketMQSendFailureException(message, toSend, throw new MessagingException(message,
new MQClientException("message hasn't been sent", null)); new MQClientException("message hasn't been sent", null));
} }
} }
if (message instanceof MutableMessage) {
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
sendRes);
}
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
manager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
Instant.now().toEpochMilli());
});
Optional.ofNullable(producerInstrumentation).ifPresent(p -> p.markSent());
} }
catch (MQClientException | RemotingException | MQBrokerException catch (Exception e) {
| InterruptedException | UnsupportedOperationException e) { log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
Optional.ofNullable(producerInstrumentation)
.ifPresent(p -> p.markSentFailure());
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
if (getSendFailureChannel() != null) { if (getSendFailureChannel() != null) {
getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage( getSendFailureChannel().send(this.errorMessageStrategy
new RocketMQSendFailureException(message, toSend, e), null)); .buildErrorMessage(new MessagingException(message, e), null));
} }
else { else {
throw new RocketMQSendFailureException(message, toSend, e); throw new MessagingException(message, e);
} }
} }
} }
/**
* Using in RocketMQ Transactional Mode. Set RocketMQ localTransactionExecuter in
* {@link DefaultMQProducer#sendMessageInTransaction}.
* @param localTransactionExecuter the executer running when produce msg.
*/
public void setLocalTransactionExecuter(
LocalTransactionExecuter localTransactionExecuter) {
this.localTransactionExecuter = localTransactionExecuter;
}
/**
* Using in RocketMQ Transactional Mode. Set RocketMQ transactionCheckListener in
* {@link TransactionMQProducer#setTransactionCheckListener}.
* @param transactionCheckListener the listener set in {@link TransactionMQProducer}.
*/
public void setTransactionCheckListener(
TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
/** /**
* Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent * Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
* to this channel with a payload of a {@link RocketMQSendFailureException} with the * to this channel with a payload of a {@link MessagingException} with the failed
* failed message and cause. * message and cause.
* @param sendFailureChannel the failure channel. * @param sendFailureChannel the failure channel.
* @since 0.2.2 * @since 0.2.2
*/ */
@@ -259,4 +225,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public MessageChannel getSendFailureChannel() { public MessageChannel getSendFailureChannel() {
return sendFailureChannel; return sendFailureChannel;
} }
public void setSync(boolean sync) {
this.sync = sync;
}
} }

View File

@@ -1,33 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import com.codahale.metrics.MetricRegistry;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumerGroupInstrumentation extends Instrumentation {
private MetricRegistry metricRegistry;
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
super(name);
this.metricRegistry = metricRegistry;
}
}

View File

@@ -1,60 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
/**
* @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumerInstrumentation extends Instrumentation {
private final Counter totalConsumed;
private final Counter totalConsumedFailures;
private final Meter consumedPerSecond;
private final Meter consumedFailuresPerSecond;
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
this.totalConsumed = registry
.counter(name(baseMetricName, Consumer.TOTAL_CONSUMED));
this.consumedPerSecond = registry
.meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND));
this.totalConsumedFailures = registry
.counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES));
this.consumedFailuresPerSecond = registry
.meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND));
}
public void markConsumed() {
totalConsumed.inc();
consumedPerSecond.mark();
}
public void markConsumedFailure() {
totalConsumedFailures.inc();
consumedFailuresPerSecond.mark();
}
}

View File

@@ -27,7 +27,7 @@ public class Instrumentation {
protected final AtomicBoolean started = new AtomicBoolean(false); protected final AtomicBoolean started = new AtomicBoolean(false);
protected Exception startException = null; protected Exception startException = null;
Instrumentation(String name) { public Instrumentation(String name) {
this.name = name; this.name = name;
} }

View File

@@ -22,47 +22,16 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
import com.codahale.metrics.MetricRegistry;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class InstrumentationManager { public class InstrumentationManager {
private final MetricRegistry metricRegistry = new MetricRegistry();
private final Map<String, Object> runtime = new ConcurrentHashMap<>(); private final Map<String, Object> runtime = new ConcurrentHashMap<>();
private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>();
private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>();
private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>();
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>(); private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
public ProducerInstrumentation getProducerInstrumentation(String destination) {
String key = Producer.PREFIX + destination;
producerInstrumentations.putIfAbsent(key,
new ProducerInstrumentation(metricRegistry, key));
return producerInstrumentations.get(key);
}
public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
String key = Consumer.PREFIX + destination;
consumeInstrumentations.putIfAbsent(key,
new ConsumerInstrumentation(metricRegistry, key));
return consumeInstrumentations.get(key);
}
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
String key = Consumer.GROUP_PREFIX + group;
consumerGroupsInstrumentations.putIfAbsent(key,
new ConsumerGroupInstrumentation(metricRegistry, key));
return consumerGroupsInstrumentations.get(key);
}
public Set<Instrumentation> getHealthInstrumentations() { public Set<Instrumentation> getHealthInstrumentations() {
return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue) return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
@@ -72,11 +41,12 @@ public class InstrumentationManager {
healthInstrumentations.put(instrumentation.getName(), instrumentation); healthInstrumentations.put(instrumentation.getName(), instrumentation);
} }
public Instrumentation getHealthInstrumentation(String key) {
return healthInstrumentations.get(key);
}
public Map<String, Object> getRuntime() { public Map<String, Object> getRuntime() {
return runtime; return runtime;
} }
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}
} }

View File

@@ -1,59 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
/**
* @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ProducerInstrumentation extends Instrumentation {
private final Counter totalSent;
private final Counter totalSentFailures;
private final Meter sentPerSecond;
private final Meter sentFailuresPerSecond;
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT));
this.totalSentFailures = registry
.counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES));
this.sentPerSecond = registry
.meter(name(baseMetricName, Producer.SENT_PER_SECOND));
this.sentFailuresPerSecond = registry
.meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND));
}
public void markSent() {
totalSent.inc();
sentPerSecond.mark();
}
public void markSentFailure() {
totalSentFailures.inc();
sentFailuresPerSecond.mark();
}
}

View File

@@ -16,7 +16,9 @@
package org.springframework.cloud.stream.binder.rocketmq.properties; package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
/** /**
* @author Timur Valiev * @author Timur Valiev
@@ -25,24 +27,69 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties { public class RocketMQBinderConfigurationProperties {
private String namesrvAddr = "127.0.0.1:9876"; /**
* The name server for rocketMQ, formats: `host:port;host:port`.
*/
private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
private String logLevel = "ERROR"; /**
* The property of "access-key".
*/
private String accessKey;
public String getNamesrvAddr() { /**
return namesrvAddr; * The property of "secret-key".
*/
private String secretKey;
/**
* Switch flag instance for message trace.
*/
private boolean enableMsgTrace = true;
/**
* The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
public String getNameServer() {
return nameServer;
} }
public void setNamesrvAddr(String namesrvAddr) { public void setNameServer(String nameServer) {
this.namesrvAddr = namesrvAddr; this.nameServer = nameServer;
} }
public String getLogLevel() { public String getAccessKey() {
return logLevel; return accessKey;
} }
public void setLogLevel(String logLevel) { public void setAccessKey(String accessKey) {
this.logLevel = logLevel; this.accessKey = accessKey;
} }
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
} }

View File

@@ -50,42 +50,18 @@ public class RocketMQConsumerProperties {
*/ */
private Boolean orderly = false; private Boolean orderly = false;
/**
* for concurrently listener. message consume retry strategy
*/
private int delayLevelWhenNextConsume = 0;
/**
* for orderly listener. next retry delay time
*/
private long suspendCurrentQueueTimeMillis = 1000;
private Boolean enabled = true; private Boolean enabled = true;
private Error error;
public static class Error {
/**
* Reconsume later timeMillis in ConsumeOrderlyContext.
*/
private Long suspendCurrentQueueTimeMillis = 1000L;
/**
* Message consume retry strategy in ConsumeConcurrentlyContext.
*
* -1,no retry,put into DLQ directly 0,broker control retry frequency >0,client
* control retry frequency
*/
private Integer delayLevelWhenNextConsume = 0;
public Long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(Long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public Integer getDelayLevelWhenNextConsume() {
return delayLevelWhenNextConsume;
}
public void setDelayLevelWhenNextConsume(Integer delayLevelWhenNextConsume) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
}
public String getTags() { public String getTags() {
return tags; return tags;
} }
@@ -126,11 +102,19 @@ public class RocketMQConsumerProperties {
this.broadcasting = broadcasting; this.broadcasting = broadcasting;
} }
public Error getError() { public int getDelayLevelWhenNextConsume() {
return error; return delayLevelWhenNextConsume;
} }
public void setError(Error error) { public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
this.error = error; this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
} }
} }

View File

@@ -17,8 +17,6 @@
package org.springframework.cloud.stream.binder.rocketmq.properties; package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
/** /**
* @author Timur Valiev * @author Timur Valiev
@@ -29,24 +27,62 @@ public class RocketMQProducerProperties {
private Boolean enabled = true; private Boolean enabled = true;
/** /**
* Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize} * Name of producer.
*/ */
private Integer maxMessageSize = 0; private String group;
/**
* Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}.
*/
private Integer maxMessageSize = 1024 * 1024 * 4;
private Boolean transactional = false; private Boolean transactional = false;
/** private Boolean sync = false;
* full class name of {@link LocalTransactionExecuter}
*/
private String executer;
/**
* full class name of {@link TransactionCheckListener}
*/
private String transactionCheckListener;
private Boolean vipChannelEnabled = true; private Boolean vipChannelEnabled = true;
/**
* Millis of send message timeout.
*/
private int sendMessageTimeout = 3000;
/**
* Compress message body threshold, namely, message body larger than 4k will be
* compressed on default.
*/
private int compressMessageBodyThreshold = 1024 * 4;
/**
* Maximum number of retry to perform internally before claiming sending failure in
* synchronous mode. This may potentially cause message duplication which is up to
* application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* <p>
* Maximum number of retry to perform internally before claiming sending failure in
* asynchronous mode.
* </p>
* This may potentially cause message duplication which is up to application
* developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private boolean retryNextServer = false;
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public Boolean getEnabled() { public Boolean getEnabled() {
return enabled; return enabled;
} }
@@ -71,20 +107,12 @@ public class RocketMQProducerProperties {
this.transactional = transactional; this.transactional = transactional;
} }
public String getExecuter() { public Boolean getSync() {
return executer; return sync;
} }
public void setExecuter(String executer) { public void setSync(Boolean sync) {
this.executer = executer; this.sync = sync;
}
public String getTransactionCheckListener() {
return transactionCheckListener;
}
public void setTransactionCheckListener(String transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
} }
public Boolean getVipChannelEnabled() { public Boolean getVipChannelEnabled() {
@@ -94,4 +122,45 @@ public class RocketMQProducerProperties {
public void setVipChannelEnabled(Boolean vipChannelEnabled) { public void setVipChannelEnabled(Boolean vipChannelEnabled) {
this.vipChannelEnabled = vipChannelEnabled; this.vipChannelEnabled = vipChannelEnabled;
} }
}
public int getSendMessageTimeout() {
return sendMessageTimeout;
}
public void setSendMessageTimeout(int sendMessageTimeout) {
this.sendMessageTimeout = sendMessageTimeout;
}
public int getCompressMessageBodyThreshold() {
return compressMessageBodyThreshold;
}
public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
this.compressMessageBodyThreshold = compressMessageBodyThreshold;
}
public int getRetryTimesWhenSendFailed() {
return retryTimesWhenSendFailed;
}
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
public int getRetryTimesWhenSendAsyncFailed() {
return retryTimesWhenSendAsyncFailed;
}
public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
public boolean isRetryNextServer() {
return retryNextServer;
}
public void setRetryNextServer(boolean retryNextServer) {
this.retryNextServer = retryNextServer;
}
}

View File

@@ -18,8 +18,6 @@ package org.springframework.cloud.stream.binder.rocketmq.provisioning;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
@@ -36,9 +34,6 @@ import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
public class RocketMQTopicProvisioner implements public class RocketMQTopicProvisioner implements
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>> { ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>> {
private static final Logger logger = LoggerFactory
.getLogger(RocketMQTopicProvisioner.class);
@Override @Override
public ProducerDestination provisionProducerDestination(String name, public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RocketMQProducerProperties> properties) ExtendedProducerProperties<RocketMQProducerProperties> properties)

View File

@@ -1,2 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration org.springframework.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration

View File

@@ -22,7 +22,6 @@ import org.junit.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration; import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
@@ -33,8 +32,7 @@ public class RocketMQAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner() private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration( .withConfiguration(
AutoConfigurations.of(RocketMQBinderEndpointAutoConfiguration.class, AutoConfigurations.of(RocketMQBinderAutoConfiguration.class))
RocketMQBinderAutoConfiguration.class))
.withPropertyValues( .withPropertyValues(
"spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876", "spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
"spring.cloud.stream.bindings.output.destination=TopicOrderTest", "spring.cloud.stream.bindings.output.destination=TopicOrderTest",
@@ -55,7 +53,7 @@ public class RocketMQAutoConfigurationTests {
this.contextRunner.run(context -> { this.contextRunner.run(context -> {
RocketMQBinderConfigurationProperties binderConfigurationProperties = context RocketMQBinderConfigurationProperties binderConfigurationProperties = context
.getBean(RocketMQBinderConfigurationProperties.class); .getBean(RocketMQBinderConfigurationProperties.class);
assertThat(binderConfigurationProperties.getNamesrvAddr()) assertThat(binderConfigurationProperties.getNameServer())
.isEqualTo("127.0.0.1:9876"); .isEqualTo("127.0.0.1:9876");
RocketMQExtendedBindingProperties bindingProperties = context RocketMQExtendedBindingProperties bindingProperties = context
.getBean(RocketMQExtendedBindingProperties.class); .getBean(RocketMQExtendedBindingProperties.class);