diff --git a/pom.xml b/pom.xml index b0532ec3..68db7541 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ spring-cloud-alicloud-oss spring-cloud-alicloud-acm spring-cloud-alicloud-ans + spring-cloud-alicloud-schedulerX diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index b966c312..58578cf2 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -23,9 +23,10 @@ 1.0.8 0.1.1 4.0.1 - 1.0.0 + 1.0.1 2.16.0 4.3.1 + 2.3.1-SNAPSHOT @@ -63,6 +64,11 @@ acm-sdk ${acm.version} + + com.alibaba.schedulerx + schedulerx-client + ${schedulerX.version} + @@ -191,6 +197,11 @@ spring-cloud-alicloud-ans ${project.version} + + org.springframework.cloud + spring-cloud-alicloud-schedulerX + ${project.version} + org.springframework.cloud spring-cloud-alicloud-context @@ -243,6 +254,12 @@ ${project.version} + + org.springframework.cloud + spring-cloud-starter-alicloud-schedulerX + ${project.version} + + org.springframework.cloud spring-cloud-starter-stream-rocketmq diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/nacos-config.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/nacos-config.adoc index 1cd2e71a..debd5fad 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/nacos-config.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/nacos-config.adoc @@ -301,3 +301,64 @@ spring.cloud.nacos.config.group=DEVELOP_GROUP ---- NOTE: 该配置必须放在 bootstrap.properties 文件中。并且在添加配置时 Group 的值一定要和 `spring.cloud.nacos.config.group` 的配置值一致。 + +=== 支持自定义扩展的 Data Id 配置 + +Spring Cloud Alibaba Nacos Config 从 0.2.1 版本后,可支持自定义 Data Id 的配置。关于这部分详细的设计可参考 https://github.com/spring-cloud-incubator/spring-cloud-alibaba/issues/141[这里]。 +一个完整的配置案例如下所示: + +[source,properties] +---- +spring.application.name=opensource-service-provider +spring.cloud.nacos.config.server-addr=127.0.0.1:8848 + +# config external configuration +# 1、Data Id 在默认的组 DEFAULT_GROUP,不支持配置的动态刷新 +spring.cloud.nacos.config.ext-config[0].data-id=ext-config-common01.properties + +# 2、Data Id 不在默认的组,不支持动态刷新 +spring.cloud.nacos.config.ext-config[1].data-id=ext-config-common02.properties +spring.cloud.nacos.config.ext-config[1].group=GLOBALE_GROUP + +# 3、Data Id 既不在默认的组,也支持动态刷新 +spring.cloud.nacos.config.ext-config[2].data-id=ext-config-common03.properties +spring.cloud.nacos.config.ext-config[2].group=REFRESH_GROUP +spring.cloud.nacos.config.ext-config[2].refresh=true +---- + +可以看到: + +* 通过 `spring.cloud.nacos.config.ext-config[n].data-id` 的配置方式来支持多个 Data Id 的配置。 +* 通过 `spring.cloud.nacos.config.ext-config[n].group` 的配置方式自定义 Data Id 所在的组,不明确配置的话,默认是 DEFAULT_GROUP。 +* 通过 `spring.cloud.nacos.config.ext-config[n].refresh` 的配置方式来控制该 Data Id 在配置变更时,是否支持应用中可动态刷新, +感知到最新的配置值。默认是不支持的。 + + +NOTE: 多个 Data Id 同时配置时,他的优先级关系是 `spring.cloud.nacos.config.ext-config[n].data-id` 其中 n 的值越大,优先级越高。 + +NOTE: `spring.cloud.nacos.config.ext-config[n].data-id` 的值必须带文件扩展名,文件扩展名既可支持 properties,又可以支持 yaml/yml。 +此时 `spring.cloud.nacos.config.file-extension` 的配置对自定义扩展配置的 Data Id 文件扩展名没有影响。 + +通过自定义扩展的 Data Id 配置,既可以解决多个应用间配置共享的问题,又可以支持一个应用有多个配置文件。 + +为了更加清晰的在多个应用间配置共享的 Data Id ,你可以通过以下的方式来配置: + +[source,properties] +---- +spring.cloud.nacos.config.shared-dataids=bootstrap-common.properties,all-common.properties +spring.cloud.nacos.config.refreshable-dataids=bootstrap-common.properties +---- + +可以看到: + +* 通过 `spring.cloud.nacos.config.shared-dataids` 来支持多个共享 Data Id 的配置,多个之间用逗号隔开。 +* 通过 `spring.cloud.nacos.config.refreshable-dataids` 来支持哪些共享配置的 Data Id 在配置变化时,应用中是否可动态刷新, +感知到最新的配置值,多个 Data Id 之间用逗号隔开。如果没有明确配置,默认情况下所有共享配置的 Data Id 都不支持动态刷新。 + +NOTE: 通过 `spring.cloud.nacos.config.shared-dataids` 来支持多个共享配置的 Data Id 时, +多个共享配置间的一个优先级的关系我们约定:按照配置出现的先后顺序,即后面的优先级要高于前面。 + +NOTE: 通过 `spring.cloud.nacos.config.shared-dataids` 来配置时,Data Id 必须带文件扩展名,文件扩展名既可支持 properties,也可以支持 yaml/yml。 +此时 `spring.cloud.nacos.config.file-extension` 的配置对自定义扩展配置的 Data Id 文件扩展名没有影响。 + +NOTE: `spring.cloud.nacos.config.refreshable-dataids` 给出哪些需要支持动态刷新时,Data Id 的值也必须明确给出文件扩展名。 \ No newline at end of file diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc index 5443b7dd..3724fd41 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc @@ -164,6 +164,9 @@ Provider端支持的配置: ^|配置项 ^|含义 ^| 默认值 |`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true |`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效,RocketMQ 默认值为4M = 1024 * 1024 * 4) +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`| +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`| |==== Consumer端支持的配置: @@ -173,8 +176,8 @@ Consumer端支持的配置: |==== ^|配置项 ^|含义| 默认值 |`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割| -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息| +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤,订阅所有消息)| +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高)| |`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false |`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false |==== diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc new file mode 100644 index 00000000..fbe1307c --- /dev/null +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc @@ -0,0 +1,131 @@ +== Spring Cloud AliCloud SchedulerX + +SchedulerX(分布式任务调度) 是隶属于阿里云EDAS产品的组件, Spring Cloud AliCloud SchedulerX 提供了在Spring Cloud的配置规范下,分布式任务调度的功能支持。SchedulerX可提供秒级、精准、高可靠、高可用的定时任务调度服务,并支持多种类型的任务调度,如简单单机任务、简单多机任务、脚本任务以及网格任务。 + +=== 如何引入 Spring Cloud AliCloud SchedulerX + +Spring Cloud Alibaba 已经发布了0.2.1版本,需要首先导入依赖管理POM。 + +[source,xml] +---- + + + + org.springframework.cloud + spring-cloud-alibaba-dependencies + 0.2.1.RELEASE + pom + import + + + +---- + +接下来引入 Spring Cloud AliCloud SchedulerX Starter 即可。 + +[source,xml] +---- + + org.springframework.cloud + spring-cloud-starter-alicloud-schedulerX + +---- + +=== 启动SchedulerX任务调度 + +当客户端引入了 Spring Cloud AliCloud SchedulerX Starter 以后,只需要进行一些简单的配置,就可以自动初始化SchedulerX的任务调度服务。 + +以下是一个简单的应用示例。 + +[source,java] +---- +@SpringBootApplication +public class ScxApplication { + + public static void main(String[] args) { + SpringApplication.run(ScxApplication.class, args); + } + +} +---- + +在application.properties中,需要加上以下配置。 + +[source,properties] +---- +server.port=18033 +# 其中cn-test是SchedulerX的测试区域 +spring.cloud.alicloud.scx.group-id=*** +spring.cloud.alicloud.edas.namespace=cn-test +---- + +在获取group-id之前,需要首先 https://account.aliyun.com/register/register.htm?spm=5176.8142029.388261.26.e9396d3eEIv28g&oauth_callback=https%3A%2F%2Fwww.aliyun.com%2F[注册阿里云账号] ,然后 https://common-buy.aliyun.com/?spm=5176.11451019.0.0.6f5965c0Uq5tue&commodityCode=edaspostpay#/buy[开通EDAS服务] ,并 https://edas.console.aliyun.com/#/edasTools[开通分布式任务管理组件] 。 + +其中group-id的获取,请参考 https://help.aliyun.com/document_detail/98784.html?spm=a2c4g.11186623.2.17.23c87da9P2F3tG[这里]。 + +NOTE: 在创建group的时候,要选择"测试"区域。 + +=== 编写一个简单任务 + +简单任务是最常用的任务类型,只需要实现 ScxSimpleJobProcessor 接口即可。 + +以下是一个简单的单机类型任务示例。 + +[source,java] +---- +public class SimpleTask implements ScxSimpleJobProcessor { + + @Override + public ProcessResult process(ScxSimpleJobContext context) { + System.out.println("-----------Hello world---------------"); + ProcessResult processResult = new ProcessResult(true); + return processResult; + } + +} +---- + +=== 对任务进行调度 + +进入[SchedulerX任务列表](https://edas.console.aliyun.com/#/edasSchedulerXJob?regionNo=cn-test)页面,选择上方"测试"区域,点击右上角"新建Job",创建一个Job,即如下所示。 + +[source,text] +---- +Job分组:测试——***-*-*-**** +Job处理接口:org.springframework.cloud.alibaba.cloud.examples.SimpleTask +类型:简单Job单机版 +定时表达式:默认选项——0 * * * * ? +Job描述:无 +自定义参数:无 +---- + +以上任务类型选择了"简单Job单机版",并且制定了Cron表达式为"0 * * * * ?",这意味着,每过一分钟,任务将会被执行且只执行一次。 + +更多任务类型,请参考 https://help.aliyun.com/document_detail/43136.html?spm=a2c4g.11186623.6.703.64e17da9br61ZS[SchedulerX官方文档]。 + +=== 生产环境使用 + +以上使用的都是SchedulerX的"测试"区域,主要用于本地调试和测试。 + +在生产级别,除了上面的group-id和namespace以外,还需要一些额外的配置,如下所示。 + +[source,properties] +---- +server.port=18033 +# 其中cn-test是SchedulerX的测试区域 +spring.cloud.alicloud.scx.group-id=*** +spring.cloud.alicloud.edas.namespace=*** +# 当应用运行在EDAS上时,以下配置不需要手动配置。 +spring.cloud.alicloud.access-key=*** +spring.cloud.alicloud.secret-key=*** +# 以下配置不是必须的,请参考SchedulerX文档 +spring.cloud.alicloud.scx.domain-name=*** +---- + +其中group-id与之前的获取方式一样,namespace则是从EDAS控制台左侧"命名空间"列表中获取命名空间ID。 + +NOTE: group-id必须创建在namespace当中。 + +access-key以及secret-key为阿里云账号的AK/SK信息,如果应用在EDAS上部署,则不需要填写这两项信息,否则请前往 https://usercenter.console.aliyun.com/#/manage/ak[安全信息管理]获取。 + +domain-name并不是必须的,具体请参考 https://help.aliyun.com/document_detail/35359.html?spm=a2c4g.11186623.6.704.4abf1994SbgXYS[SchedulerX官方文档]。 \ No newline at end of file diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc index ed1b0d83..c1e699b1 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc @@ -142,7 +142,7 @@ class EchoServiceFallback implements EchoService { } ``` -NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:http://requesturl +NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:protocol://requesturl `EchoService` 接口中方法 `echo` 对应的资源名为 `GET:http://service-provider/echo/{str}`。 @@ -150,17 +150,17 @@ NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:http://reque ### RestTemplate 支持 -Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelProtect` 注解。 +Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelRestTemplate` 注解。 ```java @Bean -@SentinelProtect(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class) +@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class) public RestTemplate restTemplate() { return new RestTemplate(); } ``` -`@SentinelProtect` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。 +`@SentinelRestTemplate` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。 限流的资源规则提供两种粒度: diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/spring-cloud-alibaba.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/spring-cloud-alibaba.adoc index 77083a32..c831c366 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/spring-cloud-alibaba.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/spring-cloud-alibaba.adoc @@ -29,3 +29,6 @@ include::acm.adoc[] include::oss.adoc[] +include::schedulerx.adoc[] + + diff --git a/spring-cloud-alibaba-examples/acm-example/acm-local-example/pom.xml b/spring-cloud-alibaba-examples/acm-example/acm-local-example/pom.xml index 504f19ea..ce6a1740 100644 --- a/spring-cloud-alibaba-examples/acm-example/acm-local-example/pom.xml +++ b/spring-cloud-alibaba-examples/acm-example/acm-local-example/pom.xml @@ -20,9 +20,14 @@ org.springframework.boot spring-boot-starter-web - - org.springframework.boot - spring-boot-starter-actuator - + + + + + org.springframework.boot + spring-boot-maven-plugin + + + \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/ans-example/ans-provider-example/pom.xml b/spring-cloud-alibaba-examples/ans-example/ans-provider-example/pom.xml index 1d188a68..87b05909 100644 --- a/spring-cloud-alibaba-examples/ans-example/ans-provider-example/pom.xml +++ b/spring-cloud-alibaba-examples/ans-example/ans-provider-example/pom.xml @@ -25,4 +25,13 @@ spring-boot-starter-actuator + + + + + org.springframework.boot + spring-boot-maven-plugin + + + \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index 7d6b3b26..d69279e4 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -30,6 +30,7 @@ acm-example/acm-local-example rocketmq-example spring-cloud-bus-rocketmq-example + schedulerX-example/schedulerX-simple-task-example diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java new file mode 100644 index 00000000..65d72662 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java @@ -0,0 +1,18 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.common.message.MessageExt; + +/** + * @author Jim + */ +public class MyTransactionCheckListener implements TransactionCheckListener { + + @Override + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + System.out.println("TransactionCheckListener: " + new String(msg.getBody())); + return LocalTransactionState.COMMIT_MESSAGE; + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java new file mode 100644 index 00000000..752d4e5f --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java @@ -0,0 +1,20 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.common.message.Message; + +/** + * @author Jim + */ +public class MyTransactionExecuter implements LocalTransactionExecuter { + @Override + public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { + if ("1".equals(msg.getUserProperty("test"))) { + System.out.println(new String(msg.getBody()) + " rollback"); + return LocalTransactionState.ROLLBACK_MESSAGE; + } + System.out.println(new String(msg.getBody()) + " commit"); + return LocalTransactionState.COMMIT_MESSAGE; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java index f251902d..a486ebc0 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java @@ -30,4 +30,9 @@ public class ReceiveService { System.out.println("input1 receive again: " + receiveMsg); } + @StreamListener("input4") + public void receiveTransactionalMsg(String transactionMsg) { + System.out.println("input4 receive transaction msg: " + transactionMsg); + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java index 2d4deda6..f736a0e5 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java @@ -5,17 +5,19 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * @author Jim */ @SpringBootApplication -@EnableBinding({ Source.class, MySink.class }) +@EnableBinding({ MySource.class, MySink.class }) public class RocketMQApplication { public interface MySink { @@ -29,6 +31,16 @@ public class RocketMQApplication { @Input("input3") SubscribableChannel input3(); + @Input("input4") + SubscribableChannel input4(); + } + + public interface MySource { + @Output("output1") + MessageChannel output1(); + + @Output("output2") + MessageChannel output2(); } public static void main(String[] args) { @@ -40,6 +52,11 @@ public class RocketMQApplication { return new CustomRunner(); } + @Bean + public CustomRunnerWithTransactional customRunnerWithTransactional() { + return new CustomRunnerWithTransactional(); + } + public static class CustomRunner implements CommandLineRunner { @Autowired private SenderService senderService; @@ -62,4 +79,21 @@ public class RocketMQApplication { } } + public static class CustomRunnerWithTransactional implements CommandLineRunner { + @Autowired + private SenderService senderService; + + @Override + public void run(String... args) throws Exception { + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg1", false); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg2", true); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg3", true); + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg4", false); + } + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java index eaf0001f..e84ada23 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java @@ -5,7 +5,7 @@ import java.util.stream.Stream; import org.apache.rocketmq.common.message.MessageConst; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -19,17 +19,17 @@ import org.springframework.util.MimeTypeUtils; public class SenderService { @Autowired - private Source source; + private MySource source; public void send(String msg) throws Exception { - source.output().send(MessageBuilder.withPayload(msg).build()); + source.output1().send(MessageBuilder.withPayload(msg).build()); } public void sendWithTags(T msg, String tag) throws Exception { Message message = MessageBuilder.createMessage(msg, new MessageHeaders(Stream.of(tag).collect(Collectors .toMap(str -> MessageConst.PROPERTY_TAGS, String::toString)))); - source.output().send(message); + source.output1().send(message); } public void sendObject(T msg, String tag) throws Exception { @@ -37,7 +37,17 @@ public class SenderService { .setHeader(MessageConst.PROPERTY_TAGS, tag) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); - source.output().send(message); + source.output1().send(message); + } + + public void sendTransactionalMsg(T msg, boolean error) throws Exception { + MessageBuilder builder = MessageBuilder.withPayload(msg) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); + if (error) { + builder.setHeader("test", "1"); + } + Message message = builder.build(); + source.output2().send(message); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties index c92dd24a..3bfe511f 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties @@ -2,8 +2,14 @@ spring.cloud.stream.default-binder=rocketmq spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 -spring.cloud.stream.bindings.output.destination=test-topic -spring.cloud.stream.bindings.output.content-type=application/json +spring.cloud.stream.bindings.output1.destination=test-topic +spring.cloud.stream.bindings.output1.content-type=application/json + +spring.cloud.stream.bindings.output2.destination=TransactionTopic +spring.cloud.stream.bindings.output2.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter +spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain @@ -26,6 +32,11 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj spring.cloud.stream.bindings.input3.consumer.concurrency=20 spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 +spring.cloud.stream.bindings.input4.destination=TransactionTopic +spring.cloud.stream.bindings.input4.content-type=text/plain +spring.cloud.stream.bindings.input4.group=transaction-group +spring.cloud.stream.bindings.input4.consumer.concurrency=210 + spring.application.name=rocketmq-example server.port=28081 diff --git a/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/pom.xml b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/pom.xml new file mode 100644 index 00000000..2115553c --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/pom.xml @@ -0,0 +1,37 @@ + + + + spring-cloud-alibaba-examples + org.springframework.cloud + 0.2.1.BUILD-SNAPSHOT + ../../pom.xml + + 4.0.0 + schedulerX-simple-task-example + + + + org.springframework.cloud + spring-cloud-starter-alicloud-schedulerX + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/readme-zh.md b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/readme-zh.md new file mode 100644 index 00000000..9e9b6f02 --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/readme-zh.md @@ -0,0 +1,47 @@ +# SchedulerX Simple Task Example + +## 项目说明 + +本项目展示了,在Spring Cloud体系中,如何快如接入SchedulerX,使用任务调度服务。 + +SchedulerX 是阿里中间件团队开发的一款分布式任务调度产品。它为您提供秒级、精准、高可靠、高可用的定时(基于 Cron 表达式)任务调度服务。同时提供分布式的任务执行模型,如网格任务。网格任务支持海量子任务均匀分配到所有 Worker(schedulerx-client)上执行。 + +## 示例 + +### 准备工作 + +1. 请先[注册阿里云账号](https://account.aliyun.com/register/register.htm?spm=5176.8142029.388261.26.e9396d3eEIv28g&oauth_callback=https%3A%2F%2Fwww.aliyun.com%2F) + +2. SchedulerX集成到了EDAS组件中心,因此需要[开通EDAS服务](https://common-buy.aliyun.com/?spm=5176.11451019.0.0.6f5965c0Uq5tue&commodityCode=edaspostpay#/buy) + +3. 到[EDAS组件中心](https://edas.console.aliyun.com/#/edasTools)开通SchedulerX组件,即分布式任务管理。 + +4. 进入[SchedulerX分组管理](https://edas.console.aliyun.com/#/schedulerXGroup?regionNo=cn-test)页面,选择上方"测试"区域,点击右上角"新建分组",创建一个分组。 + +5. 将"分组ID"的值填写到`application.properties`文件中`key`为`spring.cloud.alicloud.scx.group-id`对应的value值,即如下所示。 + + spring.cloud.alicloud.scx.group-id=111-1-1-1111 + +6. 进入[SchedulerX任务列表](https://edas.console.aliyun.com/#/edasSchedulerXJob?regionNo=cn-test)页面,选择上方"测试"区域,点击右上角"新建Job",创建一个Job,即如下所示。 + + Job分组:测试——111-1-1-1111 + Job处理接口:org.springframework.cloud.alibaba.cloud.examples.SimpleTask + 类型:简单Job单机版 + 定时表达式:默认选项——0 * * * * ? + Job描述:无 + 自定义参数:无 + +### 启动应用 + +直接运行main class,即`ScxApplication`。 + +### 查看效果 + +观察应用的控制台日志输出,可以看到每一分钟会打印一次如下日志。 + +``` + -----------Hello world--------------- +``` + +如果您对 Spring Cloud SchedulerX Starter 有任何建议或想法,欢迎提交 issue 中或者通过其他社区渠道向我们反馈。 + diff --git a/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ScxApplication.java b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ScxApplication.java new file mode 100644 index 00000000..baccfd4f --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ScxApplication.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alibaba.cloud.examples; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author xiaolongzuo + */ +@SpringBootApplication +public class ScxApplication { + + public static void main(String[] args) { + SpringApplication.run(ScxApplication.class, args); + } + +} diff --git a/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SimpleTask.java b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SimpleTask.java new file mode 100644 index 00000000..e49c0a05 --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SimpleTask.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alibaba.cloud.examples; + +import com.alibaba.edas.schedulerx.ProcessResult; +import com.alibaba.edas.schedulerx.ScxSimpleJobContext; +import com.alibaba.edas.schedulerx.ScxSimpleJobProcessor; + +/** + * @author xiaolongzuo + */ +public class SimpleTask implements ScxSimpleJobProcessor { + + @Override + public ProcessResult process(ScxSimpleJobContext context) { + System.out.println("-----------Hello world---------------"); + ProcessResult processResult = new ProcessResult(true); + return processResult; + } + +} diff --git a/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/resources/application.properties new file mode 100644 index 00000000..a995fa3f --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerX-example/schedulerX-simple-task-example/src/main/resources/application.properties @@ -0,0 +1,3 @@ +server.port=18033 +spring.cloud.alicloud.scx.group-id=*** +spring.cloud.alicloud.edas.namespace=cn-test diff --git a/spring-cloud-alicloud-acm/pom.xml b/spring-cloud-alicloud-acm/pom.xml index e6381087..e4ebc470 100644 --- a/spring-cloud-alicloud-acm/pom.xml +++ b/spring-cloud-alicloud-acm/pom.xml @@ -44,6 +44,7 @@ org.springframework.boot spring-boot-starter-actuator + provided true diff --git a/spring-cloud-alicloud-acm/src/main/java/org/springframework/cloud/alicloud/acm/AcmAutoConfiguration.java b/spring-cloud-alicloud-acm/src/main/java/org/springframework/cloud/alicloud/acm/AcmAutoConfiguration.java index 36da2e85..9599decb 100644 --- a/spring-cloud-alicloud-acm/src/main/java/org/springframework/cloud/alicloud/acm/AcmAutoConfiguration.java +++ b/spring-cloud-alicloud-acm/src/main/java/org/springframework/cloud/alicloud/acm/AcmAutoConfiguration.java @@ -18,11 +18,9 @@ package org.springframework.cloud.alicloud.acm; import org.springframework.beans.BeansException; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.cloud.alicloud.acm.endpoint.AcmHealthIndicator; import org.springframework.cloud.alicloud.acm.refresh.AcmContextRefresher; import org.springframework.cloud.alicloud.acm.refresh.AcmRefreshHistory; import org.springframework.cloud.alicloud.context.acm.AcmIntegrationProperties; -import org.springframework.cloud.alicloud.context.acm.AcmProperties; import org.springframework.cloud.context.refresh.ContextRefresher; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -47,12 +45,6 @@ public class AcmAutoConfiguration implements ApplicationContextAware { return new AcmPropertySourceRepository(applicationContext); } - @Bean - public AcmHealthIndicator acmHealthIndicator(AcmProperties acmProperties, - AcmPropertySourceRepository acmPropertySourceRepository) { - return new AcmHealthIndicator(acmProperties, acmPropertySourceRepository); - } - @Bean public AcmRefreshHistory acmRefreshHistory() { return new AcmRefreshHistory(); diff --git a/spring-cloud-alicloud-acm/src/main/java/org/springframework/cloud/alicloud/acm/endpoint/AcmEndpointAutoConfiguration.java b/spring-cloud-alicloud-acm/src/main/java/org/springframework/cloud/alicloud/acm/endpoint/AcmEndpointAutoConfiguration.java index 2154c770..86c7fa18 100644 --- a/spring-cloud-alicloud-acm/src/main/java/org/springframework/cloud/alicloud/acm/endpoint/AcmEndpointAutoConfiguration.java +++ b/spring-cloud-alicloud-acm/src/main/java/org/springframework/cloud/alicloud/acm/endpoint/AcmEndpointAutoConfiguration.java @@ -33,20 +33,26 @@ import org.springframework.context.annotation.Bean; @ConditionalOnClass(name = "org.springframework.boot.actuate.autoconfigure.web.ManagementContextConfiguration") public class AcmEndpointAutoConfiguration { - @Autowired - private AcmProperties acmProperties; + @Autowired + private AcmProperties acmProperties; - @Autowired - private AcmRefreshHistory acmRefreshHistory; + @Autowired + private AcmRefreshHistory acmRefreshHistory; - @Autowired - private AcmPropertySourceRepository acmPropertySourceRepository; + @Autowired + private AcmPropertySourceRepository acmPropertySourceRepository; - @ConditionalOnMissingBean - @ConditionalOnEnabledEndpoint - @Bean - public AcmEndpoint acmEndpoint() { - return new AcmEndpoint(acmProperties, acmRefreshHistory, - acmPropertySourceRepository); - } + @ConditionalOnMissingBean + @ConditionalOnEnabledEndpoint + @Bean + public AcmEndpoint acmEndpoint() { + return new AcmEndpoint(acmProperties, acmRefreshHistory, + acmPropertySourceRepository); + } + + @Bean + public AcmHealthIndicator acmHealthIndicator(AcmProperties acmProperties, + AcmPropertySourceRepository acmPropertySourceRepository) { + return new AcmHealthIndicator(acmProperties, acmPropertySourceRepository); + } } diff --git a/spring-cloud-alicloud-ans/pom.xml b/spring-cloud-alicloud-ans/pom.xml index 56fba7fb..6ed883c0 100644 --- a/spring-cloud-alicloud-ans/pom.xml +++ b/spring-cloud-alicloud-ans/pom.xml @@ -44,9 +44,20 @@ spring-cloud-commons + + org.slf4j + slf4j-api + + org.springframework.cloud spring-cloud-starter-netflix-ribbon + + + org.springframework.boot + spring-boot-starter + + diff --git a/spring-cloud-alicloud-context/pom.xml b/spring-cloud-alicloud-context/pom.xml index 5b3a0f1a..64681afe 100644 --- a/spring-cloud-alicloud-context/pom.xml +++ b/spring-cloud-alicloud-context/pom.xml @@ -38,6 +38,12 @@ provided + + com.alibaba.schedulerx + schedulerx-client + provided + + com.alibaba.ans ans-sdk diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/AliCloudContextAutoConfiguration.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/AliCloudContextAutoConfiguration.java index f0f2f572..e7c84a53 100644 --- a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/AliCloudContextAutoConfiguration.java +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/AliCloudContextAutoConfiguration.java @@ -16,14 +16,24 @@ package org.springframework.cloud.alicloud.context; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.commons.util.InetUtils; +import org.springframework.cloud.commons.util.InetUtilsProperties; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author xiaolongzuo */ @Configuration -@EnableConfigurationProperties(AliCloudProperties.class) +@EnableConfigurationProperties({ AliCloudProperties.class, InetUtilsProperties.class }) public class AliCloudContextAutoConfiguration { + @Bean + @ConditionalOnMissingBean + public InetUtils inetUtils(InetUtilsProperties inetUtilsProperties) { + return new InetUtils(inetUtilsProperties); + } + } diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/ans/AnsContextAutoConfiguration.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/ans/AnsContextAutoConfiguration.java index fc58b69e..b1a7d64d 100644 --- a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/ans/AnsContextAutoConfiguration.java +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/ans/AnsContextAutoConfiguration.java @@ -18,12 +18,8 @@ package org.springframework.cloud.alicloud.context.ans; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.alicloud.context.edas.EdasContextAutoConfiguration; -import org.springframework.cloud.commons.util.InetUtils; -import org.springframework.cloud.commons.util.InetUtilsProperties; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** @@ -31,14 +27,8 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @ConditionalOnClass(name = "org.springframework.cloud.alicloud.ans.AnsAutoConfiguration") -@EnableConfigurationProperties({ AnsProperties.class, InetUtilsProperties.class }) +@EnableConfigurationProperties(AnsProperties.class) @ImportAutoConfiguration(EdasContextAutoConfiguration.class) public class AnsContextAutoConfiguration { - @Bean - @ConditionalOnMissingBean - public InetUtils inetUtils(InetUtilsProperties inetUtilsProperties) { - return new InetUtils(inetUtilsProperties); - } - } diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/nacos/NacosParameterInitListener.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/nacos/NacosParameterInitListener.java index c795d2ac..60945754 100644 --- a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/nacos/NacosParameterInitListener.java +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/nacos/NacosParameterInitListener.java @@ -1,12 +1,13 @@ package org.springframework.cloud.alicloud.context.nacos; -import com.alibaba.cloud.context.edas.EdasChangeOrderConfiguration; -import com.alibaba.cloud.context.edas.EdasChangeOrderConfigurationFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent; import org.springframework.context.ApplicationListener; +import com.alibaba.cloud.context.edas.EdasChangeOrderConfiguration; +import com.alibaba.cloud.context.edas.EdasChangeOrderConfigurationFactory; + public class NacosParameterInitListener implements ApplicationListener { private static final Logger log = LoggerFactory @@ -20,7 +21,7 @@ public class NacosParameterInitListener private void preparedNacosConfiguration() { EdasChangeOrderConfiguration edasChangeOrderConfiguration = EdasChangeOrderConfigurationFactory - .buildEdasChangeOrderConfiguration(); + .getEdasChangeOrderConfiguration(); log.info("Initialize Nacos Parameter from edas change order,is edas managed {}.", edasChangeOrderConfiguration.isEdasManaged()); if (!edasChangeOrderConfiguration.isEdasManaged()) { diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/oss/OssContextAutoConfiguration.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/oss/OssContextAutoConfiguration.java index e1993a37..0a3e1309 100644 --- a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/oss/OssContextAutoConfiguration.java +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/oss/OssContextAutoConfiguration.java @@ -51,9 +51,9 @@ public class OssContextAutoConfiguration { Assert.isTrue(!StringUtils.isEmpty(ossProperties.getEndpoint()), "Oss endpoint can't be empty."); Assert.isTrue(!StringUtils.isEmpty(aliCloudProperties.getAccessKey()), - "Access key can't be empty."); + "${spring.cloud.alicloud.access-key} can't be empty."); Assert.isTrue(!StringUtils.isEmpty(aliCloudProperties.getSecretKey()), - "Secret key can't be empty."); + "${spring.cloud.alicloud.secret-key} can't be empty."); return new OSSClientBuilder().build(ossProperties.getEndpoint(), aliCloudProperties.getAccessKey(), aliCloudProperties.getSecretKey(), ossProperties.getConfig()); diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxContextAutoConfiguration.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxContextAutoConfiguration.java new file mode 100644 index 00000000..075538e5 --- /dev/null +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxContextAutoConfiguration.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.context.scx; + +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.alicloud.context.AliCloudProperties; +import org.springframework.cloud.alicloud.context.edas.EdasContextAutoConfiguration; +import org.springframework.cloud.alicloud.context.edas.EdasProperties; +import org.springframework.context.annotation.Configuration; + +import com.alibaba.cloud.context.edas.AliCloudEdasSdk; +import com.alibaba.cloud.context.scx.AliCloudScxInitializer; +import com.alibaba.dts.common.exception.InitException; + +/** + * @author xiaolongzuo + */ +@Configuration +@ConditionalOnClass(name = "org.springframework.cloud.alicloud.scx.ScxAutoConfiguration") +@EnableConfigurationProperties(ScxProperties.class) +@ImportAutoConfiguration(EdasContextAutoConfiguration.class) +public class ScxContextAutoConfiguration { + + private static final Logger log = LoggerFactory + .getLogger(ScxContextAutoConfiguration.class); + + @Autowired + private AliCloudProperties aliCloudProperties; + + @Autowired + private EdasProperties edasProperties; + + @Autowired + private ScxProperties scxProperties; + + @Autowired + private AliCloudEdasSdk aliCloudEdasSdk; + + @PostConstruct + public void initAcmProperties() { + try { + AliCloudScxInitializer.initialize(aliCloudProperties, edasProperties, + scxProperties, aliCloudEdasSdk); + } + catch (InitException e) { + log.error("Init SchedulerX failed.", e); + throw new RuntimeException(e); + } + } +} diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxProperties.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxProperties.java new file mode 100644 index 00000000..db6a00d6 --- /dev/null +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxProperties.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.context.scx; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import com.alibaba.cloud.context.scx.ScxConfiguration; + +/** + * @author xiaolongzuo + */ +@ConfigurationProperties("spring.cloud.alicloud.scx") +public class ScxProperties implements ScxConfiguration { + + private String groupId; + + private String domainName; + + @Override + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + @Override + public String getDomainName() { + return domainName; + } + + public void setDomainName(String domainName) { + this.domainName = domainName; + } + +} diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/statistics/StatisticsTaskStarter.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/statistics/StatisticsTaskStarter.java new file mode 100644 index 00000000..41c3591b --- /dev/null +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/statistics/StatisticsTaskStarter.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.context.statistics; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.cloud.alicloud.context.acm.AcmContextBootstrapConfiguration; +import org.springframework.cloud.alicloud.context.acm.AcmProperties; +import org.springframework.cloud.alicloud.context.ans.AnsContextAutoConfiguration; +import org.springframework.cloud.alicloud.context.ans.AnsProperties; +import org.springframework.cloud.alicloud.context.edas.EdasProperties; +import org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration; +import org.springframework.cloud.alicloud.context.oss.OssProperties; +import org.springframework.cloud.alicloud.context.scx.ScxContextAutoConfiguration; +import org.springframework.cloud.alicloud.context.scx.ScxProperties; +import org.springframework.context.annotation.Configuration; + +import com.alibaba.cloud.context.AliCloudServerMode; +import com.alibaba.cloud.context.edas.AliCloudEdasSdk; +import com.alibaba.cloud.context.statistics.StatisticsTask; + +/** + * @author xiaolongzuo + */ +@Configuration +@AutoConfigureAfter({ ScxContextAutoConfiguration.class, + OssContextAutoConfiguration.class, AnsContextAutoConfiguration.class, + AcmContextBootstrapConfiguration.class }) +public class StatisticsTaskStarter implements InitializingBean { + + @Autowired(required = false) + private AliCloudEdasSdk aliCloudEdasSdk; + + @Autowired(required = false) + private EdasProperties edasProperties; + + @Autowired(required = false) + private ScxProperties scxProperties; + + @Autowired(required = false) + private OssProperties ossProperties; + + @Autowired(required = false) + private AnsProperties ansProperties; + + @Autowired(required = false) + private AcmProperties acmProperties; + + @Autowired(required = false) + private ScxContextAutoConfiguration scxContextAutoConfiguration; + + @Autowired(required = false) + private OssContextAutoConfiguration ossContextAutoConfiguration; + + @Autowired(required = false) + private AnsContextAutoConfiguration ansContextAutoConfiguration; + + @Autowired(required = false) + private AcmContextBootstrapConfiguration acmContextBootstrapConfiguration; + + @Override + public void afterPropertiesSet() { + StatisticsTask statisticsTask = new StatisticsTask(aliCloudEdasSdk, + edasProperties, getComponents()); + statisticsTask.start(); + } + + private List getComponents() { + List components = new ArrayList<>(); + if (scxContextAutoConfiguration != null && scxProperties != null) { + components.add("SC-SCX"); + } + if (ossContextAutoConfiguration != null && ossProperties != null) { + components.add("SC-OSS"); + } + boolean edasEnabled = edasProperties != null && edasProperties.isEnabled(); + boolean ansEnableEdas = edasEnabled || (ansProperties != null + && ansProperties.getServerMode() == AliCloudServerMode.EDAS); + if (ansContextAutoConfiguration != null && ansEnableEdas) { + components.add("SC-ANS"); + } + boolean acmEnableEdas = edasEnabled || (acmProperties != null + && acmProperties.getServerMode() == AliCloudServerMode.EDAS); + if (acmContextBootstrapConfiguration != null && acmEnableEdas) { + components.add("SC-ACM"); + } + return components; + } + +} diff --git a/spring-cloud-alicloud-context/src/main/resources/META-INF/spring.factories b/spring-cloud-alicloud-context/src/main/resources/META-INF/spring.factories index b5ed9c32..16457232 100644 --- a/spring-cloud-alicloud-context/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-alicloud-context/src/main/resources/META-INF/spring.factories @@ -4,7 +4,8 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.alicloud.context.AliCloudContextAutoConfiguration,\ org.springframework.cloud.alicloud.context.edas.EdasContextAutoConfiguration,\ org.springframework.cloud.alicloud.context.ans.AnsContextAutoConfiguration,\ - org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration + org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration,\ + org.springframework.cloud.alicloud.context.scx.ScxContextAutoConfiguration,\ + org.springframework.cloud.alicloud.context.statistics.StatisticsTaskStarter org.springframework.context.ApplicationListener=\ - org.springframework.cloud.alicloud.context.ans.AnsContextApplicationListener,\ - org.springframework.cloud.alicloud.context.nacos.NacosParameterInitListener \ No newline at end of file + org.springframework.cloud.alicloud.context.ans.AnsContextApplicationListener \ No newline at end of file diff --git a/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/AliCloudSpringApplicationTests.java b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/AliCloudSpringApplicationTests.java index 636adf64..471ffbb9 100644 --- a/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/AliCloudSpringApplicationTests.java +++ b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/AliCloudSpringApplicationTests.java @@ -33,7 +33,9 @@ import org.springframework.test.context.junit4.SpringRunner; "spring.application.name=myapp", "spring.cloud.alicloud.edas.application.name=myapp", "spring.cloud.alicloud.access-key=ak", "spring.cloud.alicloud.secret-key=sk", - "spring.cloud.alicloud.oss.endpoint=test" }, webEnvironment = RANDOM_PORT) + "spring.cloud.alicloud.oss.endpoint=test", + "spring.cloud.alicloud.scx.group-id=1-2-3-4", + "spring.cloud.alicloud.edas.namespace=cn-test" }, webEnvironment = RANDOM_PORT) @DirtiesContext public class AliCloudSpringApplicationTests { diff --git a/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/scx/ScxAutoConfigurationTests.java b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/scx/ScxAutoConfigurationTests.java new file mode 100644 index 00000000..199701ec --- /dev/null +++ b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/scx/ScxAutoConfigurationTests.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.context.scx; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.alicloud.context.edas.EdasProperties; +import org.springframework.cloud.alicloud.context.oss.OssProperties; + +import com.aliyun.oss.OSS; + +/** + * {@link OSS} {@link OssProperties} Test + * + * @author Jim + */ +public class ScxAutoConfigurationTests { + + private ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(ScxContextAutoConfiguration.class)) + .withPropertyValues("spring.cloud.alicloud.scx.group-id=1-2-3-4") + .withPropertyValues("spring.cloud.alicloud.edas.namespace=cn-test"); + + @Test + public void testSxcProperties() { + this.contextRunner.run(context -> { + assertThat(context.getBeansOfType(ScxProperties.class).size() == 1).isTrue(); + EdasProperties edasProperties = context.getBean(EdasProperties.class); + ScxProperties scxProperties = context.getBean(ScxProperties.class); + assertThat(scxProperties.getGroupId()).isEqualTo("1-2-3-4"); + assertThat(edasProperties.getNamespace()).isEqualTo("cn-test"); + assertThat(scxProperties.getDomainName()).isNull(); + }); + } + +} diff --git a/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java new file mode 100644 index 00000000..a1128b92 --- /dev/null +++ b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.scx; + +/** + * @author xiaolongzuo + */ +public class ScxAutoConfiguration { +} diff --git a/spring-cloud-alicloud-schedulerX/pom.xml b/spring-cloud-alicloud-schedulerX/pom.xml new file mode 100644 index 00000000..2bf62ddf --- /dev/null +++ b/spring-cloud-alicloud-schedulerX/pom.xml @@ -0,0 +1,33 @@ + + + + spring-cloud-alibaba + org.springframework.cloud + 0.2.1.BUILD-SNAPSHOT + + 4.0.0 + + spring-cloud-alicloud-schedulerX + + + + org.springframework.cloud + spring-cloud-alicloud-context + + + com.alibaba.schedulerx + schedulerx-client + + + com.aliyun + aliyun-java-sdk-core + + + com.aliyun + aliyun-java-sdk-edas + + + + \ No newline at end of file diff --git a/spring-cloud-alicloud-schedulerX/src/main/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java b/spring-cloud-alicloud-schedulerX/src/main/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java new file mode 100644 index 00000000..77669f50 --- /dev/null +++ b/spring-cloud-alicloud-schedulerX/src/main/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.scx; + +import org.springframework.context.annotation.Configuration; + +/** + * placeholder configuration + * + * @author xiaolongzuo + */ +@Configuration +public class ScxAutoConfiguration { + +} diff --git a/spring-cloud-alicloud-schedulerX/src/main/resources/META-INF/spring.factories b/spring-cloud-alicloud-schedulerX/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000..c0fc2249 --- /dev/null +++ b/spring-cloud-alicloud-schedulerX/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.springframework.cloud.alicloud.scx.ScxAutoConfiguration \ No newline at end of file diff --git a/spring-cloud-starter-alicloud/pom.xml b/spring-cloud-starter-alicloud/pom.xml index eb2ae1aa..ec627604 100644 --- a/spring-cloud-starter-alicloud/pom.xml +++ b/spring-cloud-starter-alicloud/pom.xml @@ -14,5 +14,6 @@ spring-cloud-starter-alicloud-oss spring-cloud-starter-alicloud-acm spring-cloud-starter-alicloud-ans + spring-cloud-starter-alicloud-schedulerX \ No newline at end of file diff --git a/spring-cloud-starter-alicloud/spring-cloud-starter-alicloud-schedulerX/pom.xml b/spring-cloud-starter-alicloud/spring-cloud-starter-alicloud-schedulerX/pom.xml new file mode 100644 index 00000000..a7ada650 --- /dev/null +++ b/spring-cloud-starter-alicloud/spring-cloud-starter-alicloud-schedulerX/pom.xml @@ -0,0 +1,20 @@ + + 4.0.0 + + + org.springframework.cloud + spring-cloud-starter-alicloud + 0.2.1.BUILD-SNAPSHOT + + spring-cloud-starter-alicloud-schedulerX + Spring Cloud Starter Alibaba Cloud OSS + + + + org.springframework.cloud + spring-cloud-alicloud-schedulerX + + + + diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index 65128be4..66517074 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -32,6 +32,8 @@ public interface RocketMQBinderConstants { String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; + String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG"; + String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; /** diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index d5109cda..ded2be10 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,6 +16,9 @@ package org.springframework.cloud.stream.binder.rocketmq; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; @@ -36,6 +39,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.util.ClassUtils; /** * @author Timur Valiev @@ -71,9 +75,24 @@ public class RocketMQMessageChannelBinder extends ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - return new RocketMQMessageHandler(destination.getName(), - producerProperties.getExtension(), + RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( + destination.getName(), producerProperties, rocketBinderConfigurationProperties, instrumentationManager); + if (producerProperties.getExtension().getTransactional()) { + // transaction message check LocalTransactionExecuter + messageHandler.setLocalTransactionExecuter( + getClassConfiguration(destination.getName(), + producerProperties.getExtension().getExecuter(), + LocalTransactionExecuter.class)); + // transaction message check TransactionCheckListener + messageHandler.setTransactionCheckListener( + getClassConfiguration(destination.getName(), + producerProperties.getExtension() + .getTransactionCheckListener(), + TransactionCheckListener.class)); + } + + return messageHandler; } else { throw new RuntimeException("Binding for channel " + destination.getName() @@ -120,4 +139,46 @@ public class RocketMQMessageChannelBinder extends public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { return extendedBindingProperties.getExtendedProducerProperties(channelName); } + + private T getClassConfiguration(String destName, String className, + Class interfaceClass) { + if (StringUtils.isEmpty(className)) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, should set " + + interfaceClass.getSimpleName() + " configuration" + + interfaceClass.getSimpleName() + " should be set, like " + + "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour" + + interfaceClass.getSimpleName() + "'"); + } + else if (StringUtils.isNotEmpty(className)) { + Class fieldClass; + // check class exists + try { + fieldClass = ClassUtils.forName(className, + RocketMQMessageChannelBinder.class.getClassLoader()); + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " class is not found"); + } + // check interface incompatible + if (!interfaceClass.isAssignableFrom(fieldClass)) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " is incompatible with " + interfaceClass.getSimpleName() + + " interface"); + } + try { + return (T) fieldClass.newInstance(); + } + catch (Exception e) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " instance error", e); + } + } + return null; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java index 4f0ca012..7707c07e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java @@ -20,6 +20,7 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG; import java.util.HashMap; import java.util.Map; @@ -103,6 +104,15 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor { return this; } + public Object getTransactionalArg() { + return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG); + } + + public Object withTransactionalArg(Object arg) { + setHeader(ROCKET_TRANSACTIONAL_ARG, arg); + return this; + } + public SendResult getSendResult() { return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index ece595e6..ba8054ad 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -23,10 +23,14 @@ import java.util.Optional; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -49,16 +53,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private InstrumentationManager instrumentationManager; - private final RocketMQProducerProperties producerProperties; + private LocalTransactionExecuter localTransactionExecuter; + + private TransactionCheckListener transactionCheckListener; + + private final ExtendedProducerProperties producerProperties; private final String destination; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - protected volatile boolean running = false; + private volatile boolean running = false; public RocketMQMessageHandler(String destination, - RocketMQProducerProperties producerProperties, + ExtendedProducerProperties producerProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, InstrumentationManager instrumentationManager) { this.destination = destination; @@ -69,7 +77,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li @Override public void start() { - producer = new DefaultMQProducer(destination); + if (producerProperties.getExtension().getTransactional()) { + producer = new TransactionMQProducer(destination); + if (transactionCheckListener != null) { + ((TransactionMQProducer) producer) + .setTransactionCheckListener(transactionCheckListener); + } + } + else { + producer = new DefaultMQProducer(destination); + } Optional.ofNullable(instrumentationManager).ifPresent(manager -> { producerInstrumentation = manager.getProducerInstrumentation(destination); @@ -78,8 +95,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - if (producerProperties.getMaxMessageSize() > 0) { - producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); + if (producerProperties.getExtension().getMaxMessageSize() > 0) { + producer.setMaxMessageSize( + producerProperties.getExtension().getMaxMessageSize()); } try { @@ -138,7 +156,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li toSend.putUserProperty(entry.getKey(), entry.getValue()); } - SendResult sendRes = producer.send(toSend); + SendResult sendRes; + if (producerProperties.getExtension().getTransactional()) { + sendRes = producer.sendMessageInTransaction(toSend, + localTransactionExecuter, headerAccessor.getTransactionalArg()); + } + else { + sendRes = producer.send(toSend); + } if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { throw new MQClientException("message hasn't been sent", null); @@ -164,4 +189,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li } + public void setLocalTransactionExecuter( + LocalTransactionExecuter localTransactionExecuter) { + this.localTransactionExecuter = localTransactionExecuter; + } + + public void setTransactionCheckListener( + TransactionCheckListener transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } } \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java index 6afefd3b..1a05ad50 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -17,6 +17,8 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.TransactionCheckListener; /** * @author Timur Valiev @@ -31,6 +33,18 @@ public class RocketMQProducerProperties { */ private Integer maxMessageSize = 0; + private Boolean transactional = false; + + /** + * full class name of {@link LocalTransactionExecuter} + */ + private String executer; + + /** + * full class name of {@link TransactionCheckListener} + */ + private String transactionCheckListener; + public Boolean getEnabled() { return enabled; } @@ -47,4 +61,27 @@ public class RocketMQProducerProperties { this.maxMessageSize = maxMessageSize; } + public Boolean getTransactional() { + return transactional; + } + + public void setTransactional(Boolean transactional) { + this.transactional = transactional; + } + + public String getExecuter() { + return executer; + } + + public void setExecuter(String executer) { + this.executer = executer; + } + + public String getTransactionCheckListener() { + return transactionCheckListener; + } + + public void setTransactionCheckListener(String transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } }