mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
277 lines
9.2 KiB
Plaintext
277 lines
9.2 KiB
Plaintext
== Spring Cloud Alibaba RocketMQ Binder
|
||
|
||
### Introduction of RocketMQ
|
||
|
||
https://rocketmq.apache.org[RocketMQ] is an open-source distributed message system. It is based on highly available distributed cluster technologies and provides message publishing and subscription service with low latency and high stability. RocketMQ is widely used in a variety of industries, such as decoupling of asynchronous communication, enterprise sulotions, financial settlements, telecommunication, e-commerce, logistics, marketing, social media, instant messaging, mobile applications, mobile games, vedios, IoT, and Internet of Vehicles.
|
||
|
||
It has the following features:
|
||
|
||
* Strict order of message sending and consumption
|
||
|
||
* Rich modes of message pulling
|
||
|
||
* Horizontal scalability of consumers
|
||
|
||
* Real-time message subscription
|
||
|
||
* Billion-level message accumulation capability
|
||
|
||
### RocketMQ Usages
|
||
|
||
* Download RocketMQ
|
||
|
||
Download https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip[Latest Binary File of RocketMQ], and decompress it.
|
||
|
||
The decompressed directory is as follows:
|
||
|
||
```
|
||
apache-rocketmq
|
||
├── LICENSE
|
||
├── NOTICE
|
||
├── README.md
|
||
├── benchmark
|
||
├── bin
|
||
├── conf
|
||
└── lib
|
||
```
|
||
|
||
* Start NameServer
|
||
|
||
```bash
|
||
nohup sh bin/mqnamesrv &
|
||
tail -f ~/logs/rocketmqlogs/namesrv.log
|
||
```
|
||
|
||
* Start Broker
|
||
|
||
```bash
|
||
nohup sh bin/mqbroker -n localhost:9876 &
|
||
tail -f ~/logs/rocketmqlogs/broker.log
|
||
```
|
||
|
||
* Send and Receive Messages
|
||
|
||
Send messages:
|
||
|
||
```bash
|
||
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
|
||
```
|
||
|
||
Output when the message is successfuly sent: `SendResult [sendStatus=SEND_OK, msgId= ...`
|
||
|
||
Receive messages:
|
||
|
||
```bash
|
||
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
|
||
```
|
||
|
||
Output when the message is successfully received: `ConsumeMessageThread_%d Receive New Messages: [MessageExt...`
|
||
|
||
* Disable Server
|
||
|
||
```bash
|
||
sh bin/mqshutdown broker
|
||
sh bin/mqshutdown namesrv
|
||
```
|
||
|
||
### Introduction of Spring Cloud Stream
|
||
|
||
Spring Cloud Stream is a microservice framework used to build architectures based on messages. It helps you to create production-ready single-server Spring applications based on SpringBoot, and connects with Broker using `Spring Integration`.
|
||
|
||
Spring Cloud Stream provides unified abstractions of message middleware configurations, and puts forward concepts such as publish-subscribe, consumer groups and partition.
|
||
|
||
There are two concepts in Spring Cloud Stream: Binder and Binding
|
||
|
||
* Binder: A component used to integrate with external message middleware, and is used to create binding. Different message middleware products have their own binder implementations.
|
||
|
||
For example, `Kafka` uses `KafkaMessageChannelBinder`, `RabbitMQ` uses `RabbitMessageChannelBinder`, while `RocketMQ` uses `RocketMQMessageChannelBinder`.
|
||
|
||
* Binding: Includes Input Binding and Output Binding。
|
||
|
||
Binding serves as a bridge between message middleware and the provider and consumer of the applications. Developers only need to use the Provider or Consumer to produce or consume data, and do not need to worry about the interactions with the message middleware.
|
||
|
||
.Spring Cloud Stream
|
||
image::https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png[]
|
||
|
||
Now let’s use Spring Cloud Stream to write a simple code for sending and receiving messages:
|
||
|
||
```java
|
||
MessageChannel messageChannel = new DirectChannel();
|
||
|
||
// Message subscription
|
||
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
|
||
@Override
|
||
public void handleMessage(Message<? > message) throws MessagingException {
|
||
System.out.println("receive msg: " + message.getPayload());
|
||
}
|
||
});
|
||
|
||
// Message sending
|
||
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
|
||
```
|
||
|
||
All the message types in this code are provided by the `spring-messaging`module. It shields the lower-layer implementations of message middleware. If you would like to change the message middleware, you only need to configure the related message middleware information in the configuration file and modify the binder dependency.
|
||
|
||
**The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.**
|
||
|
||
### How to use Spring Cloud Alibaba RocketMQ Binder ###
|
||
|
||
For using the Spring Cloud Alibaba RocketMQ Binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
|
||
|
||
```xml
|
||
<dependency>
|
||
<groupId>com.alibaba.cloud</groupId>
|
||
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
|
||
</dependency>
|
||
```
|
||
|
||
Alternatively, you can also use the Spring Cloud Stream RocketMQ Starter:
|
||
|
||
```xml
|
||
<dependency>
|
||
<groupId>com.alibaba.cloud</groupId>
|
||
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
||
</dependency>
|
||
```
|
||
|
||
### How Spring Cloud Alibaba RocketMQ Binder Works
|
||
|
||
The implementation of RocketMQ Binder depend on the https://github.com/apache/rocketmq-spring[RocketMQ-Spring] framework.
|
||
|
||
RocketMQ Spring framework is an integration of RocketMQ and Spring Boot. It provides three main features:
|
||
|
||
1. `RocketMQTemplate`: Sending messages, including synchronous, asynchronous, and transactional messages.
|
||
2. `@RocketMQTransactionListener`: Listen and check for transaction messages.
|
||
3. `@RocketMQMessageListener`: Consume messages.
|
||
|
||
`RocketMQMessageChannelBinder` is a standard implementation of Binder, it will build `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler` internally.
|
||
|
||
`RocketMQMessageHandler` will construct `RocketMQTemplate` based on the Binding configuration. `RocketMQTemplate` will convert the `org.springframework.messaging.Message` message class of `spring-messaging` module to the RocketMQ message class `org.apache.rocketmq.common .message.Message` internally, then send it out.
|
||
|
||
`RocketMQInboundChannelAdapter` will also construct `RocketMQListenerBindingContainer` based on the Binding configuration, and `RocketMQListenerBindingContainer` will start the RocketMQ `Consumer` to receive the messages.
|
||
|
||
NOTE: RocketMQ Binder Application can also be used to configure rocketmq.** to trigger RocketMQ Spring related AutoConfiguration
|
||
|
||
Currently Binder supports setting the relevant key in `Header` to set the properties of the RocketMQ message.
|
||
|
||
For example, `TAGS`, `DELAY`, `TRANSACTIONAL_ARG`, `KEYS`, `WAIT_STORE_MSG_OK`, `FLAG` represent the labels corresponding to the RocketMQ message.
|
||
|
||
```java
|
||
MessageBuilder builder = MessageBuilder.withPayload(msg)
|
||
.setHeader(RocketMQHeaders.TAGS, "binder")
|
||
.setHeader(RocketMQHeaders.KEYS, "my-key")
|
||
.setHeader("DELAY", "1");
|
||
Message message = builder.build();
|
||
output().send(message);
|
||
```
|
||
|
||
### Configuration Options
|
||
|
||
#### RocketMQ Binder Properties
|
||
|
||
spring.cloud.stream.rocketmq.binder.name-server::
|
||
The name server of RocketMQ Server.
|
||
+
|
||
Default: `127.0.0.1:9876`.
|
||
spring.cloud.stream.rocketmq.binder.access-key::
|
||
The AccessKey of Alibaba Cloud Account.
|
||
+
|
||
Default: null.
|
||
spring.cloud.stream.rocketmq.binder.secret-key::
|
||
The SecretKey of Alibaba Cloud Account.
|
||
+
|
||
Default: null.
|
||
spring.cloud.stream.rocketmq.binder.enable-msg-trace::
|
||
Enable Message Trace feature for all producers and consumers.
|
||
+
|
||
Default: `true`.
|
||
spring.cloud.stream.rocketmq.binder.customized-trace-topic::
|
||
The trace topic for message trace.
|
||
+
|
||
Default: `RMQ_SYS_TRACE_TOPIC`.
|
||
|
||
|
||
#### RocketMQ Consumer Properties
|
||
|
||
The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.`.
|
||
|
||
enable::
|
||
Enable Consumer Binding.
|
||
+
|
||
Default: `true`.
|
||
tags::
|
||
Consumer subscription tags expression, tags split by `||`.
|
||
+
|
||
Default: empty.
|
||
sql::
|
||
Consumer subscription sql expression.
|
||
+
|
||
Default: empty.
|
||
broadcasting::
|
||
Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
|
||
+
|
||
Default: `false`.
|
||
orderly::
|
||
Receiving message concurrently or orderly.
|
||
+
|
||
Default: `false`.
|
||
delayLevelWhenNextConsume::
|
||
Message consume retry strategy for concurrently consume:
|
||
* -1,no retry,put into DLQ directly
|
||
* 0,broker control retry frequency
|
||
* >0,client control retry frequency
|
||
+
|
||
Default: `0`.
|
||
suspendCurrentQueueTimeMillis::
|
||
Time interval of message consume retry for orderly consume.
|
||
+
|
||
Default: `1000`.
|
||
|
||
#### RocketMQ Provider Properties
|
||
|
||
The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.`.
|
||
|
||
enable::
|
||
Enable Producer Binding.
|
||
+
|
||
Default: `true`.
|
||
group::
|
||
Producer group name.
|
||
+
|
||
Default: empty.
|
||
maxMessageSize::
|
||
Maximum allowed message size in bytes.
|
||
+
|
||
Default: `8249344`.
|
||
transactional::
|
||
Send Transactional Message.
|
||
+
|
||
Default: `false`.
|
||
sync::
|
||
Send message in synchronous mode.
|
||
+
|
||
Default: `false`.
|
||
vipChannelEnabled::
|
||
Send message with vip channel.
|
||
+
|
||
Default: `true`.
|
||
sendMessageTimeout::
|
||
Millis of send message timeout.
|
||
+
|
||
Default: `3000`.
|
||
compressMessageBodyThreshold::
|
||
Compress message body threshold, namely, message body larger than 4k will be compressed on default.
|
||
+
|
||
Default: `4096`.
|
||
retryTimesWhenSendFailed::
|
||
Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
|
||
+
|
||
Default: `2`.
|
||
retryTimesWhenSendAsyncFailed::
|
||
Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
|
||
+
|
||
Default: `2`.
|
||
retryNextServer::
|
||
Indicate whether to retry another broker on sending failure internally.
|
||
+
|
||
Default: `false`. |