1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00
This commit is contained in:
fangjian0423 2019-02-28 15:30:12 +08:00
parent 509d4819b3
commit 5b2d068f7d
14 changed files with 552 additions and 303 deletions

View File

@ -112,139 +112,165 @@ messageChannel.send(MessageBuilder.withPayload("simple msg").build());
这段代码所有的消息类都是 `spring-messaging` 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。 这段代码所有的消息类都是 `spring-messaging` 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。
**Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。** **Spring Cloud Stream 底层基于这段代码去做了各种抽象。**
### Spring Cloud Alibaba RocketMQ Binder 实现原理
.RocketMQ Binder处理流程
image::https://cdn.nlark.com/lark/0/2018/png/64647/1543560843558-24525bf4-1d0e-4e10-be5f-bdde7127f6e6.png[]
RocketMQ Binder 的核心主要就是这3个类`RocketMQMessageChannelBinder``RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 ### 如何使用 Spring Cloud Alibaba RocketMQ Binder ###
`RocketMQMessageChannelBinder` 是个标准的 Binder 实现,其内部构建 `RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 如果要在您的项目中引入 RocketMQ Binder需要引入如下 maven 依赖:
`RocketMQMessageHandler` 用于 RocketMQ `Producer` 的启动以及消息的发送,其内部会根据 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类,去创建 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`。 ```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
```
在构造 `org.apache.rocketmq.common.message.Message` 的过程中会根据 `org.springframework.messaging.Message` 的 Header 构造成 `RocketMQMessageHeaderAccessor`。然后再根据 `RocketMQMessageHeaderAccessor` 中的一些属性,比如 tags、keys、flag等属性设置到 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message` 中。 或者可以使用 Spring Cloud Stream RocketMQ Starter
`RocketMQInboundChannelAdapter` 用于 RocketMQ `Consumer` 的启动以及消息的接收。其内部还支持 https://github.com/spring-projects/spring-retry[spring-retry] 的使用。 ```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
在消费消息的时候可以从 Header 中获取 `Acknowledgement` 并进行一些设置。 ### Spring Cloud Alibaba RocketMQ Binder 实现
比如使用 `MessageListenerConcurrently` 进行异步消费的时候,可以设置延迟消费: RocketMQ Binder 的实现依赖于 https://github.com/apache/rocketmq-spring[RocketMQ-Spring] 框架。
RocketMQ-Spring 框架是 RocketMQ 与 Spring Boot 的整合RocketMQ Spring 主要提供了 3 个特性:
1. 使用 `RocketMQTemplate` 用来统一发送消息,包括同步、异步发送消息和事务消息
2. `@RocketMQTransactionListener` 注解用来处理事务消息的监听和回查
3. `@RocketMQMessageListener` 注解用来消费消息
RocketMQ Binder 的核心类 RocketMQMessageChannelBinder 实现了 Spring Cloud Stream 规范,内部构建会 `RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。
`RocketMQMessageHandler` 会基于 Binding 配置构造 `RocketMQTemplate``RocketMQTemplate` 内部会把 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类转换成 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`,然后发送出去。
`RocketMQInboundChannelAdapter` 也会基于 Binding 配置构造 `RocketMQListenerBindingContainer``RocketMQListenerBindingContainer` 内部会启动 RocketMQ `Consumer` 接收消息。
NOTE: 在使用 RocketMQ Binder 的同时也可以配置 rocketmq.** 用于触发 RocketMQ Spring 相关的 AutoConfiguration
`RocketMQHeaders` 中定义了很多 Header 常量,在发送消息的时候可以设置到 Spring Message 的 Header 中,用于触发 RocketMQ 相关的 feature
```java ```java
@StreamListener("input") MessageBuilder builder = MessageBuilder.withPayload(msg)
public void receive(Message message) { .setHeader(RocketMQHeaders.TAGS, "binder")
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); .setHeader(RocketMQHeaders.MESSAGE_ID, "my-msg-id")
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); .setHeader("DELAY", "1");
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER); Message message = builder.build();
acknowledgement.setConsumeConcurrentlyDelayLevel(1); output().send(message);
}
``` ```
比如使用 `MessageListenerOrderly` 进行顺序消费的时候,可以设置延迟消费: ### 配置选项
```java #### RocketMQ Binder Properties
@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}
```
Provider端支持的配置 spring.cloud.stream.rocketmq.binder.name-server::
RocketMQ NameServer 地址。
+
Default: `127.0.0.1:9876`.
spring.cloud.stream.rocketmq.binder.access-key::
阿里云账号 AccessKey。
+
Default: null.
spring.cloud.stream.rocketmq.binder.secret-key::
阿里云账号 SecretKey。
+
Default: null.
spring.cloud.stream.rocketmq.binder.enable-msg-trace::
是否为 Producer 和 Consumer 开启消息轨迹功能
+
Default: `true`.
spring.cloud.stream.rocketmq.binder.customized-trace-topic::
消息轨迹开启后存储的 topic 名称。
+
Default: `RMQ_SYS_TRACE_TOPIC`.
:frame: topbot
[width="60%",options="header"]
|====
^|配置项 ^|含义 ^| 默认值
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效RocketMQ 默认值为4M = 1024 * 1024 * 4)
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`|
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`|
|====
Consumer端支持的配置 #### RocketMQ Consumer Properties
:frame: topbot 下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.` 为前缀的 RocketMQ Consumer 相关的配置。
[width="60%",options="header"]
|====
^|配置项 ^|含义| 默认值
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤订阅所有消息)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容sql的优先级更高)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false
|====
### Endpoint支持 enable::
是否启用 Consumer。
+
默认值: `true`.
tags::
Consumer 基于 TAGS 订阅,多个 tag 以 `||` 分割。
+
默认值: empty.
sql::
Consumer 基于 SQL 订阅。
+
默认值: empty.
broadcasting::
Consumer 是否是广播消费模式。如果想让所有的订阅者都能接收到消息,可以使用广播模式。
+
默认值: `false`.
orderly::
Consumer 是否同步消费消息模式。
+
默认值: `false`.
delayLevelWhenNextConsume::
异步消费消息模式下消费失败重试策略:
* -1,不重复,直接放入死信队列
* 0,broker 控制重试策略
* >0,client 控制重试策略
+
默认值: `0`.
suspendCurrentQueueTimeMillis::
同步消费消息模式下消费失败后再次消费的时间间隔。
+
默认值: `1000`.
在使用Endpoint特性之前需要在 Maven 中添加 `spring-boot-starter-actuator` 依赖,并在配置中允许 Endpoints 的访问。 #### RocketMQ Provider Properties
* Spring Boot 1.x 中添加配置 `management.security.enabled=false`。暴露的 endpoint 路径为 `/rocketmq_binder` 下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.` 为前缀的 RocketMQ Producer 相关的配置。
* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`。暴露的 endpoint 路径为 `/actuator/rocketmq-binder`
Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。 enable::
是否启用 Producer。
```json +
{ 默认值: `true`.
"runtime": { group::
"lastSend.timestamp": 1542786623915 Producer group name。
}, +
"metrics": { 默认值: empty.
"scs-rocketmq.consumer.test-topic.totalConsumed": { maxMessageSize::
"count": 11 消息发送的最大字节数。
}, +
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": { 默认值: `8249344`.
"count": 0 transactional::
}, 是否发送事务消息。
"scs-rocketmq.producer.test-topic.totalSentFailures": { +
"count": 0 默认值: `false`.
}, sync::
"scs-rocketmq.consumer.test-topic.consumedPerSecond": { 是否使用同步得方式发送消息。
"count": 11, +
"fifteenMinuteRate": 0.012163847780107841, 默认值: `false`.
"fiveMinuteRate": 0.03614605351360527, vipChannelEnabled::
"meanRate": 0.3493213353657594, 是否在 Vip Channel 上发送消息。
"oneMinuteRate": 0.17099243039490175 +
}, 默认值: `true`.
"scs-rocketmq.producer.test-topic.totalSent": { sendMessageTimeout::
"count": 5 发送消息的超时时间(毫秒)。
}, +
"scs-rocketmq.producer.test-topic.sentPerSecond": { 默认值: `3000`.
"count": 5, compressMessageBodyThreshold::
"fifteenMinuteRate": 0.005540151995103271, 消息体压缩阀值(当消息体超过 4k 的时候会被压缩)。
"fiveMinuteRate": 0.01652854617838251, +
"meanRate": 0.10697493212602836, 默认值: `4096`.
"oneMinuteRate": 0.07995558537067671 retryTimesWhenSendFailed::
}, 在同步发送消息的模式下,消息发送失败的重试次数。
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": { +
"count": 0, 默认值: `2`.
"fifteenMinuteRate": 0.0, retryTimesWhenSendAsyncFailed::
"fiveMinuteRate": 0.0, 在异步发送消息的模式下,消息发送失败的重试次数。
"meanRate": 0.0, +
"oneMinuteRate": 0.0 默认值: `2`.
}, retryNextServer::
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": { 消息发送失败的情况下是否重试其它的 broker。
"count": 0, +
"fifteenMinuteRate": 0.0, 默认值: `false`.
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}
```
注意要想查看统计数据需要在pom里加上 https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core[metrics-core依赖]。如若不加endpoint 将会显示 warning 信息而不会显示统计信息:
```json
{
"warning": "please add metrics-core dependency, we use it for metrics"
}
```

View File

@ -114,137 +114,162 @@ All the message types in this code are provided by the `spring-messaging`module.
**The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.** **The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.**
### How to use Spring Cloud Alibaba RocketMQ Binder ###
For using the Spring Cloud Alibaba RocketMQ Binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
```
Alternatively, you can also use the Spring Cloud Stream RocketMQ Starter:
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
### How Spring Cloud Alibaba RocketMQ Binder Works ### How Spring Cloud Alibaba RocketMQ Binder Works
.RocketMQ Binder Workflow The implementation of RocketMQ Binder depend on the https://github.com/apache/rocketmq-spring[RocketMQ-Spring] framework.
image::https://cdn.nlark.com/lark/0/2018/png/64647/1543560843558-24525bf4-1d0e-4e10-be5f-bdde7127f6e6.png[]
RocketMQ Spring framework is an integration of RocketMQ and Spring Boot. It provides three main features:
The core of RocketMQ Binder are the 3 classes below: `RocketMQMessageChannelBinder``RocketMQInboundChannelAdapter` and `RocketMQMessageHandler`. 1. `RocketMQTemplate`: Sending messages, including synchronous, asynchronous, and transactional messages.
2. `@RocketMQTransactionListener`: Listen and check for transaction messages.
3. `@RocketMQMessageListener`: Consume messages.
`RocketMQMessageChannelBinder` is a standard implementation of Binder. It contains `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler` as its internal constructions. `RocketMQMessageChannelBinder` is a standard implementation of Binder, it will build `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler` internally.
`RocketMQMessageHandler` is used to start RocketMQ `Producer` and send messages. It creates message type of RocketMQ `org.apache.rocketmq.common.message.Message` based on the message type of `org.springframework.messaging.Message` in the `spring-messaging` module. `RocketMQMessageHandler` will construct `RocketMQTemplate` based on the Binding configuration. `RocketMQTemplate` will convert the `org.springframework.messaging.Message` message class of `spring-messaging` module to the RocketMQ message class `org.apache.rocketmq.common .message.Message` internally, then send it out.
When constructing `org.apache.rocketmq.common.message.Message`, it constructs `RocketMQMessageHeaderAccessor` based on the header of `org.springframework.messaging.Message`. Then, based on some of the properties in `RocketMQMessageHeaderAccessor` , it sets some of message attributes such as tags, keys, and flag in `org.apache.rocketmq.common.message.Message` of RocketMQ. `RocketMQInboundChannelAdapter` will also construct `RocketMQListenerBindingContainer` based on the Binding configuration, and `RocketMQListenerBindingContainer` will start the RocketMQ `Consumer` to receive the messages.
`RocketMQInboundChannelAdapter` is used to start RocketMQ `Consumer` and receive messages. It also support the usage of https://github.com/spring-projects/spring-retry[spring-retry]. NOTE: RocketMQ Binder Application can also be used to configure rocketmq.** to trigger RocketMQ Spring related AutoConfiguration
You can also obtain `Acknowledgement` from the Header and make some configurations. The headers defined in `RocketMQHeaders`, which can be set to the header of spring message when sending a message to trigger the RocketMQ related feature:
For example, you can set delayed message consumption when `MessageListenerConcurrently` is used for asynchronous message consumption:
```java ```java
@StreamListener("input") MessageBuilder builder = MessageBuilder.withPayload(msg)
public void receive(Message message) { .setHeader(RocketMQHeaders.TAGS, "binder")
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); .setHeader(RocketMQHeaders.MESSAGE_ID, "my-msg-id")
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); .setHeader("DELAY", "1");
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER); Message message = builder.build();
acknowledgement.setConsumeConcurrentlyDelayLevel(1); output().send(message);
}
``` ```
You can also set delayed message consumption when `MessageListenerOrderly` is used for consuming ordered messages. ### Configuration Options
```java #### RocketMQ Binder Properties
@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}
```
Supported Configurations of Provider: spring.cloud.stream.rocketmq.binder.name-server::
The name server of RocketMQ Server.
+
Default: `127.0.0.1:9876`.
spring.cloud.stream.rocketmq.binder.access-key::
The AccessKey of Alibaba Cloud Account.
+
Default: null.
spring.cloud.stream.rocketmq.binder.secret-key::
The SecretKey of Alibaba Cloud Account.
+
Default: null.
spring.cloud.stream.rocketmq.binder.enable-msg-trace::
Enable Message Trace feature for all producers and consumers.
+
Default: `true`.
spring.cloud.stream.rocketmq.binder.customized-trace-topic::
The trace topic for message trace.
+
Default: `RMQ_SYS_TRACE_TOPIC`.
:frame: topbot
[width="60%",options="header"]
|====
^|Configuration ^|Description ^| Default Value
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|Whether to use producer|true
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|Maximum bytes of messages sent|0(Take effect only when its bigger than 0. The default value of RocketMQ is 4M = 1024 * 1024 * 4)
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|Whether to use `TransactionMQProducer` to send transaction messages|false
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|Full class name of the interface implementation class related to `org.apache.rocketmq.client.producer.LocalTransactionExecuter` For example, `org.test.MyExecuter`|
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|Full class name of the interface implementation class related to `org.apache.rocketmq.client.producer.TransactionCheckListener` For example, `org.test.MyTransactionCheckListener`|
|====
Supported Configurations of Consumer: #### RocketMQ Consumer Properties
:frame: topbot The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.`.
[width="60%",options="header"]
|====
^|Configuration ^|Description| Default Value
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|Whether to use consumer|true
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer will only subscribe to messages with these tags Tags are separated by "\|\|" (If not specified, it means the consumer subscribes to all messages)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer subscribes to the messages as requested in the SQL(If tags are also specified, SQL has a higher priority than tags.)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|If the consumer uses the broadcasting mode|false
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|Ordered message consumption or asychronous consumption|false
|====
### Endpoint Support enable::
Enable Consumer Binding.
+
Default: `true`.
tags::
Consumer subscription tags expression, tags split by `||`.
+
Default: empty.
sql::
Consumer subscription sql expression.
+
Default: empty.
broadcasting::
Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
+
Default: `false`.
orderly::
Receiving message concurrently or orderly.
+
Default: `false`.
delayLevelWhenNextConsume::
Message consume retry strategy for concurrently consume:
* -1,no retry,put into DLQ directly
* 0,broker control retry frequency
* >0,client control retry frequency
+
Default: `0`.
suspendCurrentQueueTimeMillis::
Time interval of message consume retry for orderly consume.
+
Default: `1000`.
Before you use the Endpoint feature, please add the `spring-boot-starter-actuator` dependency in Maven, and enable access of Endpoints in your configuration. #### RocketMQ Provider Properties
* Add `management.security.enabled=false`in Spring Boot 1.x. The exposed endpoint path is `/rocketmq_binder` The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.`.
* Add `management.endpoints.web.exposure.include=*`in Spring Boot 2.x. The exposed endpoint path is `/actuator/rocketmq-binder`
Endpoint will collects data about the last message that is sent, the number of successes or failures of message sending, and the number of successes of failures of message consumption. enable::
Enable Producer Binding.
```json +
{ Default: `true`.
"runtime": { group::
"lastSend.timestamp": 1542786623915 Producer group name.
}, +
"metrics": { Default: empty.
"scs-rocketmq.consumer.test-topic.totalConsumed": { maxMessageSize::
"count": 11 Maximum allowed message size in bytes.
}, +
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": { Default: `8249344`.
"count": 0 transactional::
}, Send Transactional Message.
"scs-rocketmq.producer.test-topic.totalSentFailures": { +
"count": 0 Default: `false`.
}, sync::
"scs-rocketmq.consumer.test-topic.consumedPerSecond": { Send message in synchronous mode.
"count": 11, +
"fifteenMinuteRate": 0.012163847780107841, Default: `false`.
"fiveMinuteRate": 0.03614605351360527, vipChannelEnabled::
"meanRate": 0.3493213353657594, Send message with vip channel.
"oneMinuteRate": 0.17099243039490175 +
}, Default: `true`.
"scs-rocketmq.producer.test-topic.totalSent": { sendMessageTimeout::
"count": 5 Millis of send message timeout.
}, +
"scs-rocketmq.producer.test-topic.sentPerSecond": { Default: `3000`.
"count": 5, compressMessageBodyThreshold::
"fifteenMinuteRate": 0.005540151995103271, Compress message body threshold, namely, message body larger than 4k will be compressed on default.
"fiveMinuteRate": 0.01652854617838251, +
"meanRate": 0.10697493212602836, Default: `4096`.
"oneMinuteRate": 0.07995558537067671 retryTimesWhenSendFailed::
}, Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": { +
"count": 0, Default: `2`.
"fifteenMinuteRate": 0.0, retryTimesWhenSendAsyncFailed::
"fiveMinuteRate": 0.0, Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
"meanRate": 0.0, +
"oneMinuteRate": 0.0 Default: `2`.
}, retryNextServer::
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": { Indicate whether to retry another broker on sending failure internally.
"count": 0, +
"fifteenMinuteRate": 0.0, Default: `false`.
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}
```
Note To view statistics, add the https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core[metrics-core dependency] in POM. If not added, the endpoint will return warning instead of statistics:
```json
{
"warning": "please add metrics-core dependency, we use it for metrics"
}
```

