== Spring Cloud Alibaba RocketMQ Binder
=== Introduction of RocketMQ
https://rocketmq.apache.org[RocketMQ] is an open-source distributed messaging system. It is built on highly available distributed cluster technology and provides message publishing and subscription with low latency and high stability. RocketMQ is widely used for asynchronous decoupling and across a variety of fields, including enterprise solutions, financial settlements, telecommunications, e-commerce, logistics, marketing, social media, instant messaging, mobile applications, mobile games, video, IoT, and Internet of Vehicles.
It has the following features:
* Strict order of message sending and consumption
* Rich modes of message pulling
* Horizontal scalability of consumers
* Real-time message subscription
* Billion-level message accumulation capability
=== RocketMQ Usages
* Download RocketMQ
Download the https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip[RocketMQ 4.3.2 binary release] and decompress it.
The decompressed directory is as follows:
```
apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
```
* Start NameServer
```bash
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
```
* Start Broker
```bash
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
```
* Send and Receive Messages
Send messages:
```bash
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
```
Output when the message is successfully sent: `SendResult [sendStatus=SEND_OK, msgId= ...`
Receive messages:
```bash
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
```
Output when the message is successfully received: `ConsumeMessageThread_%d Receive New Messages: [MessageExt...`
* Shut Down the Servers
```bash
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
```
=== Introduction of Spring Cloud Stream
Spring Cloud Stream is a framework for building message-driven microservices. It helps you create production-ready, standalone Spring applications based on Spring Boot, and uses `Spring Integration` to connect to message brokers.
Spring Cloud Stream provides a unified abstraction over message middleware configuration and introduces the concepts of publish-subscribe, consumer groups, and partitioning.
There are two core concepts in Spring Cloud Stream: Binder and Binding.
* Binder: A component that integrates with external message middleware and creates Bindings. Each message middleware product has its own Binder implementation.
For example, `Kafka` uses `KafkaMessageChannelBinder`, `RabbitMQ` uses `RabbitMessageChannelBinder`, while `RocketMQ` uses `RocketMQMessageChannelBinder`.
* Binding: Includes Input Binding and Output Binding.
A Binding serves as a bridge between the message middleware and the producers and consumers of the application. Developers only need to use a producer or consumer to send or receive data, and do not need to deal with the message middleware directly.
.Spring Cloud Stream
image::https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png[]
Now let's use Spring Cloud Stream to write some simple code for sending and receiving messages:
```java
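import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;

MessageChannel messageChannel = new DirectChannel();
// Message subscription
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("receive msg: " + message.getPayload());
    }
});
// Message sending
messageChannel.send(MessageBuilder.withPayload("simple msg").build());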
```
The messaging types in this code (`Message`, `MessageChannel`, `MessageHandler`, and so on) are provided by the `spring-messaging` module, while `DirectChannel` comes from Spring Integration. These abstractions shield the lower-layer implementations of message middleware. If you would like to change the message middleware, you only need to configure the related message middleware information in the configuration file and change the binder dependency.
**Internally, Spring Cloud Stream builds its own abstractions on top of the same primitives shown in the code above.**
=== How to use Spring Cloud Alibaba RocketMQ Binder
To use the Spring Cloud Alibaba RocketMQ Binder, add it to your Spring Cloud Stream application with the following Maven coordinates:
```xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
```
Alternatively, you can also use the Spring Cloud Stream RocketMQ Starter:
```xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
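
With either dependency on the classpath, bindings are typically wired to RocketMQ topics through configuration. The following `application.properties` fragment is a minimal sketch, not a complete setup: it assumes binding channels named `input` and `output`, and the topic and group names (`test-topic`, `demo-producer-group`, `demo-consumer-group`) are placeholders chosen for illustration.

```properties
# Address of the RocketMQ name server (assumed to run locally)
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

# Output binding: messages sent to the "output" channel go to this topic
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-producer-group

# Input binding: the "input" channel consumes from the same topic
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=demo-consumer-group
```

The `spring.cloud.stream.bindings.*` keys are generic Spring Cloud Stream settings, while the `spring.cloud.stream.rocketmq.*` keys are specific to this binder and are described under Configuration Options.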
=== How Spring Cloud Alibaba RocketMQ Binder Works
This is the implementation architecture of Spring Cloud Stream RocketMQ Binder:
.SCS RocketMQ Binder
image::https://img.alicdn.com/tfs/TB1v8rcbUY1gK0jSZFCXXcwqXXa-1236-773.png[]
The implementation of RocketMQ Binder depends on the https://github.com/apache/rocketmq-spring[RocketMQ-Spring] framework.
The RocketMQ-Spring framework integrates RocketMQ with Spring Boot. It provides three main features:
1. `RocketMQTemplate`: Sends messages, including synchronous, asynchronous, and transactional messages.
2. `@RocketMQTransactionListener`: Listens for and checks transactional messages.
3. `@RocketMQMessageListener`: Consumes messages.
`RocketMQMessageChannelBinder` is a standard implementation of Binder. Internally, it builds a `RocketMQInboundChannelAdapter` and a `RocketMQMessageHandler`.
`RocketMQMessageHandler` constructs a `RocketMQTemplate` based on the Binding configuration. Internally, `RocketMQTemplate` converts the `org.springframework.messaging.Message` class of the `spring-messaging` module into the RocketMQ message class `org.apache.rocketmq.common.message.Message`, and then sends it.
`RocketMQInboundChannelAdapter` likewise constructs a `RocketMQListenerBindingContainer` based on the Binding configuration, and `RocketMQListenerBindingContainer` starts the RocketMQ `Consumer` to receive messages.
NOTE: An application that uses the RocketMQ Binder can also configure `rocketmq.**` properties to trigger the auto-configuration provided by RocketMQ-Spring.
Currently, the Binder supports setting the properties of a RocketMQ message through the corresponding keys in the message `Header`.
For example, the `TAGS`, `DELAY`, `TRANSACTIONAL_ARG`, `KEYS`, `WAIT_STORE_MSG_OK`, and `FLAG` headers map to the corresponding properties of the RocketMQ message.
```java
MessageBuilder builder = MessageBuilder.withPayload(msg)
    .setHeader(RocketMQHeaders.TAGS, "binder")
    .setHeader(RocketMQHeaders.KEYS, "my-key")
    .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "1");
Message message = builder.build();
output().send(message);
```
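
The delay header in the example above carries a delay level rather than a duration. As a rough illustration of what level `"1"` means, the sketch below resolves a level against the default broker setting `messageDelayLevel` (`1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`). The class and method names are hypothetical and not part of the binder, and a broker configured with a different `messageDelayLevel` would yield different durations.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of RocketMQ or the binder.
final class DelayLevelSketch {

    // Default value of the broker setting `messageDelayLevel` (assumed unchanged).
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Returns the delay for a 1-based delay level; level 0 means no delay. */
    static Duration delayFor(int level) {
        if (level == 0) {
            return Duration.ZERO;
        }
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level < 0 || level > levels.length) {
            // Sketch choice: reject levels outside the configured table.
            throw new IllegalArgumentException("unsupported delay level: " + level);
        }
        String spec = levels[level - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("unknown unit in: " + spec);
        }
    }
}
```

Under these assumptions, `DelayLevelSketch.delayFor(1)` is one second, so the message built above would be delivered about one second after it is sent.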
=== Support MessageSource
SCS RocketMQ Binder supports `MessageSource`, which receives messages in pull mode:
```java
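import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(MQApplication.PolledProcessor.class)
public class MQApplication {

    private final Logger logger =
            LoggerFactory.getLogger(MQApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(MQApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(PollableMessageSource source,
            MessageChannel dest) {
        return args -> {
            while (true) {
                // Pull one message, if available, and forward it in upper case
                boolean result = source.poll(m -> {
                    String payload = (String) m.getPayload();
                    logger.info("Received: " + payload);
                    dest.send(MessageBuilder.withPayload(payload.toUpperCase())
                            .copyHeaders(m.getHeaders())
                            .build());
                }, new ParameterizedTypeReference<String>() { });
                if (result) {
                    logger.info("Processed a message");
                }
                else {
                    logger.info("Nothing to do");
                }
                // Wait before polling again
                Thread.sleep(5_000);
            }
        };
    }

    public interface PolledProcessor {

        @Input
        PollableMessageSource source();

        @Output
        MessageChannel dest();
    }
}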
```
=== Configuration Options
==== RocketMQ Binder Properties
spring.cloud.stream.rocketmq.binder.name-server::
The address of the RocketMQ name server (older versions use the `namesrv-addr` configuration item).
+
Default: `127.0.0.1:9876`.
spring.cloud.stream.rocketmq.binder.access-key::
The AccessKey of Alibaba Cloud Account.
+
Default: null.
spring.cloud.stream.rocketmq.binder.secret-key::
The SecretKey of Alibaba Cloud Account.
+
Default: null.
spring.cloud.stream.rocketmq.binder.enable-msg-trace::
Enable Message Trace feature for all producers and consumers.
+
Default: `true`.
spring.cloud.stream.rocketmq.binder.customized-trace-topic::
The trace topic for message trace.
+
Default: `RMQ_SYS_TRACE_TOPIC`.
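
As a brief illustration of how the binder-level properties above fit together, the fragment below keeps message trace enabled but sends trace data to a custom topic. It is only a sketch: the topic name `my-trace-topic` is a placeholder, and the name server address assumes a broker running locally.

```properties
# Name server address (assumed local setup)
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
# Keep message trace enabled for all producers and consumers
spring.cloud.stream.rocketmq.binder.enable-msg-trace=true
# Hypothetical trace topic used instead of RMQ_SYS_TRACE_TOPIC
spring.cloud.stream.rocketmq.binder.customized-trace-topic=my-trace-topic
```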
==== RocketMQ Consumer Properties
The following properties are available for RocketMQ consumers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.`.
enable::
Enable Consumer Binding.
+
Default: `true`.
tags::
Consumer subscription tags expression. Separate multiple tags with `||`.
+
Default: empty.
sql::
Consumer subscription SQL expression.
+
Default: empty.
broadcasting::
Controls the message mode. Enable broadcasting if you want every subscriber to receive every message.
+
Default: `false`.
orderly::
Whether to consume messages orderly instead of concurrently.
+
Default: `false`.
delayLevelWhenNextConsume::
Message consumption retry strategy for concurrent consumption:
* -1, no retry, put into the DLQ directly
* 0, the broker controls the retry frequency
* >0, the client controls the retry frequency
+
Default: `0`.
suspendCurrentQueueTimeMillis::
Interval between consumption retries for orderly consumption, in milliseconds.
+
Default: `1000`.
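
To make the semantics of `tags` and `delayLevelWhenNextConsume` more concrete, the following plain-Java sketch models them outside of RocketMQ. All names here (`ConsumerOptionSketch`, `accepts`, `retryModeFor`, `RetryMode`) are hypothetical and exist only for illustration. The sketch assumes that an empty expression or `*` subscribes to all tags, and it rejects values below -1 because they are not described above.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model of two consumer options; not the binder's actual code.
final class ConsumerOptionSketch {

    enum RetryMode { NO_RETRY_TO_DLQ, BROKER_CONTROLLED, CLIENT_CONTROLLED }

    /** Splits a tags expression such as "tagA || tagB" into individual tags. */
    static Set<String> parseTags(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Returns true if a message carrying the given tag matches the expression. */
    static boolean accepts(String expression, String messageTag) {
        // Assumption: an empty expression or "*" means "subscribe to everything".
        if (expression == null || expression.trim().isEmpty()
                || expression.trim().equals("*")) {
            return true;
        }
        return parseTags(expression).contains(messageTag);
    }

    /** Maps a delayLevelWhenNextConsume value to the strategy listed above. */
    static RetryMode retryModeFor(int delayLevelWhenNextConsume) {
        if (delayLevelWhenNextConsume == -1) {
            return RetryMode.NO_RETRY_TO_DLQ;
        }
        if (delayLevelWhenNextConsume == 0) {
            return RetryMode.BROKER_CONTROLLED;
        }
        if (delayLevelWhenNextConsume > 0) {
            return RetryMode.CLIENT_CONTROLLED;
        }
        // Sketch choice: values below -1 are not documented, so reject them.
        throw new IllegalArgumentException(
                "undocumented value: " + delayLevelWhenNextConsume);
    }
}
```

For example, with `tags` set to `tagA || tagB`, a message tagged `tagB` would be accepted in this model, while one tagged `tagC` would be filtered out.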
==== RocketMQ Producer Properties
The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.`.
enable::
Enable Producer Binding.
+
Default: `true`.
group::
Producer group name.
+
Default: empty.
maxMessageSize::
Maximum allowed message size in bytes.
+
Default: `8249344`.
transactional::
Send Transactional Message.
+
Default: `false`.
sync::
Send message in synchronous mode.
+
Default: `false`.
vipChannelEnabled::
Send message with vip channel.
+
Default: `true`.
sendMessageTimeout::
Timeout for sending a message, in milliseconds.
+
Default: `3000`.
compressMessageBodyThreshold::
Threshold for compressing the message body. By default, a message body larger than 4 KB is compressed.
+
Default: `4096`.
retryTimesWhenSendFailed::
Maximum number of retries to perform internally before reporting a send failure in synchronous mode.
+
Default: `2`.
retryTimesWhenSendAsyncFailed::
Maximum number of retries to perform internally before reporting a send failure in asynchronous mode.
+
Default: `2`.
retryNextServer::
Whether to retry on another broker internally when sending fails.
+
Default: `false`.
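
The interplay of `compressMessageBodyThreshold` and `retryTimesWhenSendFailed` can be pictured with a small plain-Java model. This is a sketch under stated assumptions rather than the client's real send path: the class and method names are hypothetical, Deflate is used only as a stand-in compression codec, the "larger than" boundary follows the wording above, and a failed send is modelled as a `RuntimeException`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;
import java.util.zip.DeflaterOutputStream;

// Hypothetical model of two producer options; not the binder's actual code.
final class ProducerOptionSketch {

    // Default of compressMessageBodyThreshold as documented above.
    static final int COMPRESS_MESSAGE_BODY_THRESHOLD = 4096;

    /** Compresses the body only when it is larger than the threshold. */
    static byte[] maybeCompress(byte[] body) {
        if (body.length <= COMPRESS_MESSAGE_BODY_THRESHOLD) {
            return body; // small bodies are sent unchanged
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(buffer)) {
            out.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Runs one initial attempt plus up to retryTimesWhenSendFailed retries. */
    static <T> T sendWithRetry(Supplier<T> send, int retryTimesWhenSendFailed) {
        if (retryTimesWhenSendFailed < 0) {
            throw new IllegalArgumentException("retry count must not be negative");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retryTimesWhenSendFailed; attempt++) {
            try {
                return send.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed
    }
}
```

In this model, the default of `2` retries allows up to three attempts in total before the failure is reported to the caller.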