mirror of
				https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
				synced 2021-06-26 13:25:11 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			334 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			334 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| == Spring Cloud Alibaba RocketMQ Binder
 | ||
| 
 | ||
| === Introduction of RocketMQ
 | ||
| 
 | ||
| https://rocketmq.apache.org[RocketMQ] is an open-source distributed message system. It is based on highly available distributed cluster technologies and provides message publishing and subscription service with low latency and high stability. RocketMQ is widely used in a variety of industries, such as decoupling of asynchronous communication, enterprise sulotions, financial settlements, telecommunication, e-commerce, logistics, marketing, social media, instant messaging, mobile applications, mobile games, vedios, IoT, and Internet of Vehicles.
 | ||
| 
 | ||
| It has the following features:
 | ||
| 
 | ||
| * Strict order of message sending and consumption
 | ||
| 
 | ||
| * Rich modes of message pulling
 | ||
| 
 | ||
| * Horizontal scalability of consumers
 | ||
| 
 | ||
| * Real-time message subscription
 | ||
| 
 | ||
| * Billion-level message accumulation capability
 | ||
| 
 | ||
| === RocketMQ Usages
 | ||
| 
 | ||
| * Download RocketMQ
 | ||
| 
 | ||
| Download https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip[Latest Binary File of RocketMQ], and decompress it.
 | ||
| 
 | ||
| The decompressed directory is as follows:
 | ||
| 
 | ||
| ```
 | ||
| apache-rocketmq
 | ||
| ├── LICENSE
 | ||
| ├── NOTICE
 | ||
| ├── README.md
 | ||
| ├── benchmark
 | ||
| ├── bin
 | ||
| ├── conf
 | ||
| └── lib
 | ||
| ```
 | ||
| 
 | ||
| * Start NameServer
 | ||
| 
 | ||
| ```bash
 | ||
| nohup sh bin/mqnamesrv &
 | ||
| tail -f ~/logs/rocketmqlogs/namesrv.log
 | ||
| ```
 | ||
| 
 | ||
| * Start Broker
 | ||
| 
 | ||
| ```bash
 | ||
| nohup sh bin/mqbroker -n localhost:9876 &
 | ||
| tail -f ~/logs/rocketmqlogs/broker.log
 | ||
| ```
 | ||
| 
 | ||
| * Send and Receive Messages
 | ||
| 
 | ||
| Send messages:
 | ||
| 
 | ||
| ```bash
 | ||
| sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 | ||
| ```
 | ||
| 
 | ||
| Output when the message is successfuly sent: `SendResult [sendStatus=SEND_OK, msgId= ...`
 | ||
| 
 | ||
| Receive messages:
 | ||
| 
 | ||
| ```bash
 | ||
| sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 | ||
| ```
 | ||
| 
 | ||
| Output when the message is successfully received: `ConsumeMessageThread_%d Receive New Messages: [MessageExt...`
 | ||
| 
 | ||
| * Disable Server
 | ||
| 
 | ||
| ```bash
 | ||
| sh bin/mqshutdown broker
 | ||
| sh bin/mqshutdown namesrv
 | ||
| ```
 | ||
| 
 | ||
| === Introduction of Spring Cloud Stream
 | ||
| 
 | ||
| Spring Cloud Stream is a microservice framework used to build architectures based on messages. It helps you to create production-ready single-server Spring applications based on SpringBoot, and connects with Broker using `Spring Integration`.
 | ||
| 
 | ||
| Spring Cloud Stream provides unified abstractions of message middleware configurations, and puts forward concepts such as publish-subscribe, consumer groups and partition.
 | ||
| 
 | ||
| There are two concepts in Spring Cloud Stream: Binder and Binding
 | ||
| 
 | ||
| * Binder: A component used to integrate with external message middleware, and is used to create binding. Different message middleware products have their own binder implementations.
 | ||
| 
 | ||
| For example, `Kafka` uses `KafkaMessageChannelBinder`, `RabbitMQ` uses `RabbitMessageChannelBinder`, while `RocketMQ` uses `RocketMQMessageChannelBinder`.
 | ||
| 
 | ||
| * Binding: Includes Input Binding and Output Binding.
 | ||
| 
 | ||
| Binding serves as a bridge between message middleware and the provider and consumer of the applications. Developers only need to use the Provider or Consumer to produce or consume data, and do not need to worry about the interactions with the message middleware.
 | ||
| 
 | ||
| .Spring Cloud Stream
 | ||
| image::https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png[]
 | ||
| 
 | ||
| Now let’s use Spring Cloud Stream to write a simple code for sending and receiving messages:
 | ||
| 
 | ||
| ```java
 | ||
| MessageChannel messageChannel = new DirectChannel();
 | ||
| 
 | ||
| // Message subscription
 | ||
| ((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
 | ||
|     @Override
 | ||
|     public void handleMessage(Message<? > message) throws MessagingException {
 | ||
|         System.out.println("receive msg: " + message.getPayload());
 | ||
|     }
 | ||
| });
 | ||
| 
 | ||
| // Message sending
 | ||
| messageChannel.send(MessageBuilder.withPayload("simple msg").build());
 | ||
| ```
 | ||