View File

@ -0,0 +1,72 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQBinderUtils {
public static RocketMQBinderConfigurationProperties mergeProducerProperties(
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQProperties rocketMQProperties) {
RocketMQBinderConfigurationProperties result = new RocketMQBinderConfigurationProperties();
if (StringUtils.isEmpty(rocketMQProperties.getNameServer())) {
result.setNameServer(rocketBinderConfigurationProperties.getNameServer());
}
else {
result.setNameServer(rocketMQProperties.getNameServer());
}
if (rocketMQProperties.getProducer() == null
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) {
result.setAccessKey(rocketBinderConfigurationProperties.getAccessKey());
}
else {
result.setAccessKey(rocketMQProperties.getProducer().getAccessKey());
}
if (rocketMQProperties.getProducer() == null
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getSecretKey())) {
result.setSecretKey(rocketBinderConfigurationProperties.getSecretKey());
}
else {
result.setSecretKey(rocketMQProperties.getProducer().getSecretKey());
}
if (rocketMQProperties.getProducer() == null || StringUtils
.isEmpty(rocketMQProperties.getProducer().getCustomizedTraceTopic())) {
result.setCustomizedTraceTopic(
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
}
else {
result.setCustomizedTraceTopic(
rocketMQProperties.getProducer().getCustomizedTraceTopic());
}
if (rocketMQProperties.getProducer() != null
&& rocketMQProperties.getProducer().isEnableMsgTrace()) {
result.setEnableMsgTrace(Boolean.TRUE);
}
else {
result.setEnableMsgTrace(
rocketBinderConfigurationProperties.isEnableMsgTrace());
}
return result;
}
}

