mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
update docs
This commit is contained in:
@@ -137,6 +137,11 @@ messageChannel.send(MessageBuilder.withPayload("simple msg").build());
|
||||
|
||||
### Spring Cloud Alibaba RocketMQ Binder 实现
|
||||
|
||||
这是 Spring Cloud Stream RocketMQ Binder 的实现架构:
|
||||
|
||||
.SCS RocketMQ Binder
|
||||
image::https://img.alicdn.com/tfs/TB1v8rcbUY1gK0jSZFCXXcwqXXa-1236-773.png[]
|
||||
|
||||
RocketMQ Binder 的实现依赖于 https://github.com/apache/rocketmq-spring[RocketMQ-Spring] 框架。
|
||||
|
||||
RocketMQ-Spring 框架是 RocketMQ 与 Spring Boot 的整合,RocketMQ Spring 主要提供了 3 个特性:
|
||||
@@ -166,12 +171,66 @@ Message message = builder.build();
|
||||
output().send(message);
|
||||
```
|
||||
|
||||
### MessageSource 支持
|
||||
|
||||
SCS RocketMQ Binder 支持 `MessageSource`,可以进行消息的拉取,例子如下:
|
||||
|
||||
```java
|
||||
@SpringBootApplication
|
||||
@EnableBinding(MQApplication.PolledProcessor.class)
|
||||
public class MQApplication {
|
||||
|
||||
private final Logger logger =
|
||||
LoggerFactory.getLogger(MQApplication.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(MQApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ApplicationRunner runner(PollableMessageSource source,
|
||||
MessageChannel dest) {
|
||||
return args -> {
|
||||
while (true) {
|
||||
boolean result = source.poll(m -> {
|
||||
String payload = (String) m.getPayload();
|
||||
logger.info("Received: " + payload);
|
||||
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
|
||||
.copyHeaders(m.getHeaders())
|
||||
.build());
|
||||
}, new ParameterizedTypeReference<String>() { });
|
||||
if (result) {
|
||||
logger.info("Processed a message");
|
||||
}
|
||||
else {
|
||||
logger.info("Nothing to do");
|
||||
}
|
||||
Thread.sleep(5_000);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static interface PolledProcessor {
|
||||
|
||||
@Input
|
||||
PollableMessageSource source();
|
||||
|
||||
@Output
|
||||
MessageChannel dest();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
### 配置选项
|
||||
|
||||
#### RocketMQ Binder Properties
|
||||
|
||||
spring.cloud.stream.rocketmq.binder.name-server::
|
||||
RocketMQ NameServer 地址。
|
||||
RocketMQ NameServer 地址(老版本使用 namesrv-addr 配置项)。
|
||||
+
|
||||
Default: `127.0.0.1:9876`.
|
||||
spring.cloud.stream.rocketmq.binder.access-key::
|
||||
@@ -276,3 +335,20 @@ retryNextServer::
|
||||
消息发送失败的情况下是否重试其它的 broker。
|
||||
+
|
||||
默认值: `false`.
|
||||
|
||||
### 阿里云 MQ 服务
|
||||
|
||||
使用阿里云 MQ 服务需要配置 AccessKey、SecretKey 以及云上的 NameServer 地址。
|
||||
|
||||
NOTE: 0.1.2 & 0.2.2 & 0.9.0 才支持该功能
|
||||
|
||||
```properties
|
||||
spring.cloud.stream.rocketmq.binder.access-key=YourAccessKey
|
||||
spring.cloud.stream.rocketmq.binder.secret-key=YourSecretKey
|
||||
spring.cloud.stream.rocketmq.binder.name-server=NameServerInMQ
|
||||
```
|
||||
|
||||
NOTE: topic 和 group 请以 实例id% 为前缀进行配置。比如 topic 为 "test",需要配置成 "实例id%test"
|
||||
|
||||
.NameServer 的获取(配置中请去掉 http:// 前缀)
|
||||
image::https://spring-cloud-alibaba.oss-cn-beijing.aliyuncs.com/MQ.png[]
|
||||
Reference in New Issue
Block a user