mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
Merge branch 'master' of https://github.com/spring-cloud-incubator/spring-cloud-alibaba
This commit is contained in:
commit
94f3de3b36
1
pom.xml
1
pom.xml
@ -100,6 +100,7 @@
|
||||
<module>spring-cloud-alicloud-oss</module>
|
||||
<module>spring-cloud-alicloud-acm</module>
|
||||
<module>spring-cloud-alicloud-ans</module>
|
||||
<module>spring-cloud-alicloud-schedulerX</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
|
@ -23,9 +23,10 @@
|
||||
<acm.version>1.0.8</acm.version>
|
||||
<ans.version>0.1.1</ans.version>
|
||||
<aliyun.sdk.version>4.0.1</aliyun.sdk.version>
|
||||
<alicloud.context.version>1.0.0</alicloud.context.version>
|
||||
<alicloud.context.version>1.0.1</alicloud.context.version>
|
||||
<aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version>
|
||||
<rocketmq.version>4.3.1</rocketmq.version>
|
||||
<schedulerX.version>2.3.1-SNAPSHOT</schedulerX.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
@ -63,6 +64,11 @@
|
||||
<artifactId>acm-sdk</artifactId>
|
||||
<version>${acm.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.schedulerx</groupId>
|
||||
<artifactId>schedulerx-client</artifactId>
|
||||
<version>${schedulerX.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--Nacos-->
|
||||
<dependency>
|
||||
@ -191,6 +197,11 @@
|
||||
<artifactId>spring-cloud-alicloud-ans</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-alicloud-schedulerX</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-alicloud-context</artifactId>
|
||||
@ -243,6 +254,12 @@
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alicloud-schedulerX</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
||||
|
@ -301,3 +301,64 @@ spring.cloud.nacos.config.group=DEVELOP_GROUP
|
||||
----
|
||||
|
||||
NOTE: 该配置必须放在 bootstrap.properties 文件中。并且在添加配置时 Group 的值一定要和 `spring.cloud.nacos.config.group` 的配置值一致。
|
||||
|
||||
=== 支持自定义扩展的 Data Id 配置
|
||||
|
||||
Spring Cloud Alibaba Nacos Config 从 0.2.1 版本后,可支持自定义 Data Id 的配置。关于这部分详细的设计可参考 https://github.com/spring-cloud-incubator/spring-cloud-alibaba/issues/141[这里]。
|
||||
一个完整的配置案例如下所示:
|
||||
|
||||
[source,properties]
|
||||
----
|
||||
spring.application.name=opensource-service-provider
|
||||
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
|
||||
|
||||
# config external configuration
|
||||
# 1、Data Id 在默认的组 DEFAULT_GROUP,不支持配置的动态刷新
|
||||
spring.cloud.nacos.config.ext-config[0].data-id=ext-config-common01.properties
|
||||
|
||||
# 2、Data Id 不在默认的组,不支持动态刷新
|
||||
spring.cloud.nacos.config.ext-config[1].data-id=ext-config-common02.properties
|
||||
spring.cloud.nacos.config.ext-config[1].group=GLOBALE_GROUP
|
||||
|
||||
# 3、Data Id 既不在默认的组,也支持动态刷新
|
||||
spring.cloud.nacos.config.ext-config[2].data-id=ext-config-common03.properties
|
||||
spring.cloud.nacos.config.ext-config[2].group=REFRESH_GROUP
|
||||
spring.cloud.nacos.config.ext-config[2].refresh=true
|
||||
----
|
||||
|
||||
可以看到:
|
||||
|
||||
* 通过 `spring.cloud.nacos.config.ext-config[n].data-id` 的配置方式来支持多个 Data Id 的配置。
|
||||
* 通过 `spring.cloud.nacos.config.ext-config[n].group` 的配置方式自定义 Data Id 所在的组,不明确配置的话,默认是 DEFAULT_GROUP。
|
||||
* 通过 `spring.cloud.nacos.config.ext-config[n].refresh` 的配置方式来控制该 Data Id 在配置变更时,是否支持应用中可动态刷新,
|
||||
感知到最新的配置值。默认是不支持的。
|
||||
|
||||
|
||||
NOTE: 多个 Data Id 同时配置时,他的优先级关系是 `spring.cloud.nacos.config.ext-config[n].data-id` 其中 n 的值越大,优先级越高。
|
||||
|
||||
NOTE: `spring.cloud.nacos.config.ext-config[n].data-id` 的值必须带文件扩展名,文件扩展名既可支持 properties,又可以支持 yaml/yml。
|
||||
此时 `spring.cloud.nacos.config.file-extension` 的配置对自定义扩展配置的 Data Id 文件扩展名没有影响。
|
||||
|
||||
通过自定义扩展的 Data Id 配置,既可以解决多个应用间配置共享的问题,又可以支持一个应用有多个配置文件。
|
||||
|
||||
为了更加清晰的在多个应用间配置共享的 Data Id ,你可以通过以下的方式来配置:
|
||||
|
||||
[source,properties]
|
||||
----
|
||||
spring.cloud.nacos.config.shared-dataids=bootstrap-common.properties,all-common.properties
|
||||
spring.cloud.nacos.config.refreshable-dataids=bootstrap-common.properties
|
||||
----
|
||||
|
||||
可以看到:
|
||||
|
||||
* 通过 `spring.cloud.nacos.config.shared-dataids` 来支持多个共享 Data Id 的配置,多个之间用逗号隔开。
|
||||
* 通过 `spring.cloud.nacos.config.refreshable-dataids` 来支持哪些共享配置的 Data Id 在配置变化时,应用中是否可动态刷新,
|
||||
感知到最新的配置值,多个 Data Id 之间用逗号隔开。如果没有明确配置,默认情况下所有共享配置的 Data Id 都不支持动态刷新。
|
||||
|
||||
NOTE: 通过 `spring.cloud.nacos.config.shared-dataids` 来支持多个共享配置的 Data Id 时,
|
||||
多个共享配置间的一个优先级的关系我们约定:按照配置出现的先后顺序,即后面的优先级要高于前面。
|
||||
|
||||
NOTE: 通过 `spring.cloud.nacos.config.shared-dataids` 来配置时,Data Id 必须带文件扩展名,文件扩展名既可支持 properties,也可以支持 yaml/yml。
|
||||
此时 `spring.cloud.nacos.config.file-extension` 的配置对自定义扩展配置的 Data Id 文件扩展名没有影响。
|
||||
|
||||
NOTE: `spring.cloud.nacos.config.refreshable-dataids` 给出哪些需要支持动态刷新时,Data Id 的值也必须明确给出文件扩展名。
|
@ -164,6 +164,9 @@ Provider端支持的配置:
|
||||
^|配置项 ^|含义 ^| 默认值
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效,RocketMQ 默认值为4M = 1024 * 1024 * 4)
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`|
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`|
|
||||
|====
|
||||
|
||||
Consumer端支持的配置:
|
||||
@ -173,8 +176,8 @@ Consumer端支持的配置:
|
||||
|====
|
||||
^|配置项 ^|含义| 默认值
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割|
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息|
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤,订阅所有消息)|
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高)|
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false
|
||||
|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false
|
||||
|====
|
||||
|
131
spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc
Normal file
131
spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc
Normal file
@ -0,0 +1,131 @@
|
||||
== Spring Cloud AliCloud SchedulerX
|
||||
|
||||
SchedulerX(分布式任务调度) 是隶属于阿里云EDAS产品的组件, Spring Cloud AliCloud SchedulerX 提供了在Spring Cloud的配置规范下,分布式任务调度的功能支持。SchedulerX可提供秒级、精准、高可靠、高可用的定时任务调度服务,并支持多种类型的任务调度,如简单单机任务、简单多机任务、脚本任务以及网格任务。
|
||||
|
||||
=== 如何引入 Spring Cloud AliCloud SchedulerX
|
||||
|
||||
Spring Cloud Alibaba 已经发布了0.2.1版本,需要首先导入依赖管理POM。
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
|
||||
<version>0.2.1.RELEASE</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
----
|
||||
|
||||
接下来引入 Spring Cloud AliCloud SchedulerX Starter 即可。
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alicloud-schedulerX</artifactId>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
=== 启动SchedulerX任务调度
|
||||
|
||||
当客户端引入了 Spring Cloud AliCloud SchedulerX Starter 以后,只需要进行一些简单的配置,就可以自动初始化SchedulerX的任务调度服务。
|
||||
|
||||
以下是一个简单的应用示例。
|
||||
|
||||
[source,java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
public class ScxApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ScxApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
在application.properties中,需要加上以下配置。
|
||||
|
||||
[source,properties]
|
||||
----
|
||||
server.port=18033
|
||||
# 其中cn-test是SchedulerX的测试区域
|
||||
spring.cloud.alicloud.scx.group-id=***
|
||||
spring.cloud.alicloud.edas.namespace=cn-test
|
||||
----
|
||||
|
||||
在获取group-id之前,需要首先 https://account.aliyun.com/register/register.htm?spm=5176.8142029.388261.26.e9396d3eEIv28g&oauth_callback=https%3A%2F%2Fwww.aliyun.com%2F[注册阿里云账号] ,然后 https://common-buy.aliyun.com/?spm=5176.11451019.0.0.6f5965c0Uq5tue&commodityCode=edaspostpay#/buy[开通EDAS服务] ,并 https://edas.console.aliyun.com/#/edasTools[开通分布式任务管理组件] 。
|
||||
|
||||
其中group-id的获取,请参考 https://help.aliyun.com/document_detail/98784.html?spm=a2c4g.11186623.2.17.23c87da9P2F3tG[这里]。
|
||||
|
||||
NOTE: 在创建group的时候,要选择"测试"区域。
|
||||
|
||||
=== 编写一个简单任务
|
||||
|
||||
简单任务是最常用的任务类型,只需要实现 ScxSimpleJobProcessor 接口即可。
|
||||
|
||||
以下是一个简单的单机类型任务示例。
|
||||
|
||||
[source,java]
|
||||
----
|
||||
public class SimpleTask implements ScxSimpleJobProcessor {
|
||||
|
||||
@Override
|
||||
public ProcessResult process(ScxSimpleJobContext context) {
|
||||
System.out.println("-----------Hello world---------------");
|
||||
ProcessResult processResult = new ProcessResult(true);
|
||||
return processResult;
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
=== 对任务进行调度
|
||||
|
||||
进入[SchedulerX任务列表](https://edas.console.aliyun.com/#/edasSchedulerXJob?regionNo=cn-test)页面,选择上方"测试"区域,点击右上角"新建Job",创建一个Job,即如下所示。
|
||||
|
||||
[source,text]
|
||||
----
|
||||
Job分组:测试——***-*-*-****
|
||||
Job处理接口:org.springframework.cloud.alibaba.cloud.examples.SimpleTask
|
||||
类型:简单Job单机版
|
||||
定时表达式:默认选项——0 * * * * ?
|
||||
Job描述:无
|
||||
自定义参数:无
|
||||
----
|
||||
|
||||
以上任务类型选择了"简单Job单机版",并且制定了Cron表达式为"0 * * * * ?",这意味着,每过一分钟,任务将会被执行且只执行一次。
|
||||
|
||||
更多任务类型,请参考 https://help.aliyun.com/document_detail/43136.html?spm=a2c4g.11186623.6.703.64e17da9br61ZS[SchedulerX官方文档]。
|
||||
|
||||
=== 生产环境使用
|
||||
|
||||
以上使用的都是SchedulerX的"测试"区域,主要用于本地调试和测试。
|
||||
|
||||
在生产级别,除了上面的group-id和namespace以外,还需要一些额外的配置,如下所示。
|
||||
|
||||
[source,properties]
|
||||
----
|
||||
server.port=18033
|
||||
# 其中cn-test是SchedulerX的测试区域
|
||||
spring.cloud.alicloud.scx.group-id=***
|
||||
spring.cloud.alicloud.edas.namespace=***
|
||||
# 当应用运行在EDAS上时,以下配置不需要手动配置。
|
||||
spring.cloud.alicloud.access-key=***
|
||||
spring.cloud.alicloud.secret-key=***
|
||||
# 以下配置不是必须的,请参考SchedulerX文档
|
||||
spring.cloud.alicloud.scx.domain-name=***
|
||||
----
|
||||
|
||||
其中group-id与之前的获取方式一样,namespace则是从EDAS控制台左侧"命名空间"列表中获取命名空间ID。
|
||||
|
||||
NOTE: group-id必须创建在namespace当中。
|
||||
|
||||
access-key以及secret-key为阿里云账号的AK/SK信息,如果应用在EDAS上部署,则不需要填写这两项信息,否则请前往 https://usercenter.console.aliyun.com/#/manage/ak[安全信息管理]获取。
|
||||
|
||||
domain-name并不是必须的,具体请参考 https://help.aliyun.com/document_detail/35359.html?spm=a2c4g.11186623.6.704.4abf1994SbgXYS[SchedulerX官方文档]。
|
@ -142,7 +142,7 @@ class EchoServiceFallback implements EchoService {
|
||||
}
|
||||
```
|
||||
|
||||
NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:http://requesturl
|
||||
NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:protocol://requesturl
|
||||
|
||||
`EchoService` 接口中方法 `echo` 对应的资源名为 `GET:http://service-provider/echo/{str}`。
|
||||
|
||||
@ -150,17 +150,17 @@ NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:http://reque
|
||||
|
||||
### RestTemplate 支持
|
||||
|
||||
Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelProtect` 注解。
|
||||
Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelRestTemplate` 注解。
|
||||
|
||||
```java
|
||||
@Bean
|
||||
@SentinelProtect(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class)
|
||||
@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class)
|
||||
public RestTemplate restTemplate() {
|
||||
return new RestTemplate();
|
||||
}
|
||||
```
|
||||
|
||||
`@SentinelProtect` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。
|
||||
`@SentinelRestTemplate` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。
|
||||
|
||||
限流的资源规则提供两种粒度:
|
||||
|
||||
|
@ -29,3 +29,6 @@ include::acm.adoc[]
|
||||
|
||||
include::oss.adoc[]
|
||||
|
||||
include::schedulerx.adoc[]
|
||||
|
||||
|
||||
|
@ -20,9 +20,14 @@
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
@ -25,4 +25,13 @@
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
@ -30,6 +30,7 @@
|
||||
<module>acm-example/acm-local-example</module>
|
||||
<module>rocketmq-example</module>
|
||||
<module>spring-cloud-bus-rocketmq-example</module>
|
||||
<module>schedulerX-example/schedulerX-simple-task-example</module>
|
||||
</modules>
|
||||
|
||||
<build>
|
||||
|
@ -0,0 +1,18 @@
|
||||
package org.springframework.cloud.alibaba.cloud.examples;
|
||||
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionState;
|
||||
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class MyTransactionCheckListener implements TransactionCheckListener {
|
||||
|
||||
@Override
|
||||
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
|
||||
System.out.println("TransactionCheckListener: " + new String(msg.getBody()));
|
||||
return LocalTransactionState.COMMIT_MESSAGE;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package org.springframework.cloud.alibaba.cloud.examples;
|
||||
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionState;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class MyTransactionExecuter implements LocalTransactionExecuter {
|
||||
@Override
|
||||
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
|
||||
if ("1".equals(msg.getUserProperty("test"))) {
|
||||
System.out.println(new String(msg.getBody()) + " rollback");
|
||||
return LocalTransactionState.ROLLBACK_MESSAGE;
|
||||
}
|
||||
System.out.println(new String(msg.getBody()) + " commit");
|
||||
return LocalTransactionState.COMMIT_MESSAGE;
|
||||
}
|
||||
}
|
@ -30,4 +30,9 @@ public class ReceiveService {
|
||||
System.out.println("input1 receive again: " + receiveMsg);
|
||||
}
|
||||
|
||||
@StreamListener("input4")
|
||||
public void receiveTransactionalMsg(String transactionMsg) {
|
||||
System.out.println("input4 receive transaction msg: " + transactionMsg);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,17 +5,19 @@ import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink;
|
||||
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.messaging.Source;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableBinding({ Source.class, MySink.class })
|
||||
@EnableBinding({ MySource.class, MySink.class })
|
||||
public class RocketMQApplication {
|
||||
|
||||
public interface MySink {
|
||||
@ -29,6 +31,16 @@ public class RocketMQApplication {
|
||||
@Input("input3")
|
||||
SubscribableChannel input3();
|
||||
|
||||
@Input("input4")
|
||||
SubscribableChannel input4();
|
||||
}
|
||||
|
||||
public interface MySource {
|
||||
@Output("output1")
|
||||
MessageChannel output1();
|
||||
|
||||
@Output("output2")
|
||||
MessageChannel output2();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
@ -40,6 +52,11 @@ public class RocketMQApplication {
|
||||
return new CustomRunner();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CustomRunnerWithTransactional customRunnerWithTransactional() {
|
||||
return new CustomRunnerWithTransactional();
|
||||
}
|
||||
|
||||
public static class CustomRunner implements CommandLineRunner {
|
||||
@Autowired
|
||||
private SenderService senderService;
|
||||
@ -62,4 +79,21 @@ public class RocketMQApplication {
|
||||
}
|
||||
}
|
||||
|
||||
public static class CustomRunnerWithTransactional implements CommandLineRunner {
|
||||
@Autowired
|
||||
private SenderService senderService;
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
// COMMIT_MESSAGE message
|
||||
senderService.sendTransactionalMsg("transactional-msg1", false);
|
||||
// ROLLBACK_MESSAGE message
|
||||
senderService.sendTransactionalMsg("transactional-msg2", true);
|
||||
// ROLLBACK_MESSAGE message
|
||||
senderService.sendTransactionalMsg("transactional-msg3", true);
|
||||
// COMMIT_MESSAGE message
|
||||
senderService.sendTransactionalMsg("transactional-msg4", false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ import java.util.stream.Stream;
|
||||
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.messaging.Source;
|
||||
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
@ -19,17 +19,17 @@ import org.springframework.util.MimeTypeUtils;
|
||||
public class SenderService {
|
||||
|
||||
@Autowired
|
||||
private Source source;
|
||||
private MySource source;
|
||||
|
||||
public void send(String msg) throws Exception {
|
||||
source.output().send(MessageBuilder.withPayload(msg).build());
|
||||
source.output1().send(MessageBuilder.withPayload(msg).build());
|
||||
}
|
||||
|
||||
public <T> void sendWithTags(T msg, String tag) throws Exception {
|
||||
Message message = MessageBuilder.createMessage(msg,
|
||||
new MessageHeaders(Stream.of(tag).collect(Collectors
|
||||
.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));
|
||||
source.output().send(message);
|
||||
source.output1().send(message);
|
||||
}
|
||||
|
||||
public <T> void sendObject(T msg, String tag) throws Exception {
|
||||
@ -37,7 +37,17 @@ public class SenderService {
|
||||
.setHeader(MessageConst.PROPERTY_TAGS, tag)
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
|
||||
.build();
|
||||
source.output().send(message);
|
||||
source.output1().send(message);
|
||||
}
|
||||
|
||||
public <T> void sendTransactionalMsg(T msg, boolean error) throws Exception {
|
||||
MessageBuilder builder = MessageBuilder.withPayload(msg)
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
|
||||
if (error) {
|
||||
builder.setHeader("test", "1");
|
||||
}
|
||||
Message message = builder.build();
|
||||
source.output2().send(message);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,8 +2,14 @@ spring.cloud.stream.default-binder=rocketmq
|
||||
|
||||
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
|
||||
|
||||
spring.cloud.stream.bindings.output.destination=test-topic
|
||||
spring.cloud.stream.bindings.output.content-type=application/json
|
||||
spring.cloud.stream.bindings.output1.destination=test-topic
|
||||
spring.cloud.stream.bindings.output1.content-type=application/json
|
||||
|
||||
spring.cloud.stream.bindings.output2.destination=TransactionTopic
|
||||
spring.cloud.stream.bindings.output2.content-type=application/json
|
||||
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
|
||||
spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter
|
||||
spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener
|
||||
|
||||
spring.cloud.stream.bindings.input1.destination=test-topic
|
||||
spring.cloud.stream.bindings.input1.content-type=text/plain
|
||||
@ -26,6 +32,11 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
|
||||
spring.cloud.stream.bindings.input3.consumer.concurrency=20
|
||||
spring.cloud.stream.bindings.input3.consumer.maxAttempts=1
|
||||
|
||||
spring.cloud.stream.bindings.input4.destination=TransactionTopic
|
||||
spring.cloud.stream.bindings.input4.content-type=text/plain
|
||||
spring.cloud.stream.bindings.input4.group=transaction-group
|
||||
spring.cloud.stream.bindings.input4.consumer.concurrency=210
|
||||
|
||||
spring.application.name=rocketmq-example
|
||||
|
||||
server.port=28081
|
||||
|
@ -0,0 +1,37 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>spring-cloud-alibaba-examples</artifactId>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<version>0.2.1.BUILD-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>schedulerX-simple-task-example</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alicloud-schedulerX</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
@ -0,0 +1,47 @@
|
||||
# SchedulerX Simple Task Example
|
||||
|
||||
## 项目说明
|
||||
|
||||
本项目展示了,在Spring Cloud体系中,如何快如接入SchedulerX,使用任务调度服务。
|
||||
|
||||
SchedulerX 是阿里中间件团队开发的一款分布式任务调度产品。它为您提供秒级、精准、高可靠、高可用的定时(基于 Cron 表达式)任务调度服务。同时提供分布式的任务执行模型,如网格任务。网格任务支持海量子任务均匀分配到所有 Worker(schedulerx-client)上执行。
|
||||
|
||||
## 示例
|
||||
|
||||
### 准备工作
|
||||
|
||||
1. 请先[注册阿里云账号](https://account.aliyun.com/register/register.htm?spm=5176.8142029.388261.26.e9396d3eEIv28g&oauth_callback=https%3A%2F%2Fwww.aliyun.com%2F)
|
||||
|
||||
2. SchedulerX集成到了EDAS组件中心,因此需要[开通EDAS服务](https://common-buy.aliyun.com/?spm=5176.11451019.0.0.6f5965c0Uq5tue&commodityCode=edaspostpay#/buy)
|
||||
|
||||
3. 到[EDAS组件中心](https://edas.console.aliyun.com/#/edasTools)开通SchedulerX组件,即分布式任务管理。
|
||||
|
||||
4. 进入[SchedulerX分组管理](https://edas.console.aliyun.com/#/schedulerXGroup?regionNo=cn-test)页面,选择上方"测试"区域,点击右上角"新建分组",创建一个分组。
|
||||
|
||||
5. 将"分组ID"的值填写到`application.properties`文件中`key`为`spring.cloud.alicloud.scx.group-id`对应的value值,即如下所示。
|
||||
|
||||
spring.cloud.alicloud.scx.group-id=111-1-1-1111
|
||||
|
||||
6. 进入[SchedulerX任务列表](https://edas.console.aliyun.com/#/edasSchedulerXJob?regionNo=cn-test)页面,选择上方"测试"区域,点击右上角"新建Job",创建一个Job,即如下所示。
|
||||
|
||||
Job分组:测试——111-1-1-1111
|
||||
Job处理接口:org.springframework.cloud.alibaba.cloud.examples.SimpleTask
|
||||
类型:简单Job单机版
|
||||
定时表达式:默认选项——0 * * * * ?
|
||||
Job描述:无
|
||||
自定义参数:无
|
||||
|
||||
### 启动应用
|
||||
|
||||
直接运行main class,即`ScxApplication`。
|
||||
|
||||
### 查看效果
|
||||
|
||||
观察应用的控制台日志输出,可以看到每一分钟会打印一次如下日志。
|
||||
|
||||
```
|
||||
-----------Hello world---------------
|
||||
```
|
||||
|
||||
如果您对 Spring Cloud SchedulerX Starter 有任何建议或想法,欢迎提交 issue 中或者通过其他社区渠道向我们反馈。
|
||||
|
@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alibaba.cloud.examples;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
/**
|
||||
* @author xiaolongzuo
|
||||
*/
|
||||
@SpringBootApplication
|
||||
public class ScxApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ScxApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alibaba.cloud.examples;
|
||||
|
||||
import com.alibaba.edas.schedulerx.ProcessResult;
|
||||
import com.alibaba.edas.schedulerx.ScxSimpleJobContext;
|
||||
import com.alibaba.edas.schedulerx.ScxSimpleJobProcessor;
|
||||
|
||||
/**
|
||||
* @author xiaolongzuo
|
||||
*/
|
||||
public class SimpleTask implements ScxSimpleJobProcessor {
|
||||
|
||||
@Override
|
||||
public ProcessResult process(ScxSimpleJobContext context) {
|
||||
System.out.println("-----------Hello world---------------");
|
||||
ProcessResult processResult = new ProcessResult(true);
|
||||
return processResult;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
server.port=18033
|
||||
spring.cloud.alicloud.scx.group-id=***
|
||||
spring.cloud.alicloud.edas.namespace=cn-test
|
@ -44,6 +44,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
|
@ -18,11 +18,9 @@ package org.springframework.cloud.alicloud.acm;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.cloud.alicloud.acm.endpoint.AcmHealthIndicator;
|
||||
import org.springframework.cloud.alicloud.acm.refresh.AcmContextRefresher;
|
||||
import org.springframework.cloud.alicloud.acm.refresh.AcmRefreshHistory;
|
||||
import org.springframework.cloud.alicloud.context.acm.AcmIntegrationProperties;
|
||||
import org.springframework.cloud.alicloud.context.acm.AcmProperties;
|
||||
import org.springframework.cloud.context.refresh.ContextRefresher;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
@ -47,12 +45,6 @@ public class AcmAutoConfiguration implements ApplicationContextAware {
|
||||
return new AcmPropertySourceRepository(applicationContext);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public AcmHealthIndicator acmHealthIndicator(AcmProperties acmProperties,
|
||||
AcmPropertySourceRepository acmPropertySourceRepository) {
|
||||
return new AcmHealthIndicator(acmProperties, acmPropertySourceRepository);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public AcmRefreshHistory acmRefreshHistory() {
|
||||
return new AcmRefreshHistory();
|
||||
|
@ -33,20 +33,26 @@ import org.springframework.context.annotation.Bean;
|
||||
@ConditionalOnClass(name = "org.springframework.boot.actuate.autoconfigure.web.ManagementContextConfiguration")
|
||||
public class AcmEndpointAutoConfiguration {
|
||||
|
||||
@Autowired
|
||||
private AcmProperties acmProperties;
|
||||
@Autowired
|
||||
private AcmProperties acmProperties;
|
||||
|
||||
@Autowired
|
||||
private AcmRefreshHistory acmRefreshHistory;
|
||||
@Autowired
|
||||
private AcmRefreshHistory acmRefreshHistory;
|
||||
|
||||
@Autowired
|
||||
private AcmPropertySourceRepository acmPropertySourceRepository;
|
||||
@Autowired
|
||||
private AcmPropertySourceRepository acmPropertySourceRepository;
|
||||
|
||||
@ConditionalOnMissingBean
|
||||
@ConditionalOnEnabledEndpoint
|
||||
@Bean
|
||||
public AcmEndpoint acmEndpoint() {
|
||||
return new AcmEndpoint(acmProperties, acmRefreshHistory,
|
||||
acmPropertySourceRepository);
|
||||
}
|
||||
@ConditionalOnMissingBean
|
||||
@ConditionalOnEnabledEndpoint
|
||||
@Bean
|
||||
public AcmEndpoint acmEndpoint() {
|
||||
return new AcmEndpoint(acmProperties, acmRefreshHistory,
|
||||
acmPropertySourceRepository);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public AcmHealthIndicator acmHealthIndicator(AcmProperties acmProperties,
|
||||
AcmPropertySourceRepository acmPropertySourceRepository) {
|
||||
return new AcmHealthIndicator(acmProperties, acmPropertySourceRepository);
|
||||
}
|
||||
}
|
||||
|
@ -44,9 +44,20 @@
|
||||
<artifactId>spring-cloud-commons</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
@ -38,6 +38,12 @@
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.schedulerx</groupId>
|
||||
<artifactId>schedulerx-client</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.ans</groupId>
|
||||
<artifactId>ans-sdk</artifactId>
|
||||
|
@ -16,14 +16,24 @@
|
||||
|
||||
package org.springframework.cloud.alicloud.context;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.commons.util.InetUtils;
|
||||
import org.springframework.cloud.commons.util.InetUtilsProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author xiaolongzuo
|
||||
*/
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(AliCloudProperties.class)
|
||||
@EnableConfigurationProperties({ AliCloudProperties.class, InetUtilsProperties.class })
|
||||
public class AliCloudContextAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public InetUtils inetUtils(InetUtilsProperties inetUtilsProperties) {
|
||||
return new InetUtils(inetUtilsProperties);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,12 +18,8 @@ package org.springframework.cloud.alicloud.context.ans;
|
||||
|
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.alicloud.context.edas.EdasContextAutoConfiguration;
|
||||
import org.springframework.cloud.commons.util.InetUtils;
|
||||
import org.springframework.cloud.commons.util.InetUtilsProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
@ -31,14 +27,8 @@ import org.springframework.context.annotation.Configuration;
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(name = "org.springframework.cloud.alicloud.ans.AnsAutoConfiguration")
|
||||
@EnableConfigurationProperties({ AnsProperties.class, InetUtilsProperties.class })
|
||||
@EnableConfigurationProperties(AnsProperties.class)
|
||||
@ImportAutoConfiguration(EdasContextAutoConfiguration.class)
|
||||
public class AnsContextAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public InetUtils inetUtils(InetUtilsProperties inetUtilsProperties) {
|
||||
return new InetUtils(inetUtilsProperties);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,12 +1,13 @@
|
||||
package org.springframework.cloud.alicloud.context.nacos;
|
||||
|
||||
import com.alibaba.cloud.context.edas.EdasChangeOrderConfiguration;
|
||||
import com.alibaba.cloud.context.edas.EdasChangeOrderConfigurationFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
|
||||
import com.alibaba.cloud.context.edas.EdasChangeOrderConfiguration;
|
||||
import com.alibaba.cloud.context.edas.EdasChangeOrderConfigurationFactory;
|
||||
|
||||
public class NacosParameterInitListener
|
||||
implements ApplicationListener<ApplicationEnvironmentPreparedEvent> {
|
||||
private static final Logger log = LoggerFactory
|
||||
@ -20,7 +21,7 @@ public class NacosParameterInitListener
|
||||
|
||||
private void preparedNacosConfiguration() {
|
||||
EdasChangeOrderConfiguration edasChangeOrderConfiguration = EdasChangeOrderConfigurationFactory
|
||||
.buildEdasChangeOrderConfiguration();
|
||||
.getEdasChangeOrderConfiguration();
|
||||
log.info("Initialize Nacos Parameter from edas change order,is edas managed {}.",
|
||||
edasChangeOrderConfiguration.isEdasManaged());
|
||||
if (!edasChangeOrderConfiguration.isEdasManaged()) {
|
||||
|
@ -51,9 +51,9 @@ public class OssContextAutoConfiguration {
|
||||
Assert.isTrue(!StringUtils.isEmpty(ossProperties.getEndpoint()),
|
||||
"Oss endpoint can't be empty.");
|
||||
Assert.isTrue(!StringUtils.isEmpty(aliCloudProperties.getAccessKey()),
|
||||
"Access key can't be empty.");
|
||||
"${spring.cloud.alicloud.access-key} can't be empty.");
|
||||
Assert.isTrue(!StringUtils.isEmpty(aliCloudProperties.getSecretKey()),
|
||||
"Secret key can't be empty.");
|
||||
"${spring.cloud.alicloud.secret-key} can't be empty.");
|
||||
return new OSSClientBuilder().build(ossProperties.getEndpoint(),
|
||||
aliCloudProperties.getAccessKey(), aliCloudProperties.getSecretKey(),
|
||||
ossProperties.getConfig());
|
||||
|
@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alicloud.context.scx;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.alicloud.context.AliCloudProperties;
|
||||
import org.springframework.cloud.alicloud.context.edas.EdasContextAutoConfiguration;
|
||||
import org.springframework.cloud.alicloud.context.edas.EdasProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import com.alibaba.cloud.context.edas.AliCloudEdasSdk;
|
||||
import com.alibaba.cloud.context.scx.AliCloudScxInitializer;
|
||||
import com.alibaba.dts.common.exception.InitException;
|
||||
|
||||
/**
|
||||
* @author xiaolongzuo
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(name = "org.springframework.cloud.alicloud.scx.ScxAutoConfiguration")
|
||||
@EnableConfigurationProperties(ScxProperties.class)
|
||||
@ImportAutoConfiguration(EdasContextAutoConfiguration.class)
|
||||
public class ScxContextAutoConfiguration {
|
||||
|
||||
private static final Logger log = LoggerFactory
|
||||
.getLogger(ScxContextAutoConfiguration.class);
|
||||
|
||||
@Autowired
|
||||
private AliCloudProperties aliCloudProperties;
|
||||
|
||||
@Autowired
|
||||
private EdasProperties edasProperties;
|
||||
|
||||
@Autowired
|
||||
private ScxProperties scxProperties;
|
||||
|
||||
@Autowired
|
||||
private AliCloudEdasSdk aliCloudEdasSdk;
|
||||
|
||||
@PostConstruct
|
||||
public void initAcmProperties() {
|
||||
try {
|
||||
AliCloudScxInitializer.initialize(aliCloudProperties, edasProperties,
|
||||
scxProperties, aliCloudEdasSdk);
|
||||
}
|
||||
catch (InitException e) {
|
||||
log.error("Init SchedulerX failed.", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alicloud.context.scx;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
import com.alibaba.cloud.context.scx.ScxConfiguration;
|
||||
|
||||
/**
|
||||
* @author xiaolongzuo
|
||||
*/
|
||||
@ConfigurationProperties("spring.cloud.alicloud.scx")
|
||||
public class ScxProperties implements ScxConfiguration {
|
||||
|
||||
private String groupId;
|
||||
|
||||
private String domainName;
|
||||
|
||||
@Override
|
||||
public String getGroupId() {
|
||||
return groupId;
|
||||
}
|
||||
|
||||
public void setGroupId(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDomainName() {
|
||||
return domainName;
|
||||
}
|
||||
|
||||
public void setDomainName(String domainName) {
|
||||
this.domainName = domainName;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alicloud.context.statistics;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.cloud.alicloud.context.acm.AcmContextBootstrapConfiguration;
|
||||
import org.springframework.cloud.alicloud.context.acm.AcmProperties;
|
||||
import org.springframework.cloud.alicloud.context.ans.AnsContextAutoConfiguration;
|
||||
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
|
||||
import org.springframework.cloud.alicloud.context.edas.EdasProperties;
|
||||
import org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration;
|
||||
import org.springframework.cloud.alicloud.context.oss.OssProperties;
|
||||
import org.springframework.cloud.alicloud.context.scx.ScxContextAutoConfiguration;
|
||||
import org.springframework.cloud.alicloud.context.scx.ScxProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import com.alibaba.cloud.context.AliCloudServerMode;
|
||||
import com.alibaba.cloud.context.edas.AliCloudEdasSdk;
|
||||
import com.alibaba.cloud.context.statistics.StatisticsTask;
|
||||
|
||||
/**
|
||||
* @author xiaolongzuo
|
||||
*/
|
||||
@Configuration
|
||||
@AutoConfigureAfter({ ScxContextAutoConfiguration.class,
|
||||
OssContextAutoConfiguration.class, AnsContextAutoConfiguration.class,
|
||||
AcmContextBootstrapConfiguration.class })
|
||||
public class StatisticsTaskStarter implements InitializingBean {
|
||||
|
||||
@Autowired(required = false)
|
||||
private AliCloudEdasSdk aliCloudEdasSdk;
|
||||
|
||||
@Autowired(required = false)
|
||||
private EdasProperties edasProperties;
|
||||
|
||||
@Autowired(required = false)
|
||||
private ScxProperties scxProperties;
|
||||
|
||||
@Autowired(required = false)
|
||||
private OssProperties ossProperties;
|
||||
|
||||
@Autowired(required = false)
|
||||
private AnsProperties ansProperties;
|
||||
|
||||
@Autowired(required = false)
|
||||
private AcmProperties acmProperties;
|
||||
|
||||
@Autowired(required = false)
|
||||
private ScxContextAutoConfiguration scxContextAutoConfiguration;
|
||||
|
||||
@Autowired(required = false)
|
||||
private OssContextAutoConfiguration ossContextAutoConfiguration;
|
||||
|
||||
@Autowired(required = false)
|
||||
private AnsContextAutoConfiguration ansContextAutoConfiguration;
|
||||
|
||||
@Autowired(required = false)
|
||||
private AcmContextBootstrapConfiguration acmContextBootstrapConfiguration;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
StatisticsTask statisticsTask = new StatisticsTask(aliCloudEdasSdk,
|
||||
edasProperties, getComponents());
|
||||
statisticsTask.start();
|
||||
}
|
||||
|
||||
private List<String> getComponents() {
|
||||
List<String> components = new ArrayList<>();
|
||||
if (scxContextAutoConfiguration != null && scxProperties != null) {
|
||||
components.add("SC-SCX");
|
||||
}
|
||||
if (ossContextAutoConfiguration != null && ossProperties != null) {
|
||||
components.add("SC-OSS");
|
||||
}
|
||||
boolean edasEnabled = edasProperties != null && edasProperties.isEnabled();
|
||||
boolean ansEnableEdas = edasEnabled || (ansProperties != null
|
||||
&& ansProperties.getServerMode() == AliCloudServerMode.EDAS);
|
||||
if (ansContextAutoConfiguration != null && ansEnableEdas) {
|
||||
components.add("SC-ANS");
|
||||
}
|
||||
boolean acmEnableEdas = edasEnabled || (acmProperties != null
|
||||
&& acmProperties.getServerMode() == AliCloudServerMode.EDAS);
|
||||
if (acmContextBootstrapConfiguration != null && acmEnableEdas) {
|
||||
components.add("SC-ACM");
|
||||
}
|
||||
return components;
|
||||
}
|
||||
|
||||
}
|
@ -4,7 +4,8 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.alicloud.context.AliCloudContextAutoConfiguration,\
|
||||
org.springframework.cloud.alicloud.context.edas.EdasContextAutoConfiguration,\
|
||||
org.springframework.cloud.alicloud.context.ans.AnsContextAutoConfiguration,\
|
||||
org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration
|
||||
org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration,\
|
||||
org.springframework.cloud.alicloud.context.scx.ScxContextAutoConfiguration,\
|
||||
org.springframework.cloud.alicloud.context.statistics.StatisticsTaskStarter
|
||||
org.springframework.context.ApplicationListener=\
|
||||
org.springframework.cloud.alicloud.context.ans.AnsContextApplicationListener,\
|
||||
org.springframework.cloud.alicloud.context.nacos.NacosParameterInitListener
|
||||
org.springframework.cloud.alicloud.context.ans.AnsContextApplicationListener
|
@ -33,7 +33,9 @@ import org.springframework.test.context.junit4.SpringRunner;
|
||||
"spring.application.name=myapp",
|
||||
"spring.cloud.alicloud.edas.application.name=myapp",
|
||||
"spring.cloud.alicloud.access-key=ak", "spring.cloud.alicloud.secret-key=sk",
|
||||
"spring.cloud.alicloud.oss.endpoint=test" }, webEnvironment = RANDOM_PORT)
|
||||
"spring.cloud.alicloud.oss.endpoint=test",
|
||||
"spring.cloud.alicloud.scx.group-id=1-2-3-4",
|
||||
"spring.cloud.alicloud.edas.namespace=cn-test" }, webEnvironment = RANDOM_PORT)
|
||||
@DirtiesContext
|
||||
public class AliCloudSpringApplicationTests {
|
||||
|
||||
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alicloud.context.scx;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.cloud.alicloud.context.edas.EdasProperties;
|
||||
import org.springframework.cloud.alicloud.context.oss.OssProperties;
|
||||
|
||||
import com.aliyun.oss.OSS;
|
||||
|
||||
/**
|
||||
* {@link OSS} {@link OssProperties} Test
|
||||
*
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ScxAutoConfigurationTests {
|
||||
|
||||
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
|
||||
.withConfiguration(AutoConfigurations.of(ScxContextAutoConfiguration.class))
|
||||
.withPropertyValues("spring.cloud.alicloud.scx.group-id=1-2-3-4")
|
||||
.withPropertyValues("spring.cloud.alicloud.edas.namespace=cn-test");
|
||||
|
||||
@Test
|
||||
public void testSxcProperties() {
|
||||
this.contextRunner.run(context -> {
|
||||
assertThat(context.getBeansOfType(ScxProperties.class).size() == 1).isTrue();
|
||||
EdasProperties edasProperties = context.getBean(EdasProperties.class);
|
||||
ScxProperties scxProperties = context.getBean(ScxProperties.class);
|
||||
assertThat(scxProperties.getGroupId()).isEqualTo("1-2-3-4");
|
||||
assertThat(edasProperties.getNamespace()).isEqualTo("cn-test");
|
||||
assertThat(scxProperties.getDomainName()).isNull();
|
||||
});
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alicloud.scx;
|
||||
|
||||
/**
|
||||
* @author xiaolongzuo
|
||||
*/
|
||||
public class ScxAutoConfiguration {
|
||||
}
|
33
spring-cloud-alicloud-schedulerX/pom.xml
Normal file
33
spring-cloud-alicloud-schedulerX/pom.xml
Normal file
@ -0,0 +1,33 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>spring-cloud-alibaba</artifactId>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<version>0.2.1.BUILD-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>spring-cloud-alicloud-schedulerX</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-alicloud-context</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.schedulerx</groupId>
|
||||
<artifactId>schedulerx-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>aliyun-java-sdk-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>aliyun-java-sdk-edas</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alicloud.scx;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* placeholder configuration
|
||||
*
|
||||
* @author xiaolongzuo
|
||||
*/
|
||||
@Configuration
|
||||
public class ScxAutoConfiguration {
|
||||
|
||||
}
|
@ -0,0 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.alicloud.scx.ScxAutoConfiguration
|
@ -14,5 +14,6 @@
|
||||
<module>spring-cloud-starter-alicloud-oss</module>
|
||||
<module>spring-cloud-starter-alicloud-acm</module>
|
||||
<module>spring-cloud-starter-alicloud-ans</module>
|
||||
<module>spring-cloud-starter-alicloud-schedulerX</module>
|
||||
</modules>
|
||||
</project>
|
@ -0,0 +1,20 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alicloud</artifactId>
|
||||
<version>0.2.1.BUILD-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-alicloud-schedulerX</artifactId>
|
||||
<name>Spring Cloud Starter Alibaba Cloud OSS</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-alicloud-schedulerX</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -32,6 +32,8 @@ public interface RocketMQBinderConstants {
|
||||
|
||||
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
|
||||
|
||||
String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG";
|
||||
|
||||
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
|
||||
|
||||
/**
|
||||
|
@ -16,6 +16,9 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
@ -36,6 +39,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
@ -71,9 +75,24 @@ public class RocketMQMessageChannelBinder extends
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||
MessageChannel errorChannel) throws Exception {
|
||||
if (producerProperties.getExtension().getEnabled()) {
|
||||
return new RocketMQMessageHandler(destination.getName(),
|
||||
producerProperties.getExtension(),
|
||||
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
||||
destination.getName(), producerProperties,
|
||||
rocketBinderConfigurationProperties, instrumentationManager);
|
||||
if (producerProperties.getExtension().getTransactional()) {
|
||||
// transaction message check LocalTransactionExecuter
|
||||
messageHandler.setLocalTransactionExecuter(
|
||||
getClassConfiguration(destination.getName(),
|
||||
producerProperties.getExtension().getExecuter(),
|
||||
LocalTransactionExecuter.class));
|
||||
// transaction message check TransactionCheckListener
|
||||
messageHandler.setTransactionCheckListener(
|
||||
getClassConfiguration(destination.getName(),
|
||||
producerProperties.getExtension()
|
||||
.getTransactionCheckListener(),
|
||||
TransactionCheckListener.class));
|
||||
}
|
||||
|
||||
return messageHandler;
|
||||
}
|
||||
else {
|
||||
throw new RuntimeException("Binding for channel " + destination.getName()
|
||||
@ -120,4 +139,46 @@ public class RocketMQMessageChannelBinder extends
|
||||
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
|
||||
return extendedBindingProperties.getExtendedProducerProperties(channelName);
|
||||
}
|
||||
|
||||
private <T> T getClassConfiguration(String destName, String className,
|
||||
Class<T> interfaceClass) {
|
||||
if (StringUtils.isEmpty(className)) {
|
||||
throw new RuntimeException("Binding for channel " + destName
|
||||
+ " using transactional message, should set "
|
||||
+ interfaceClass.getSimpleName() + " configuration"
|
||||
+ interfaceClass.getSimpleName() + " should be set, like "
|
||||
+ "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour"
|
||||
+ interfaceClass.getSimpleName() + "'");
|
||||
}
|
||||
else if (StringUtils.isNotEmpty(className)) {
|
||||
Class fieldClass;
|
||||
// check class exists
|
||||
try {
|
||||
fieldClass = ClassUtils.forName(className,
|
||||
RocketMQMessageChannelBinder.class.getClassLoader());
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException("Binding for channel " + destName
|
||||
+ " using transactional message, but " + className
|
||||
+ " class is not found");
|
||||
}
|
||||
// check interface incompatible
|
||||
if (!interfaceClass.isAssignableFrom(fieldClass)) {
|
||||
throw new RuntimeException("Binding for channel " + destName
|
||||
+ " using transactional message, but " + className
|
||||
+ " is incompatible with " + interfaceClass.getSimpleName()
|
||||
+ " interface");
|
||||
}
|
||||
try {
|
||||
return (T) fieldClass.newInstance();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException("Binding for channel " + destName
|
||||
+ " using transactional message, but " + className
|
||||
+ " instance error", e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@ -103,6 +104,15 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Object getTransactionalArg() {
|
||||
return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG);
|
||||
}
|
||||
|
||||
public Object withTransactionalArg(Object arg) {
|
||||
setHeader(ROCKET_TRANSACTIONAL_ARG, arg);
|
||||
return this;
|
||||
}
|
||||
|
||||
public SendResult getSendResult() {
|
||||
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
|
||||
}
|
||||
|
@ -23,10 +23,14 @@ import java.util.Optional;
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.client.producer.SendStatus;
|
||||
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||
import org.apache.rocketmq.client.producer.TransactionMQProducer;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
@ -49,16 +53,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
private InstrumentationManager instrumentationManager;
|
||||
|
||||
private final RocketMQProducerProperties producerProperties;
|
||||
private LocalTransactionExecuter localTransactionExecuter;
|
||||
|
||||
private TransactionCheckListener transactionCheckListener;
|
||||
|
||||
private final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
|
||||
|
||||
private final String destination;
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
protected volatile boolean running = false;
|
||||
private volatile boolean running = false;
|
||||
|
||||
public RocketMQMessageHandler(String destination,
|
||||
RocketMQProducerProperties producerProperties,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
this.destination = destination;
|
||||
@ -69,7 +77,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
producer = new DefaultMQProducer(destination);
|
||||
if (producerProperties.getExtension().getTransactional()) {
|
||||
producer = new TransactionMQProducer(destination);
|
||||
if (transactionCheckListener != null) {
|
||||
((TransactionMQProducer) producer)
|
||||
.setTransactionCheckListener(transactionCheckListener);
|
||||
}
|
||||
}
|
||||
else {
|
||||
producer = new DefaultMQProducer(destination);
|
||||
}
|
||||
|
||||
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
|
||||
producerInstrumentation = manager.getProducerInstrumentation(destination);
|
||||
@ -78,8 +95,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
|
||||
|
||||
if (producerProperties.getMaxMessageSize() > 0) {
|
||||
producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
|
||||
if (producerProperties.getExtension().getMaxMessageSize() > 0) {
|
||||
producer.setMaxMessageSize(
|
||||
producerProperties.getExtension().getMaxMessageSize());
|
||||
}
|
||||
|
||||
try {
|
||||
@ -138,7 +156,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
toSend.putUserProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
SendResult sendRes = producer.send(toSend);
|
||||
SendResult sendRes;
|
||||
if (producerProperties.getExtension().getTransactional()) {
|
||||
sendRes = producer.sendMessageInTransaction(toSend,
|
||||
localTransactionExecuter, headerAccessor.getTransactionalArg());
|
||||
}
|
||||
else {
|
||||
sendRes = producer.send(toSend);
|
||||
}
|
||||
|
||||
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
|
||||
throw new MQClientException("message hasn't been sent", null);
|
||||
@ -164,4 +189,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
}
|
||||
|
||||
public void setLocalTransactionExecuter(
|
||||
LocalTransactionExecuter localTransactionExecuter) {
|
||||
this.localTransactionExecuter = localTransactionExecuter;
|
||||
}
|
||||
|
||||
public void setTransactionCheckListener(
|
||||
TransactionCheckListener transactionCheckListener) {
|
||||
this.transactionCheckListener = transactionCheckListener;
|
||||
}
|
||||
}
|
@ -17,6 +17,8 @@
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
@ -31,6 +33,18 @@ public class RocketMQProducerProperties {
|
||||
*/
|
||||
private Integer maxMessageSize = 0;
|
||||
|
||||
private Boolean transactional = false;
|
||||
|
||||
/**
|
||||
* full class name of {@link LocalTransactionExecuter}
|
||||
*/
|
||||
private String executer;
|
||||
|
||||
/**
|
||||
* full class name of {@link TransactionCheckListener}
|
||||
*/
|
||||
private String transactionCheckListener;
|
||||
|
||||
public Boolean getEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
@ -47,4 +61,27 @@ public class RocketMQProducerProperties {
|
||||
this.maxMessageSize = maxMessageSize;
|
||||
}
|
||||
|
||||
public Boolean getTransactional() {
|
||||
return transactional;
|
||||
}
|
||||
|
||||
public void setTransactional(Boolean transactional) {
|
||||
this.transactional = transactional;
|
||||
}
|
||||
|
||||
public String getExecuter() {
|
||||
return executer;
|
||||
}
|
||||
|
||||
public void setExecuter(String executer) {
|
||||
this.executer = executer;
|
||||
}
|
||||
|
||||
public String getTransactionCheckListener() {
|
||||
return transactionCheckListener;
|
||||
}
|
||||
|
||||
public void setTransactionCheckListener(String transactionCheckListener) {
|
||||
this.transactionCheckListener = transactionCheckListener;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user