View File

@ -21,10 +21,13 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger; import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@ -44,6 +47,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer; import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -55,12 +59,10 @@ public class RocketMQMessageChannelBinder extends
implements implements
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> { ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private static final Logger logger = LoggerFactory
.getLogger(RocketMQMessageChannelBinder.class);
private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties();
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final RocketMQProperties rocketMQProperties;
private final InstrumentationManager instrumentationManager; private final InstrumentationManager instrumentationManager;
private Set<String> clientConfigId = new HashSet<>(); private Set<String> clientConfigId = new HashSet<>();
@ -69,10 +71,12 @@ public class RocketMQMessageChannelBinder extends
public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider, public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQProperties rocketMQProperties,
InstrumentationManager instrumentationManager) { InstrumentationManager instrumentationManager) {
super(null, provisioningProvider); super(null, provisioningProvider);
this.extendedBindingProperties = extendedBindingProperties; this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketMQProperties = rocketMQProperties;
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
} }
@ -82,6 +86,10 @@ public class RocketMQMessageChannelBinder extends
MessageChannel errorChannel) throws Exception { MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) { if (producerProperties.getExtension().getEnabled()) {
RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
.mergeProducerProperties(rocketBinderConfigurationProperties,
rocketMQProperties);
RocketMQTemplate rocketMQTemplate; RocketMQTemplate rocketMQTemplate;
if (producerProperties.getExtension().getTransactional()) { if (producerProperties.getExtension().getTransactional()) {
Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory() Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
@ -101,9 +109,27 @@ public class RocketMQMessageChannelBinder extends
rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setObjectMapper(this.getApplicationContext() rocketMQTemplate.setObjectMapper(this.getApplicationContext()
.getBeansOfType(ObjectMapper.class).values().iterator().next()); .getBeansOfType(ObjectMapper.class).values().iterator().next());
DefaultMQProducer producer = new DefaultMQProducer(destination.getName()); DefaultMQProducer producer;
producer.setNamesrvAddr( String ak = mergedProperties.getAccessKey();
rocketBinderConfigurationProperties.getNamesrvAddr()); String sk = mergedProperties.getSecretKey();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
RPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ak, sk));
producer = new DefaultMQProducer(
producerProperties.getExtension().getGroup(), rpcHook,
mergedProperties.isEnableMsgTrace(),
mergedProperties.getCustomizedTraceTopic());
producer.setVipChannelEnabled(false);
producer.setInstanceName(
RocketMQUtil.getInstanceName(rpcHook, destination.getName()));
}
else {
producer = new DefaultMQProducer(
producerProperties.getExtension().getGroup());
producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled());
}
producer.setNamesrvAddr(mergedProperties.getNameServer());
producer.setSendMsgTimeout( producer.setSendMsgTimeout(
producerProperties.getExtension().getSendMessageTimeout()); producerProperties.getExtension().getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed( producer.setRetryTimesWhenSendFailed(
@ -116,14 +142,13 @@ public class RocketMQMessageChannelBinder extends
producerProperties.getExtension().isRetryNextServer()); producerProperties.getExtension().isRetryNextServer());
producer.setMaxMessageSize( producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize()); producerProperties.getExtension().getMaxMessageSize());
producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled());
rocketMQTemplate.setProducer(producer); rocketMQTemplate.setProducer(producer);
clientConfigId.add(producer.buildMQClientId()); clientConfigId.add(producer.buildMQClientId());
} }
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
rocketMQTemplate, destination.getName(), rocketMQTemplate, destination.getName(),
producerProperties.getExtension().getGroup(),
producerProperties.getExtension().getTransactional(), producerProperties.getExtension().getTransactional(),
instrumentationManager); instrumentationManager);
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
@ -160,7 +185,7 @@ public class RocketMQMessageChannelBinder extends
listenerContainer.setDelayLevelWhenNextConsume( listenerContainer.setDelayLevelWhenNextConsume(
consumerProperties.getExtension().getDelayLevelWhenNextConsume()); consumerProperties.getExtension().getDelayLevelWhenNextConsume());
listenerContainer listenerContainer
.setNameServer(rocketBinderConfigurationProperties.getNamesrvAddr()); .setNameServer(rocketBinderConfigurationProperties.getNameServer());
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
listenerContainer, consumerProperties, instrumentationManager); listenerContainer, consumerProperties, instrumentationManager);

