mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00
2018-12-18 20:17:50 +08:00

== Spring Cloud Alibaba RocketMQ Binder
### Introduction to RocketMQ
https://rocketmq.apache.org[RocketMQ] is an open-source distributed messaging system. It is built on highly available distributed cluster technologies and provides message publishing and subscription services with low latency and high stability. RocketMQ is widely used across industries and scenarios, including asynchronous communication decoupling, enterprise solutions, financial settlements, telecommunication, e-commerce, logistics, marketing, social media, instant messaging, mobile applications, mobile games, videos, IoT, and Internet of Vehicles.
It has the following features:
* Strict order of message sending and consumption
* Rich modes of message pulling
* Horizontal scalability of consumers
* Real-time message subscription
* Billion-level message accumulation capability
### RocketMQ Usages
* Download RocketMQ
Download the https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip[RocketMQ 4.3.2 binary release] and decompress it.
The decompressed directory is as follows:
```
apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
```
* Start NameServer
```bash
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
```
* Start Broker
```bash
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
```
* Send and Receive Messages
Send messages:
```bash
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
```
Output when the message is successfully sent: `SendResult [sendStatus=SEND_OK, msgId= ...`
Receive messages:
```bash
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
```
Output when the message is successfully received: `ConsumeMessageThread_%d Receive New Messages: [MessageExt...`
* Shut Down the Servers
```bash
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
```
### Introduction to Spring Cloud Stream
Spring Cloud Stream is a framework for building message-driven microservices. It helps you create production-ready, standalone Spring applications based on Spring Boot, and uses `Spring Integration` to connect to message brokers.
Spring Cloud Stream provides a unified abstraction over message middleware configuration and introduces concepts such as publish-subscribe, consumer groups, and partitioning.
There are two core concepts in Spring Cloud Stream: Binder and Binding.
* Binder: A component that integrates with external message middleware and is used to create Bindings. Each message middleware product has its own Binder implementation.
For example, `Kafka` uses `KafkaMessageChannelBinder`, `RabbitMQ` uses `RabbitMessageChannelBinder`, while `RocketMQ` uses `RocketMQMessageChannelBinder`.
* Binding: Includes Input Binding and Output Binding.
A Binding serves as a bridge between the message middleware and the application's producer and consumer. Developers only need to use the producer or consumer to produce or consume data, and do not need to worry about how the application interacts with the message middleware.
.Spring Cloud Stream
image::https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png[]
Now let's use Spring Cloud Stream to write a simple example that sends and receives messages:
```java
// DirectChannel comes from spring-integration-core; the other types come from spring-messaging.
MessageChannel messageChannel = new DirectChannel();

// Message subscription: register a handler that prints each payload
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("receive msg: " + message.getPayload());
    }
});

// Message sending: the handler above is invoked synchronously
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
```
The messaging types in this code (`Message`, `MessageChannel`, `MessageHandler`, and so on) are provided by the `spring-messaging` module. It shields the lower-layer implementations of message middleware. To switch to a different message middleware, you only need to update the middleware settings in the configuration file and replace the binder dependency.
**Under the hood, Spring Cloud Stream builds its own abstractions on top of the primitives used in the code above.**
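As a rough illustration of "configuring the message middleware in the configuration file", the fragment below binds one output and one input channel to a topic. The binding names `output` and `input`, the topic `test-topic`, and the group `test-group` are placeholders. The NameServer property name is an assumption and may differ between binder versions, so check it against the version you use.

```properties
# Standard Spring Cloud Stream binding properties (binding names are placeholders)
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group

# Assumed property name for the RocketMQ NameServer address; verify for your binder version
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
```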
### How Spring Cloud Alibaba RocketMQ Binder Works
.RocketMQ Binder Workflow
image::https://cdn.nlark.com/lark/0/2018/png/64647/1543560843558-24525bf4-1d0e-4e10-be5f-bdde7127f6e6.png[]
The core of RocketMQ Binder consists of three classes: `RocketMQMessageChannelBinder`, `RocketMQInboundChannelAdapter`, and `RocketMQMessageHandler`.
`RocketMQMessageChannelBinder` is a standard implementation of Binder. Internally, it creates and uses `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler`.
`RocketMQMessageHandler` starts the RocketMQ `Producer` and sends messages. It converts each `org.springframework.messaging.Message` from the `spring-messaging` module into a RocketMQ `org.apache.rocketmq.common.message.Message`.
When constructing the `org.apache.rocketmq.common.message.Message`, it first builds a `RocketMQMessageHeaderAccessor` from the headers of the `org.springframework.messaging.Message`. It then uses properties of the `RocketMQMessageHeaderAccessor` to set attributes such as tags, keys, and flag on the RocketMQ message.
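The header-to-attribute mapping described above can be pictured with the following self-contained sketch. It is not the binder's actual code: `RocketMessage` is a simplified stand-in for `org.apache.rocketmq.common.message.Message`, and the header names `TAGS`, `KEYS`, and `FLAG` are hypothetical constants chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: hypothetical stand-ins, not the binder's real classes.
final class HeaderMappingSketch {

    // Hypothetical header names; the real binder defines its own constants.
    static final String TAGS = "TAGS";
    static final String KEYS = "KEYS";
    static final String FLAG = "FLAG";

    /** Simplified stand-in for org.apache.rocketmq.common.message.Message. */
    static final class RocketMessage {
        final String topic;
        final String tags;
        final String keys;
        final int flag;
        final byte[] body;

        RocketMessage(String topic, String tags, String keys, int flag, byte[] body) {
            this.topic = topic;
            this.tags = tags;
            this.keys = keys;
            this.flag = flag;
            this.body = body;
        }
    }

    /** Copies selected headers onto the outgoing message; missing headers fall back to defaults. */
    static RocketMessage toRocketMessage(String topic, Map<String, Object> headers, String payload) {
        String tags = (String) headers.getOrDefault(TAGS, "");
        String keys = (String) headers.getOrDefault(KEYS, "");
        int flag = (Integer) headers.getOrDefault(FLAG, 0);
        return new RocketMessage(topic, tags, keys, flag, payload.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TAGS, "tagA");
        headers.put(KEYS, "order-42");
        RocketMessage msg = toRocketMessage("test-topic", headers, "simple msg");
        System.out.println(msg.topic + " tags=" + msg.tags + " keys=" + msg.keys + " flag=" + msg.flag);
    }
}
```

The point of the sketch is the direction of the mapping: application code only sets headers on the Spring message, and the handler is responsible for turning them into RocketMQ-specific attributes.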
`RocketMQInboundChannelAdapter` starts the RocketMQ `Consumer` and receives messages. It also supports https://github.com/spring-projects/spring-retry[spring-retry].
You can also obtain the `Acknowledgement` object from the message header and use it to control how the message is acknowledged.
For example, when `MessageListenerConcurrently` is used for asynchronous message consumption, you can request delayed re-consumption:
```java
@StreamListener("input")
public void receive(Message message) {
    RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
    Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
    acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER);
    acknowledgement.setConsumeConcurrentlyDelayLevel(1);
}
```
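The delay level passed to `setConsumeConcurrentlyDelayLevel` is an index into the broker's delay table, not a number of seconds. The sketch below resolves a level to a duration, assuming the broker keeps RocketMQ's default `messageDelayLevel` setting; a broker configured differently will use other values. The class and method names are hypothetical.

```java
import java.time.Duration;

// Illustrative sketch: resolves a delay level against the assumed default broker delay table.
final class DelayLevelSketch {

    // Assumed broker default for messageDelayLevel; brokers may override it.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Returns the delay for a 1-based level, or throws if the level is out of range. */
    static Duration delayFor(int level) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > levels.length) {
            throw new IllegalArgumentException("delay level must be between 1 and " + levels.length);
        }
        String spec = levels[level - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("unknown unit in " + spec);
        }
    }

    public static void main(String[] args) {
        // Level 1 is the value used in the listener example above.
        System.out.println("level 1 -> " + delayFor(1));
    }
}
```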
Similarly, when `MessageListenerOrderly` is used for consuming ordered messages, you can suspend the current queue for a period of time (in milliseconds) before retrying:
```java
@StreamListener("input")
public void receive(Message message) {
    RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
    Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
    acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
    acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}
```
Supported Configurations of Producer:
:frame: topbot
[width="60%",options="header"]
|====
^|Configuration ^|Description ^| Default Value
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|Whether to use producer|true
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|Maximum size of a sent message, in bytes|0 (takes effect only when greater than 0; otherwise the RocketMQ default of 4 MB = 1024 * 1024 * 4 applies)
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|Whether to use `TransactionMQProducer` to send transaction messages|false
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|Fully qualified name of a class that implements `org.apache.rocketmq.client.producer.LocalTransactionExecuter`. For example, `org.test.MyExecuter`|
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|Fully qualified name of a class that implements `org.apache.rocketmq.client.producer.TransactionCheckListener`. For example, `org.test.MyTransactionCheckListener`|
|====
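As an example of how these settings might look in `application.properties`, the fragment below configures a transactional producer. The binding name `output` is a placeholder for `your-output-binding`, the size limit is an arbitrary illustrative value, and the two class names are the hypothetical examples from the table, not classes that ship with the binder.

```properties
# "output" stands in for your-output-binding
spring.cloud.stream.rocketmq.bindings.output.producer.enabled=true
# Illustrative limit of 1 MB (1024 * 1024 bytes)
spring.cloud.stream.rocketmq.bindings.output.producer.max-message-size=1048576
spring.cloud.stream.rocketmq.bindings.output.producer.transactional=true
# Hypothetical user-provided implementations
spring.cloud.stream.rocketmq.bindings.output.producer.executer=org.test.MyExecuter
spring.cloud.stream.rocketmq.bindings.output.producer.transaction-check-listener=org.test.MyTransactionCheckListener
```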
Supported Configurations of Consumer:
:frame: topbot
[width="60%",options="header"]
|====
^|Configuration ^|Description ^| Default Value
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|Whether to use consumer|true
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|The consumer subscribes only to messages with these tags. Multiple tags are separated by "\|\|". (If not specified, the consumer subscribes to all messages.)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|The consumer subscribes to messages that match this SQL expression. (If tags are also specified, SQL takes priority over tags.)|
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Whether the consumer uses the broadcasting mode|false
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|Whether to use ordered message consumption (`true`) or asynchronous consumption (`false`)|false
|====
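A matching consumer fragment might look like the following. The binding name `input` is a placeholder for `your-input-binding`, and `tagA` and `tagB` are made-up tag names. Note that the backslashes in the table above are only table escaping; the property value itself uses a plain `||` separator.

```properties
# "input" stands in for your-input-binding
spring.cloud.stream.rocketmq.bindings.input.consumer.enabled=true
# Subscribe only to messages tagged tagA or tagB (hypothetical tag names)
spring.cloud.stream.rocketmq.bindings.input.consumer.tags=tagA||tagB
spring.cloud.stream.rocketmq.bindings.input.consumer.broadcasting=false
# Consume messages in order instead of asynchronously
spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true
```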
### Endpoint Support
Before you use the Endpoint feature, add the `spring-boot-starter-actuator` dependency to your Maven POM and enable access to Endpoints in your configuration.
* In Spring Boot 1.x, add `management.security.enabled=false`. The exposed endpoint path is `/rocketmq_binder`.
* In Spring Boot 2.x, add `management.endpoints.web.exposure.include=*`. The exposed endpoint path is `/actuator/rocketmq-binder`.
The Endpoint collects data about the last message sent, the number of successful and failed message sends, and the number of successful and failed message consumptions.
```json
{
"runtime": {
"lastSend.timestamp": 1542786623915
},
"metrics": {
"scs-rocketmq.consumer.test-topic.totalConsumed": {
"count": 11
},
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": {
"count": 0
},
"scs-rocketmq.producer.test-topic.totalSentFailures": {
"count": 0
},
"scs-rocketmq.consumer.test-topic.consumedPerSecond": {
"count": 11,
"fifteenMinuteRate": 0.012163847780107841,
"fiveMinuteRate": 0.03614605351360527,
"meanRate": 0.3493213353657594,
"oneMinuteRate": 0.17099243039490175
},
"scs-rocketmq.producer.test-topic.totalSent": {
"count": 5
},
"scs-rocketmq.producer.test-topic.sentPerSecond": {
"count": 5,
"fifteenMinuteRate": 0.005540151995103271,
"fiveMinuteRate": 0.01652854617838251,
"meanRate": 0.10697493212602836,
"oneMinuteRate": 0.07995558537067671
},
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
},
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}
```
Note: To view statistics, add the https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core[metrics-core dependency] to your POM. If it is not added, the endpoint returns a warning instead of statistics:
```json
{
"warning": "please add metrics-core dependency, we use it for metrics"
}
```
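For reference, the `metrics-core` dependency could be declared as in the fragment below. The version is omitted on the assumption that the project inherits Spring Boot's dependency management, which is expected to supply one; if your build does not manage this artifact, add an explicit `<version>` element.

```xml
<!-- Version omitted: assumes Spring Boot dependency management provides it -->
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
</dependency>
```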