| 
 | ||
| All the message types in this code are provided by the `spring-messaging`module. It shields the lower-layer implementations of message middleware. If you would like to change the message middleware, you only need to configure the related message middleware information in the configuration file and modify the binder dependency.
 | ||
| 
 | ||
| **The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.**
 | ||
| 
 | ||
| === How to use Spring Cloud Alibaba RocketMQ Binder
 | ||
| 
 | ||
| For using the Spring Cloud Alibaba RocketMQ Binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
 | ||
| 
 | ||
| ```xml
 | ||
| <dependency>
 | ||
|   <groupId>com.alibaba.cloud</groupId>
 | ||
|   <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
 | ||
| </dependency>
 | ||
| ```
 | ||
| 
 | ||
| Alternatively, you can also use the Spring Cloud Stream RocketMQ Starter:
 | ||
| 
 | ||
| ```xml
 | ||
| <dependency>
 | ||
|     <groupId>com.alibaba.cloud</groupId>
 | ||
|     <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
 | ||
| </dependency>
 | ||
| ```
 | ||
| 
 | ||
| === How Spring Cloud Alibaba RocketMQ Binder Works
 | ||
| 
 | ||
| This is the implementation architecture of Spring Cloud Stream RocketMQ Binder:
 | ||
| 
 | ||
| .SCS RocketMQ Binder
 | ||
| image::https://img.alicdn.com/tfs/TB1v8rcbUY1gK0jSZFCXXcwqXXa-1236-773.png[]
 | ||
| 
 | ||
| The implementation of RocketMQ Binder depend on the https://github.com/apache/rocketmq-spring[RocketMQ-Spring] framework.
 | ||
| 
 | ||
| RocketMQ Spring framework is an integration of RocketMQ and Spring Boot. It provides three main features:
 | ||
| 
 | ||
| 1. `RocketMQTemplate`: Sending messages, including synchronous, asynchronous, and transactional messages.
 | ||
| 2. `@RocketMQTransactionListener`: Listen and check for transaction messages.
 | ||
| 3. `@RocketMQMessageListener`: Consume messages.
 | ||
| 
 | ||
| `RocketMQMessageChannelBinder` is a standard implementation of Binder, it will build `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler` internally.
 | ||
| 
 | ||
| `RocketMQMessageHandler` will construct `RocketMQTemplate` based on the Binding configuration. `RocketMQTemplate` will convert the `org.springframework.messaging.Message` message class of `spring-messaging` module to the RocketMQ message class `org.apache.rocketmq.common .message.Message` internally, then send it out.
 | ||
| 
 | ||
| `RocketMQInboundChannelAdapter` will also construct `RocketMQListenerBindingContainer` based on the Binding configuration, and `RocketMQListenerBindingContainer` will start the RocketMQ `Consumer` to receive the messages.
 | ||
| 
 | ||
| NOTE: RocketMQ Binder Application can also be used to configure rocketmq.** to trigger RocketMQ Spring related AutoConfiguration
 | ||
| 
 | ||
| Currently Binder supports setting the relevant key in `Header` to set the properties of the RocketMQ message.
 | ||
| 
 | ||
| For example, `TAGS`, `DELAY`, `TRANSACTIONAL_ARG`, `KEYS`, `WAIT_STORE_MSG_OK`, `FLAG` represent the labels corresponding to the RocketMQ message.
 | ||
| 
 | ||
| ```java
 | ||
| MessageBuilder builder = MessageBuilder.withPayload(msg)
 | ||
|     .setHeader(RocketMQHeaders.TAGS, "binder")
 | ||
|     .setHeader(RocketMQHeaders.KEYS, "my-key")
 | ||
|     .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "1");
 | ||
| Message message = builder.build();
 | ||
| output().send(message);
 | ||
| ```
 | ||
| 
 | ||
| === Support MessageSource
 | ||
| 
 | ||
| SCS RocketMQ Binder support `MessageSource`,which can receive messages by pull mode:
 | ||
| 
 | ||
| ```java
 | ||
| @SpringBootApplication
 | ||
| @EnableBinding(MQApplication.PolledProcessor.class)
 | ||
| public class MQApplication {
 | ||
| 
 | ||
|   private final Logger logger =
 | ||
|   	  LoggerFactory.getLogger(MQApplication.class);
 | ||
| 
 | ||
|   public static void main(String[] args) {
 | ||
|     SpringApplication.run(MQApplication.class, args);
 | ||
|   }
 | ||
| 
 | ||
|   @Bean
 | ||
|   public ApplicationRunner runner(PollableMessageSource source,
 | ||
|   	    MessageChannel dest) {
 | ||
|     return args -> {
 | ||
|       while (true) {
 | ||
|         boolean result = source.poll(m -> {
 | ||
|           String payload = (String) m.getPayload();
 | ||
|           logger.info("Received: " + payload);
 | ||
|           dest.send(MessageBuilder.withPayload(payload.toUpperCase())
 | ||
|               .copyHeaders(m.getHeaders())
 | ||
|               .build());
 | ||
|         }, new ParameterizedTypeReference<String>() { });
 | ||
|         if (result) {
 | ||
|           logger.info("Processed a message");
 | ||
|         }
 | ||
|         else {
 | ||
|           logger.info("Nothing to do");
 | ||
|         }
 | ||
|         Thread.sleep(5_000);
 | ||
|       }
 | ||
|     };
 | ||
|   }
 | ||
| 
 | ||
|   public static interface PolledProcessor {
 | ||
| 
 | ||
|     @Input
 | ||
|     PollableMessageSource source();
 | ||
| 
 | ||
|     @Output
 | ||
|     MessageChannel dest();
 | ||
| 
 | ||
|   }
 | ||
| 
 | ||
| }
 | ||
| ```
 | ||