View File

@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.rocketmq.config; package org.springframework.cloud.stream.binder.rocketmq.config;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@ -39,7 +41,8 @@ import io.micrometer.core.instrument.binder.MeterBinder;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@Configuration @Configuration
@Import(RocketMQBinderHealthIndicatorAutoConfiguration.class) @Import({ RocketMQAutoConfiguration.class,
RocketMQBinderHealthIndicatorAutoConfiguration.class })
@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class, @EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
RocketMQExtendedBindingProperties.class }) RocketMQExtendedBindingProperties.class })
public class RocketMQBinderAutoConfiguration { public class RocketMQBinderAutoConfiguration {
@ -48,6 +51,9 @@ public class RocketMQBinderAutoConfiguration {
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Autowired(required = false)
private RocketMQProperties rocketMQProperties = new RocketMQProperties();
@Autowired @Autowired
public RocketMQBinderAutoConfiguration( public RocketMQBinderAutoConfiguration(
RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQExtendedBindingProperties extendedBindingProperties,
@ -67,7 +73,8 @@ public class RocketMQBinderAutoConfiguration {
InstrumentationManager instrumentationManager) { InstrumentationManager instrumentationManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
provisioningProvider, extendedBindingProperties, provisioningProvider, extendedBindingProperties,
rocketBinderConfigurationProperties, instrumentationManager); rocketBinderConfigurationProperties, rocketMQProperties,
instrumentationManager);
binder.setExtendedBindingProperties(extendedBindingProperties); binder.setExtendedBindingProperties(extendedBindingProperties);
return binder; return binder;
} }

View File

@ -16,11 +16,10 @@
package org.springframework.cloud.stream.binder.rocketmq.config; package org.springframework.cloud.stream.binder.rocketmq.config;
import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils; import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor; import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
import org.apache.rocketmq.spring.config.TransactionHandlerRegistry; import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
@ -32,7 +31,7 @@ import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.util.Assert; import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -51,48 +50,34 @@ public class RocketMQComponent4BinderAutoConfiguration {
} }
@Bean @Bean
@ConditionalOnMissingBean(DefaultMQProducer.class)
public DefaultMQProducer defaultMQProducer() { public DefaultMQProducer defaultMQProducer() {
RocketMQProperties rocketMQProperties = new RocketMQProperties(); DefaultMQProducer producer;
String configNameServer = environment String configNameServer = environment.resolveRequiredPlaceholders(
.getProperty("spring.cloud.stream.rocketmq.binder.namesrv-addr"); "${spring.cloud.stream.rocketmq.binder.name-server:${rocketmq.producer.name-server:}}");
if (StringUtils.isEmpty(configNameServer)) { String ak = environment.resolveRequiredPlaceholders(
rocketMQProperties.setNameServer(RocketMQBinderConstants.DEFAULT_NAME_SERVER); "${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}");
String sk = environment.resolveRequiredPlaceholders(
"${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}");
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP,
new AclClientRPCHook(new SessionCredentials(ak, sk)));
producer.setVipChannelEnabled(false);
} }
else { else {
rocketMQProperties.setNameServer(configNameServer); producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP);
} }
RocketMQProperties.Producer producerConfig = new Producer(); producer.setNamesrvAddr(configNameServer);
rocketMQProperties.setProducer(producerConfig);
producerConfig.setGroup(RocketMQBinderConstants.DEFAULT_GROUP);
String nameServer = rocketMQProperties.getNameServer();
String groupName = producerConfig.getGroup();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(
producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(
producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(
producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
return producer; return producer;
} }
@Bean(destroyMethod = "destroy") @Bean(destroyMethod = "destroy")
@ConditionalOnBean(DefaultMQProducer.class) @ConditionalOnMissingBean
@ConditionalOnMissingBean(RocketMQTemplate.class)
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
ObjectMapper rocketMQMessageObjectMapper) { ObjectMapper objectMapper) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper); rocketMQTemplate.setObjectMapper(objectMapper);
return rocketMQTemplate; return rocketMQTemplate;
} }

