1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00
2019-07-10 10:44:08 +08:00

279 lines
8.5 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

== Spring Cloud Alibaba RocketMQ Binder
### RocketMQ 介绍
https://rocketmq.apache.org[RocketMQ] 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
* 能够保证严格的消息顺序
* 提供丰富的消息拉取模式
* 高效的订阅者水平扩展能力
* 实时的消息订阅机制
* 亿级消息堆积能力
### RocketMQ 基本使用
* 下载 RocketMQ
下载 https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip[RocketMQ最新的二进制文件],并解压
解压后的目录结构如下:
```
apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
```
* 启动 NameServer
```bash
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
```
* 启动 Broker
```bash
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
```
* 发送、接收消息
发送消息:
```bash
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
```
发送成功后显示:`SendResult [sendStatus=SEND_OK, msgId= ...`
接收消息:
```bash
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
```
接收成功后显示:`ConsumeMessageThread_%d Receive New Messages: [MessageExt...`
* 关闭 Server
```bash
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
```
### Spring Cloud Stream 介绍
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 `Spring Integration` 与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念Binder 和 Binding。
* Binder: 跟外部消息中间件集成的组件,用来创建 Binding各消息中间件都有自己的 Binder 实现。
比如 `Kafka` 的实现 `KafkaMessageChannelBinder``RabbitMQ` 的实现 `RabbitMessageChannelBinder` 以及 `RocketMQ` 的实现 `RocketMQMessageChannelBinder`。
* Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
.Spring Cloud Stream
image::https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png[]
使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码:
```java
MessageChannel messageChannel = new DirectChannel();
// 消息订阅
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("receive msg: " + message.getPayload());
}
});
// 消息发送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
```
这段代码所有的消息类都是 `spring-messaging` 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。
**Spring Cloud Stream 底层基于这段代码去做了各种抽象。**
### 如何使用 Spring Cloud Alibaba RocketMQ Binder ###
如果要在您的项目中引入 RocketMQ Binder需要引入如下 maven 依赖:
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
```
或者可以使用 Spring Cloud Stream RocketMQ Starter
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
### Spring Cloud Alibaba RocketMQ Binder 实现
RocketMQ Binder 的实现依赖于 https://github.com/apache/rocketmq-spring[RocketMQ-Spring] 框架。
RocketMQ-Spring 框架是 RocketMQ 与 Spring Boot 的整合RocketMQ Spring 主要提供了 3 个特性:
1. 使用 `RocketMQTemplate` 用来统一发送消息,包括同步、异步发送消息和事务消息
2. `@RocketMQTransactionListener` 注解用来处理事务消息的监听和回查
3. `@RocketMQMessageListener` 注解用来消费消息
RocketMQ Binder 的核心类 RocketMQMessageChannelBinder 实现了 Spring Cloud Stream 规范,内部构建会 `RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。
`RocketMQMessageHandler` 会基于 Binding 配置构造 `RocketMQTemplate``RocketMQTemplate` 内部会把 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类转换成 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`,然后发送出去。
`RocketMQInboundChannelAdapter` 也会基于 Binding 配置构造 `RocketMQListenerBindingContainer``RocketMQListenerBindingContainer` 内部会启动 RocketMQ `Consumer` 接收消息。
NOTE: 在使用 RocketMQ Binder 的同时也可以配置 rocketmq.** 用于触发 RocketMQ Spring 相关的 AutoConfiguration
目前 Binder 支持在 `Header` 中设置相关的 key 来进行 RocketMQ Message 消息的特性设置。
比如 `TAGS`、`DELAY`、`TRANSACTIONAL_ARG`、`KEYS`、`WAIT_STORE_MSG_OK`、`FLAG` 表示 RocketMQ 消息对应的标签,
```java
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, "binder")
.setHeader(RocketMQHeaders.KEYS, "my-key")
.setHeader("DELAY", "1");
Message message = builder.build();
output().send(message);
```
### 配置选项
#### RocketMQ Binder Properties
spring.cloud.stream.rocketmq.binder.name-server::
RocketMQ NameServer 地址。
+
Default: `127.0.0.1:9876`.
spring.cloud.stream.rocketmq.binder.access-key::
阿里云账号 AccessKey。
+
Default: null.
spring.cloud.stream.rocketmq.binder.secret-key::
阿里云账号 SecretKey。
+
Default: null.
spring.cloud.stream.rocketmq.binder.enable-msg-trace::
是否为 Producer 和 Consumer 开启消息轨迹功能
+
Default: `true`.
spring.cloud.stream.rocketmq.binder.customized-trace-topic::
消息轨迹开启后存储的 topic 名称。
+
Default: `RMQ_SYS_TRACE_TOPIC`.
#### RocketMQ Consumer Properties
下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.` 为前缀的 RocketMQ Consumer 相关的配置。
enable::
是否启用 Consumer。
+
默认值: `true`.
tags::
Consumer 基于 TAGS 订阅,多个 tag 以 `||` 分割。
+
默认值: empty.
sql::
Consumer 基于 SQL 订阅。
+
默认值: empty.
broadcasting::
Consumer 是否是广播消费模式。如果想让所有的订阅者都能接收到消息,可以使用广播模式。
+
默认值: `false`.
orderly::
Consumer 是否同步消费消息模式。
+
默认值: `false`.
delayLevelWhenNextConsume::
异步消费消息模式下消费失败重试策略:
* -1,不重复,直接放入死信队列
* 0,broker 控制重试策略
* >0,client 控制重试策略
+
默认值: `0`.
suspendCurrentQueueTimeMillis::
同步消费消息模式下消费失败后再次消费的时间间隔。
+
默认值: `1000`.
#### RocketMQ Provider Properties
下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.` 为前缀的 RocketMQ Producer 相关的配置。
enable::
是否启用 Producer。
+
默认值: `true`.
group::
Producer group name。
+
默认值: empty.
maxMessageSize::
消息发送的最大字节数。
+
默认值: `8249344`.
transactional::
是否发送事务消息。
+
默认值: `false`.
sync::
是否使用同步得方式发送消息。
+
默认值: `false`.
vipChannelEnabled::
是否在 Vip Channel 上发送消息。
+
默认值: `true`.
sendMessageTimeout::
发送消息的超时时间(毫秒)。
+
默认值: `3000`.
compressMessageBodyThreshold::
消息体压缩阀值(当消息体超过 4k 的时候会被压缩)。
+
默认值: `4096`.
retryTimesWhenSendFailed::
在同步发送消息的模式下,消息发送失败的重试次数。
+
默认值: `2`.
retryTimesWhenSendAsyncFailed::
在异步发送消息的模式下,消息发送失败的重试次数。
+
默认值: `2`.
retryNextServer::
消息发送失败的情况下是否重试其它的 broker。
+
默认值: `false`.