| 
 | ||
| === Configuration Options
 | ||
| 
 | ||
| ==== RocketMQ Binder Properties
 | ||
| 
 | ||
| spring.cloud.stream.rocketmq.binder.name-server::
 | ||
| The name server of RocketMQ Server(Older versions use the namesrv-addr configuration item).
 | ||
| +
 | ||
| Default: `127.0.0.1:9876`.
 | ||
| spring.cloud.stream.rocketmq.binder.access-key::
 | ||
| The AccessKey of Alibaba Cloud Account.
 | ||
| +
 | ||
| Default: null.
 | ||
| spring.cloud.stream.rocketmq.binder.secret-key::
 | ||
| The SecretKey of Alibaba Cloud Account.
 | ||
| +
 | ||
| Default: null.
 | ||
| spring.cloud.stream.rocketmq.binder.enable-msg-trace::
 | ||
| Enable Message Trace feature for all producers and consumers.
 | ||
| +
 | ||
| Default: `true`.
 | ||
| spring.cloud.stream.rocketmq.binder.customized-trace-topic::
 | ||
| The trace topic for message trace.
 | ||
| +
 | ||
| Default: `RMQ_SYS_TRACE_TOPIC`.
 | ||
| 
 | ||
| 
 | ||
| ==== RocketMQ Consumer Properties
 | ||
| 
 | ||
| The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.`.
 | ||
| 
 | ||
| enable::
 | ||
| Enable Consumer Binding.
 | ||
| +
 | ||
| Default: `true`.
 | ||
| tags::
 | ||
| Consumer subscription tags expression, tags split by `||`.
 | ||
| +
 | ||
| Default: empty.
 | ||
| sql::
 | ||
| Consumer subscription sql expression.
 | ||
| +
 | ||
| Default: empty.
 | ||
| broadcasting::
 | ||
| Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
 | ||
| +
 | ||
| Default: `false`.
 | ||
| orderly::
 | ||
| Receiving message concurrently or orderly.
 | ||
| +
 | ||
| Default: `false`.
 | ||
| delayLevelWhenNextConsume::
 | ||
| Message consume retry strategy for concurrently consume:
 | ||
| * -1,no retry,put into DLQ directly
 | ||
| * 0,broker control retry frequency
 | ||
| * >0,client control retry frequency
 | ||
| +
 | ||
| Default: `0`.
 | ||
| suspendCurrentQueueTimeMillis::
 | ||
| Time interval of message consume retry for orderly consume.
 | ||
| +
 | ||
| Default: `1000`.
 | ||
| 
 | ||
| ==== RocketMQ Provider Properties
 | ||
| 
 | ||
| The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.`.
 | ||
| 
 | ||
| enable::
 | ||
| Enable Producer Binding.
 | ||
| +
 | ||
| Default: `true`.
 | ||
| group::
 | ||
| Producer group name.
 | ||
| +
 | ||
| Default: empty.
 | ||
| maxMessageSize::
 | ||
| Maximum allowed message size in bytes.
 | ||
| +
 | ||
| Default: `8249344`.
 | ||
| transactional::
 | ||
| Send Transactional Message.
 | ||
| +
 | ||
| Default: `false`.
 | ||
| sync::
 | ||
| Send message in synchronous mode.
 | ||
| +
 | ||
| Default: `false`.
 | ||
| vipChannelEnabled::
 | ||
| Send message with vip channel.
 | ||
| +
 | ||
| Default: `true`.
 | ||
| sendMessageTimeout::
 | ||
| Millis of send message timeout.
 | ||
| +
 | ||
| Default: `3000`.
 | ||
| compressMessageBodyThreshold::
 | ||
| Compress message body threshold, namely, message body larger than 4k will be compressed on default.
 | ||
| +
 | ||
| Default: `4096`.
 | ||
| retryTimesWhenSendFailed::
 | ||
| Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
 | ||
| +
 | ||
| Default: `2`.
 | ||
| retryTimesWhenSendAsyncFailed::
 | ||
| Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
 | ||
| +
 | ||
| Default: `2`.
 | ||
| retryNextServer::
 | ||
| Indicate whether to retry another broker on sending failure internally.
 | ||
| +
 | ||
| Default: `false`. |