View File

@ -19,6 +19,8 @@ package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@ -27,8 +29,10 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.annotation.SelectorType;
@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -84,6 +89,7 @@ public class RocketMQListenerBindingContainer
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties; private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder; private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
// The following properties came from RocketMQConsumerProperties. // The following properties came from RocketMQConsumerProperties.
private ConsumeMode consumeMode; private ConsumeMode consumeMode;
@ -93,8 +99,10 @@ public class RocketMQListenerBindingContainer
public RocketMQListenerBindingContainer( public RocketMQListenerBindingContainer(
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties, ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQMessageChannelBinder rocketMQMessageChannelBinder) { RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
this.rocketMQConsumerProperties = rocketMQConsumerProperties; this.rocketMQConsumerProperties = rocketMQConsumerProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder; this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly() this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
? ConsumeMode.ORDERLY ? ConsumeMode.ORDERLY
@ -190,7 +198,23 @@ public class RocketMQListenerBindingContainer
Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required"); Assert.notNull(topic, "Property 'topic' is required");
consumer = new DefaultMQPushConsumer(consumerGroup); String ak = rocketBinderConfigurationProperties.getAccessKey();
String sk = rocketBinderConfigurationProperties.getSecretKey();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
new AllocateMessageQueueAveragely(),
rocketBinderConfigurationProperties.isEnableMsgTrace(),
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, topic));
consumer.setVipChannelEnabled(false);
}
else {
consumer = new DefaultMQPushConsumer(consumerGroup,
rocketBinderConfigurationProperties.isEnableMsgTrace(),
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
}
consumer.setNamesrvAddr(nameServer); consumer.setNamesrvAddr(nameServer);
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
@ -236,7 +260,15 @@ public class RocketMQListenerBindingContainer
} }
rocketMQMessageChannelBinder.getClientConfigId().add(consumer.buildMQClientId()); rocketMQMessageChannelBinder.getClientConfigId().add(consumer.buildMQClientId());
}
@Override
public String toString() {
return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
+ '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
+ ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
+ ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
+ messageModel + '}';
} }
public long getSuspendCurrentQueueTimeMillis() { public long getSuspendCurrentQueueTimeMillis() {

View File

@ -40,7 +40,7 @@ import org.springframework.util.Assert;
*/ */
public class RocketMQInboundChannelAdapter extends MessageProducerSupport { public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private static final Logger logger = LoggerFactory private static final Logger log = LoggerFactory
.getLogger(RocketMQInboundChannelAdapter.class); .getLogger(RocketMQInboundChannelAdapter.class);
private RetryTemplate retryTemplate; private RetryTemplate retryTemplate;
@ -88,7 +88,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
} }
catch (Exception e) { catch (Exception e) {
logger.error("rocketMQListenerContainer init error: " + e.getMessage(), e); log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
throw new IllegalArgumentException( throw new IllegalArgumentException(
"rocketMQListenerContainer init error: " + e.getMessage(), e); "rocketMQListenerContainer init error: " + e.getMessage(), e);
} }
@ -116,7 +116,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
.getHealthInstrumentation(rocketMQListenerContainer.getTopic() .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
+ rocketMQListenerContainer.getConsumerGroup()) + rocketMQListenerContainer.getConsumerGroup())
.markStartFailed(e); .markStartFailed(e);
logger.error("RocketMQTemplate startup failed, Caused by " + e.getMessage()); log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload( throw new MessagingException(MessageBuilder.withPayload(
"RocketMQTemplate startup failed, Caused by " + e.getMessage()) "RocketMQTemplate startup failed, Caused by " + e.getMessage())
.build(), e); .build(), e);

View File

@ -25,6 +25,8 @@ import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@ -44,6 +46,9 @@ import org.springframework.util.StringUtils;
*/ */
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle { public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQMessageHandler.class);
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
private MessageChannel sendFailureChannel; private MessageChannel sendFailureChannel;
@ -54,6 +59,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private final String destination; private final String destination;
private final String groupName;
private final InstrumentationManager instrumentationManager; private final InstrumentationManager instrumentationManager;
private boolean sync = false; private boolean sync = false;
@ -61,9 +68,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private volatile boolean running = false; private volatile boolean running = false;
public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination, public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
Boolean transactional, InstrumentationManager instrumentationManager) { String groupName, Boolean transactional,
InstrumentationManager instrumentationManager) {
this.rocketMQTemplate = rocketMQTemplate; this.rocketMQTemplate = rocketMQTemplate;
this.destination = destination; this.destination = destination;
this.groupName = groupName;
this.transactional = transactional; this.transactional = transactional;
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
} }
@ -81,8 +90,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
catch (Exception e) { catch (Exception e) {
instrumentationManager.getHealthInstrumentation(destination) instrumentationManager.getHealthInstrumentation(destination)
.markStartFailed(e); .markStartFailed(e);
logger.error( log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
"RocketMQTemplate startup failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload( throw new MessagingException(MessageBuilder.withPayload(
"RocketMQTemplate startup failed, Caused by " + e.getMessage()) "RocketMQTemplate startup failed, Caused by " + e.getMessage())
.build(), e); .build(), e);
@ -108,18 +116,19 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
protected void handleMessageInternal(org.springframework.messaging.Message<?> message) protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
throws Exception { throws Exception {
try { try {
StringBuilder topicWithTags = new StringBuilder(destination); final StringBuilder topicWithTags = new StringBuilder(destination);
String tags = Optional String tags = Optional
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("") .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
.toString(); .toString();
if (!StringUtils.isEmpty(tags)) { if (!StringUtils.isEmpty(tags)) {
topicWithTags = topicWithTags.append(":").append(tags); topicWithTags.append(":").append(tags);
} }
SendResult sendRes = null; SendResult sendRes = null;
if (transactional) { if (transactional) {
sendRes = rocketMQTemplate.sendMessageInTransaction(destination, sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
topicWithTags.toString(), message, message.getHeaders() topicWithTags.toString(), message, message.getHeaders()
.get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG)); .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
} }
else { else {
int delayLevel = 0; int delayLevel = 0;
@ -140,17 +149,22 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
rocketMQTemplate.getProducer().getSendMsgTimeout(), rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel); delayLevel);
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
} }
else { else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message, rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
new SendCallback() { new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.debug("async send to topic " + topicWithTags + " "
+ sendResult);
} }
@Override @Override
public void onException(Throwable e) { public void onException(Throwable e) {
log.error(
"RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
if (getSendFailureChannel() != null) { if (getSendFailureChannel() != null) {
getSendFailureChannel().send( getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy RocketMQMessageHandler.this.errorMessageStrategy
@ -174,8 +188,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
} }
} }
catch (Exception e) { catch (Exception e) {
logger.error( log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
if (getSendFailureChannel() != null) { if (getSendFailureChannel() != null) {
getSendFailureChannel().send(this.errorMessageStrategy getSendFailureChannel().send(this.errorMessageStrategy
.buildErrorMessage(new MessagingException(message, e), null)); .buildErrorMessage(new MessagingException(message, e), null));

View File

@ -59,9 +59,9 @@ public class RocketMQBinderMetrics
public void bindTo(@NonNull MeterRegistry registry) { public void bindTo(@NonNull MeterRegistry registry) {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(); DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer();
pushConsumer pushConsumer
.setNamesrvAddr(rocketMQBinderConfigurationProperties.getNamesrvAddr()); .setNamesrvAddr(rocketMQBinderConfigurationProperties.getNameServer());
DefaultMQProducer producer = new DefaultMQProducer(); DefaultMQProducer producer = new DefaultMQProducer();
producer.setNamesrvAddr(rocketMQBinderConfigurationProperties.getNamesrvAddr()); producer.setNamesrvAddr(rocketMQBinderConfigurationProperties.getNameServer());
rocketMQMessageChannelBinder.getTopicInUse().forEach((topic, group) -> { rocketMQMessageChannelBinder.getTopicInUse().forEach((topic, group) -> {
Gauge.builder(METRIC_NAME, this, o -> calculateMsgQueueOffset(topic, group)) Gauge.builder(METRIC_NAME, this, o -> calculateMsgQueueOffset(topic, group))

View File

@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.rocketmq.properties; package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
@ -26,14 +27,69 @@ import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties { public class RocketMQBinderConfigurationProperties {
private String namesrvAddr = RocketMQBinderConstants.DEFAULT_NAME_SERVER; /**
* The name server for rocketMQ, formats: `host:port;host:port`.
*/
private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
public String getNamesrvAddr() { /**
return namesrvAddr; * The property of "access-key".
*/
private String accessKey;
/**
* The property of "secret-key".
*/
private String secretKey;
/**
* Switch flag instance for message trace.
*/
private boolean enableMsgTrace = true;
/**
* The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
public String getNameServer() {
return nameServer;
} }
public void setNamesrvAddr(String namesrvAddr) { public void setNameServer(String nameServer) {
this.namesrvAddr = namesrvAddr; this.nameServer = nameServer;
} }
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
} }

View File

@ -27,7 +27,12 @@ public class RocketMQProducerProperties {
private Boolean enabled = true; private Boolean enabled = true;
/** /**
* Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize} * Name of producer.
*/
private String group;
/**
* Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}.
*/ */
private Integer maxMessageSize = 1024 * 1024 * 4; private Integer maxMessageSize = 1024 * 1024 * 4;
@ -70,6 +75,14 @@ public class RocketMQProducerProperties {
*/ */
private boolean retryNextServer = false; private boolean retryNextServer = false;
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public Boolean getEnabled() { public Boolean getEnabled() {
return enabled; return enabled;
} }

View File

@ -18,8 +18,6 @@ package org.springframework.cloud.stream.binder.rocketmq.provisioning;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
@ -36,9 +34,6 @@ import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
public class RocketMQTopicProvisioner implements public class RocketMQTopicProvisioner implements
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>> { ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>> {
private static final Logger logger = LoggerFactory
.getLogger(RocketMQTopicProvisioner.class);
@Override @Override
public ProducerDestination provisionProducerDestination(String name, public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RocketMQProducerProperties> properties) ExtendedProducerProperties<RocketMQProducerProperties> properties)

View File

@ -53,7 +53,7 @@ public class RocketMQAutoConfigurationTests {
this.contextRunner.run(context -> { this.contextRunner.run(context -> {
RocketMQBinderConfigurationProperties binderConfigurationProperties = context RocketMQBinderConfigurationProperties binderConfigurationProperties = context
.getBean(RocketMQBinderConfigurationProperties.class); .getBean(RocketMQBinderConfigurationProperties.class);
assertThat(binderConfigurationProperties.getNamesrvAddr()) assertThat(binderConfigurationProperties.getNameServer())
.isEqualTo("127.0.0.1:9876"); .isEqualTo("127.0.0.1:9876");
RocketMQExtendedBindingProperties bindingProperties = context RocketMQExtendedBindingProperties bindingProperties = context
.getBean(RocketMQExtendedBindingProperties.class); .getBean(RocketMQExtendedBindingProperties.class);