From 8f1b45b2e64e8452b70c92441b8d17e1acbe2e57 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 12 Dec 2018 16:04:56 +0800 Subject: [PATCH 1/5] sentinel add docs and bug fix for 1.x branch --- .../src/main/asciidoc-zh/sentinel.adoc | 141 +++++++++++++----- .../custom/SentinelAutoConfiguration.java | 17 +-- .../alibaba/sentinel/feign/SentinelFeign.java | 11 +- 3 files changed, 113 insertions(+), 56 deletions(-) diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc index e4c9817f..c1e699b1 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/sentinel.adoc @@ -1,20 +1,20 @@ == Spring Cloud Alibaba Sentinel -### Sentinel介绍 +### Sentinel 介绍 -随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。 +随着微服务的流行,服务和服务之间的稳定性变得越来越重要。 https://github.com/alibaba/Sentinel[Sentinel] 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。 -Sentinel 具有以下特征: +https://github.com/alibaba/Sentinel[Sentinel] 具有以下特征: -* *丰富的应用场景*:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、实时熔断下游不可用应用等。 -* *完备的实时监控*:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。 -* *广泛的开源生态*:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。 -* *完善的 SPI 扩展点*:Sentinel 提供简单易用、完善的 SPI 扩展点。您可以通过实现扩展点,快速的定制逻辑。例如定制规则管理、适配数据源等。 +* *丰富的应用场景*: Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、实时熔断下游不可用应用等。 +* *完备的实时监控*: Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。 +* *广泛的开源生态*: Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。 +* *完善的 SPI 扩展点*: Sentinel 提供简单易用、完善的 SPI 扩展点。您可以通过实现扩展点,快速的定制逻辑。例如定制规则管理、适配数据源等。 -### 如何使用Sentinel +### 如何使用 Sentinel -如果要在您的项目中引入Sentinel,使用group ID为 `org.springframework.cloud` 和artifact ID为 `spring-cloud-starter-alibaba-sentinel` 的starter。 +如果要在您的项目中引入 Sentinel,使用 group ID 为 `org.springframework.cloud` 和 artifact ID 为 `spring-cloud-starter-alibaba-sentinel` 的 starter。 ```xml @@ -23,7 +23,7 @@ Sentinel 具有以下特征: ``` -下面这个例子就是一个最简单的使用Sentinel的例子: +下面这个例子就是一个最简单的使用 Sentinel 的例子: ```java @SpringBootApplication @@ -47,9 +47,9 @@ public class TestController { } ``` -@SentinelResource注解用来标识资源是否被限流、降级。上述例子上该注解的属性'hello'表示资源名。 +@SentinelResource 注解用来标识资源是否被限流、降级。上述例子上该注解的属性 'hello' 表示资源名。 -@SentinelResource还提供了其它额外的属性如 `blockHandler`,`blockHandlerClass`,`fallback` 用于表示限流或降级的操作,更多内容可以参考 https://github.com/alibaba/Sentinel/wiki/%E6%B3%A8%E8%A7%A3%E6%94%AF%E6%8C%81[Sentinel注解支持]。 +@SentinelResource 还提供了其它额外的属性如 `blockHandler`,`blockHandlerClass`,`fallback` 用于表示限流或降级的操作,更多内容可以参考 https://github.com/alibaba/Sentinel/wiki/%E6%B3%A8%E8%A7%A3%E6%94%AF%E6%8C%81[Sentinel注解支持]。 ##### Sentinel 控制台 @@ -74,7 +74,7 @@ image::https://github.com/alibaba/Sentinel/wiki/image/dashboard.png[] ###### 启动控制台 -Sentinel 控制台是一个标准的SpringBoot应用,以SpringBoot的方式运行jar包即可。 +Sentinel 控制台是一个标准的 SpringBoot 应用,以 SpringBoot 的方式运行 jar 包即可。 ```shell java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar @@ -94,27 +94,73 @@ spring: dashboard: localhost:8080 ---- -这里的 `spring.cloud.sentinel.transport.port` 端口配置会在应用对应的机器上启动一个Http Server,该Serve会与Sentinel控制台做交互。比如Sentinel控制台添加了1个限流规则,会把规则数据push给这个Http Server接受,Http Server再将规则注册到Sentinel中。 +这里的 `spring.cloud.sentinel.transport.port` 端口配置会在应用对应的机器上启动一个 Http Server,该 Server 会与 Sentinel 控制台做交互。比如 Sentinel 控制台添加了1个限流规则,会把规则数据 push 给这个 Http Server 接收,Http Server 再将规则注册到 Sentinel 中。 -更多Sentinel控制台的使用及问题参考: https://github.com/alibaba/Sentinel/wiki/%E6%8E%A7%E5%88%B6%E5%8F%B0[Sentinel控制台] +更多 Sentinel 控制台的使用及问题参考: https://github.com/alibaba/Sentinel/wiki/%E6%8E%A7%E5%88%B6%E5%8F%B0[Sentinel控制台] -### Feign支持 +### Feign 支持 -DOING +Sentinel 适配了 https://github.com/OpenFeign/feign[Feign] 组件。如果想使用,除了引入 `sentinel-starter` 的依赖外还需要 3 个步骤: -### RestTemplate支持 +* 配置文件打开 sentinel 对 feign 的支持:`feign.sentinel.enabled=true` +* 加入 `feign starter` 依赖触发 `sentinel starter` 的配置类生效: +```xml + + org.springframework.cloud + spring-cloud-starter-openfeign + +``` +* `feign` 内部的 `loadbalance` 功能依赖 Netflix 的 `ribbon` 模块(如果不使用 `loadbalance` 功能可不引入): +```xml + + org.springframework.cloud + spring-cloud-starter-netflix-ribbon + +``` -Spring Cloud Alibaba Sentinel支持对 `RestTemplate` 的服务调用使用Sentinel进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelProtect` 注解。 +这是一个 `FeignClient` 对应的接口: + +```java +@FeignClient(name = "service-provider", fallback = EchoServiceFallback.class, configuration = FeignConfiguration.class) +public interface EchoService { + @RequestMapping(value = "/echo/{str}", method = RequestMethod.GET) + String echo(@PathVariable("str") String str); +} + +class FeignConfiguration { + @Bean + public EchoServiceFallback echoServiceFallback() { + return new EchoServiceFallback(); + } +} + +class EchoServiceFallback implements EchoService { + @Override + public String echo(@PathVariable("str") String str) { + return "echo fallback"; + } +} +``` + +NOTE: Feign 对应的接口中的资源名策略定义:httpmethod:protocol://requesturl + +`EchoService` 接口中方法 `echo` 对应的资源名为 `GET:http://service-provider/echo/{str}`。 + +请注意:`@FeignClient` 注解中的所有属性,Sentinel 都做了兼容。 + +### RestTemplate 支持 + +Spring Cloud Alibaba Sentinel 支持对 `RestTemplate` 的服务调用使用 Sentinel 进行保护,在构造 `RestTemplate` bean的时候需要加上 `@SentinelRestTemplate` 注解。 ```java @Bean -@SentinelProtect(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class) +@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class) public RestTemplate restTemplate() { return new RestTemplate(); } ``` -`@SentinelProtect` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。 +`@SentinelRestTemplate` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。 限流的资源规则提供两种粒度: @@ -122,12 +168,13 @@ public RestTemplate restTemplate() { * `schema://host:port`:协议、主机和端口 -NOTE: 以 `https://www.taobao.com/test` 这个url为例。对应的资源名有两种粒度,分别是 `https://www.taobao.com:80` 以及 `https://www.taobao.com:80/test` - +NOTE: 以 `https://www.taobao.com/test` 这个 url 为例。对应的资源名有两种粒度,分别是 `https://www.taobao.com:80` 以及 `https://www.taobao.com:80/test` ### 动态数据源支持 -*在版本 0.2.0.RELEASE 或 0.1.0.RELEASE 之前*,要在Spring Cloud Alibaba Sentinel下使用动态数据源,需要3个步骤: +#### 在版本 0.2.0.RELEASE 或 0.1.0.RELEASE 之前 + +需要3个步骤才可完成数据源的配置: * 配置文件中定义数据源信息。比如使用文件: @@ -141,7 +188,7 @@ spring.cloud.sentinel.datasource.converter=flowConverter spring.cloud.sentinel.datasource.file=/Users/you/yourrule.json ---- -* 创建一个Converter类实现 `com.alibaba.csp.sentinel.datasource.Converter` 接口,并且需要在 `ApplicationContext` 中有该类的一个Bean +* 创建一个 Converter 类实现 `com.alibaba.csp.sentinel.datasource.Converter` 接口,并且需要在 `ApplicationContext` 中有该类的一个 Bean ```java @Component("flowConverter") @@ -154,7 +201,7 @@ public class JsonFlowRuleListParser implements Converter> } ``` -这个Converter的bean name需要跟 `application.properties` 配置文件中的converter配置一致 +这个 Converter 的 bean name 需要跟 `application.properties` 配置文件中的 converter 配置一致 * 在任意一个 Spring Bean 中定义一个被 `@SentinelDataSource` 注解修饰的 `ReadableDataSource` 属性 @@ -163,7 +210,7 @@ public class JsonFlowRuleListParser implements Converter> private ReadableDataSource dataSource; ``` -`@SentinelDataSource` 注解的value属性表示数据源在 `application.properties` 配置文件中前缀。 该在例子中,前缀为 `spring.cloud.sentinel.datasource` 。 +`@SentinelDataSource` 注解的 value 属性表示数据源在 `application.properties` 配置文件中前缀。 该在例子中,前缀为 `spring.cloud.sentinel.datasource` 。 如果 `ApplicationContext` 中存在超过1个 `ReadableDataSource` bean,那么不会加载这些 `ReadableDataSource` 中的任意一个。 只有在 `ApplicationContext` 存在一个 `ReadableDataSource` 的情况下才会生效。 @@ -173,7 +220,9 @@ private ReadableDataSource dataSource; [Sentinel Starter] load 3 flow rules ``` -*在版本 0.2.0.RELEASE 或 0.1.0.RELEASE 之后*,要在Spring Cloud Alibaba Sentinel下使用动态数据源,只需要1个步骤: +#### 在版本 0.2.0.RELEASE 或 0.1.0.RELEASE 之后 + +只需要1个步骤就可完成数据源的配置: * 直接在 `application.properties` 配置文件中配置数据源信息即可 @@ -196,23 +245,23 @@ spring.cloud.sentinel.datasource.ds4.apollo.default-flow-rule-value = test ``` -这样配置方式参考了Spring Cloud Stream Binder的配置,内部使用了 `TreeMap` 进行存储,comparator为 `String.CASE_INSENSITIVE_ORDER` 。 +这样配置方式参考了 Spring Cloud Stream Binder 的配置,内部使用了 `TreeMap` 进行存储,comparator 为 `String.CASE_INSENSITIVE_ORDER` 。 NOTE: d1, ds2, ds3, ds4 是 `ReadableDataSource` 的名字,可随意编写。后面的 `file` ,`zk` ,`nacos` , `apollo` 就是对应具体的数据源。 它们后面的配置就是这些数据源各自的配置。 每种数据源都有两个共同的配置项: `data-type` 和 `converter-class` 。 -`data-type` 配置项表示 `Converter`,Spring Cloud Alibaba Sentinel默认提供两种内置的值,分别是 `json` 和 `xml` (不填默认是json)。 如果不想使用内置的 `json` 或 `xml` 这两种 `Converter`,可以填写 `custom` 表示自定义 `Converter`,然后再配置 `converter-class` 配置项,该配置项需要写类的全路径名。 +`data-type` 配置项表示 `Converter`,Spring Cloud Alibaba Sentinel 默认提供两种内置的值,分别是 `json` 和 `xml` (不填默认是json)。 如果不想使用内置的 `json` 或 `xml` 这两种 `Converter`,可以填写 `custom` 表示自定义 `Converter`,然后再配置 `converter-class` 配置项,该配置项需要写类的全路径名。 -这两种内置的 `Converter` 只支持解析 json数组 或 xml数组。内部解析的时候会自动判断每个json对象或xml对象属于哪4种Sentinel规则(`FlowRule`,`DegradeRule`,`SystemRule`,`AuthorityRule`)。 +这两种内置的 `Converter` 只支持解析 json 数组 或 xml 数组。内部解析的时候会自动判断每个 json 对象或xml对象属于哪4种 Sentinel 规则(`FlowRule`,`DegradeRule`,`SystemRule`,`AuthorityRule`)。 比如10个规则数组里解析出5个限流规则和5个降级规则。 这种情况下该数据源不会注册,日志里页会进行警告。 如果10个规则里有9个限流规则,1个解析报错了。这种情况下日志会警告有个规则格式错误,另外9个限流规则会注册上去。 -这里json或xml解析用的是 `jackson`。`ObjectMapper` 或 `XmlMapper` 使用默认的配置,遇到不认识的字段会解析报错。 因为不这样做的话限流 json 也会解析成 系统规则(系统规则所有的配置都有默认值),所以需要这样严格解析。 +这里 json 或 xml 解析用的是 `jackson`。`ObjectMapper` 或 `XmlMapper` 使用默认的配置,遇到不认识的字段会解析报错。 因为不这样做的话限流 json 也会解析成 系统规则(系统规则所有的配置都有默认值),所以需要这样严格解析。 -当然还有一种情况是json对象或xml对象可能会匹配上所有4种规则(比如json对象里只配了 `resource` 字段,那么会匹配上4种规则),这种情况下日志里会警告,并且过滤掉这个对象。 +当然还有一种情况是 json 对象或 xml 对象可能会匹配上所有4种规则(比如json对象里只配了 `resource` 字段,那么会匹配上4种规则),这种情况下日志里会警告,并且过滤掉这个对象。 用户使用这种配置的时候只需要填写正确的json或xml就行,有任何不合理的信息都会在日志里打印出来。 @@ -223,19 +272,29 @@ NOTE: d1, ds2, ds3, ds4 是 `ReadableDataSource` 的名字,可随意编写。 [Sentinel Starter] DataSource ds2-sentinel-nacos-datasource load 2 FlowRule ``` -NOTE: 默认情况下,xml格式是不支持的。需要添加 `jackson-dataformat-xml` 依赖后才会自动生效。 +NOTE: 默认情况下,xml 格式是不支持的。需要添加 `jackson-dataformat-xml` 依赖后才会自动生效。 -关于Sentinel动态数据源的实现原理,参考: https://github.com/alibaba/Sentinel/wiki/%E5%8A%A8%E6%80%81%E8%A7%84%E5%88%99%E6%89%A9%E5%B1%95[动态规则扩展] +关于 Sentinel 动态数据源的实现原理,参考: https://github.com/alibaba/Sentinel/wiki/%E5%8A%A8%E6%80%81%E8%A7%84%E5%88%99%E6%89%A9%E5%B1%95[动态规则扩展] -### Endpoint支持 +### Endpoint 支持 -在使用Endpoint特性之前需要在 Maven 中添加 `spring-boot-starter-actuator` 依赖,并在配置中允许 Endpoints 的访问。 +在使用 Endpoint 特性之前需要在 Maven 中添加 `spring-boot-starter-actuator` 依赖,并在配置中允许 Endpoints 的访问。 -* Spring Boot 1.x 中添加配置 `management.security.enabled=false`。暴露的endpoint路径为 `/sentinel` -* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`。暴露的endpoint路径为 `/actuator/sentinel` +* Spring Boot 1.x 中添加配置 `management.security.enabled=false`。暴露的 endpoint 路径为 `/sentinel` +* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`。暴露的 endpoint 路径为 `/actuator/sentinel` ### More +下表显示当应用的 `ApplicationContext` 中存在对应的Bean的类型时,会进行的一些操作: + +:frame: topbot +[width="60%",options="header"] +|==== +^|存在Bean的类型 ^|操作 ^|作用 +|`UrlCleaner`|`WebCallbackManager.setUrlCleaner(urlCleaner)`|资源清理(资源(比如将满足 /foo/:id 的 URL 都归到 /foo/* 资源下)) +|`UrlBlockHandler`|`WebCallbackManager.setUrlBlockHandler(urlBlockHandler)`|自定义限流处理逻辑 +|==== + 下表显示 Spring Cloud Alibaba Sentinel 的所有配置信息: :frame: topbot @@ -244,14 +303,16 @@ NOTE: 默认情况下,xml格式是不支持的。需要添加 `jackson-datafor ^|配置项 ^|含义 ^|默认值 |`spring.cloud.sentinel.enabled`|Sentinel自动化配置是否生效|true |`spring.cloud.sentinel.eager`|取消Sentinel控制台懒加载|false -|`spring.cloud.sentinel.charset`|metric文件字符集|UTF-8 |`spring.cloud.sentinel.transport.port`|应用与Sentinel控制台交互的端口,应用本地会起一个该端口占用的HttpServer|8721 |`spring.cloud.sentinel.transport.dashboard`|Sentinel 控制台地址| |`spring.cloud.sentinel.transport.heartbeatIntervalMs`|应用与Sentinel控制台的心跳间隔时间| |`spring.cloud.sentinel.filter.order`|Servlet Filter的加载顺序。Starter内部会构造这个filter|Integer.MIN_VALUE |`spring.cloud.sentinel.filter.spring.url-patterns`|数据类型是数组。表示Servlet Filter的url pattern集合|/* +|`spring.cloud.sentinel.metric.charset`|metric文件字符集|UTF-8 |`spring.cloud.sentinel.metric.fileSingleSize`|Sentinel metric 单个文件的大小| |`spring.cloud.sentinel.metric.fileTotalCount`|Sentinel metric 总文件数量| +|`spring.cloud.sentinel.log.dir`|Sentinel 日志文件所在的目录| +|`spring.cloud.sentinel.log.switch-pid`|Sentinel 日志文件名是否需要带上pid|false |`spring.cloud.sentinel.servlet.blockPage`| 自定义的跳转 URL,当请求被限流时会自动跳转至设定好的 URL | |`spring.cloud.sentinel.flow.coldFactor`| https://github.com/alibaba/Sentinel/wiki/%E9%99%90%E6%B5%81---%E5%86%B7%E5%90%AF%E5%8A%A8[冷启动因子] |3 |==== diff --git a/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/custom/SentinelAutoConfiguration.java b/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/custom/SentinelAutoConfiguration.java index 924f415c..317d27c8 100644 --- a/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/custom/SentinelAutoConfiguration.java +++ b/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/custom/SentinelAutoConfiguration.java @@ -19,7 +19,6 @@ package org.springframework.cloud.alibaba.sentinel.custom; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -156,26 +155,22 @@ public class SentinelAutoConfiguration { } @Bean("sentinel-json-converter") - public JsonConverter jsonConverter( - @Qualifier("sentinel-object-mapper") ObjectMapper objectMapper) { - return new JsonConverter(objectMapper); + public JsonConverter jsonConverter() { + return new JsonConverter(objectMapper()); } - @Bean("sentinel-object-mapper") - public ObjectMapper objectMapper() { + private ObjectMapper objectMapper() { return new ObjectMapper(); } @ConditionalOnClass(XmlMapper.class) protected static class SentinelXmlConfiguration { @Bean("sentinel-xml-converter") - public XmlConverter xmlConverter( - @Qualifier("sentinel-xml-mapper") XmlMapper xmlMapper) { - return new XmlConverter(xmlMapper); + public XmlConverter xmlConverter() { + return new XmlConverter(xmlMapper()); } - @Bean("sentinel-xml-mapper") - public XmlMapper xmlMapper() { + private XmlMapper xmlMapper() { return new XmlMapper(); } } diff --git a/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/feign/SentinelFeign.java b/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/feign/SentinelFeign.java index 44a7370a..653c2183 100644 --- a/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/feign/SentinelFeign.java +++ b/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/feign/SentinelFeign.java @@ -89,13 +89,14 @@ public class SentinelFeign { // check fallback and fallbackFactory properties if (void.class != fallback) { fallbackInstance = getFromContext(name, "fallback", fallback, - target); + target.type()); return new SentinelInvocationHandler(target, dispatch, new FallbackFactory.Default(fallbackInstance)); } if (void.class != fallbackFactory) { fallbackFactoryInstance = (FallbackFactory) getFromContext(name, - "fallbackFactory", fallbackFactory, target); + "fallbackFactory", fallbackFactory, + FallbackFactory.class); return new SentinelInvocationHandler(target, dispatch, fallbackFactoryInstance); } @@ -103,7 +104,7 @@ public class SentinelFeign { } private Object getFromContext(String name, String type, - Class fallbackType, Target target) { + Class fallbackType, Class targetType) { Object fallbackInstance = feignContext.getInstance(name, fallbackType); if (fallbackInstance == null) { @@ -112,10 +113,10 @@ public class SentinelFeign { type, fallbackType, name)); } - if (!target.type().isAssignableFrom(fallbackType)) { + if (!targetType.isAssignableFrom(fallbackType)) { throw new IllegalStateException(String.format( "Incompatible %s instance. Fallback/fallbackFactory of type %s is not assignable to %s for feign client %s", - type, fallbackType, target.type(), name)); + type, fallbackType, targetType, name)); } return fallbackInstance; } From 8f24bd979b97c2abf8fd3458eea164df24d7931f Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 12 Dec 2018 16:09:14 +0800 Subject: [PATCH 2/5] rocketmq binder support transactional message and update example, docs for 1.x branch --- .../src/main/asciidoc-zh/rocketmq.adoc | 251 +++++++++++++++++- .../rocketmq/RocketMQBinderConstants.java | 38 +-- .../RocketMQMessageChannelBinder.java | 65 ++++- .../RocketMQMessageHeaderAccessor.java | 154 ++++++----- .../integration/RocketMQMessageHandler.java | 49 +++- .../RocketMQProducerProperties.java | 72 +++-- 6 files changed, 514 insertions(+), 115 deletions(-) diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc index 238f0b4b..3724fd41 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc @@ -1 +1,250 @@ -== Spring Cloud Alibaba Rocket Binder +== Spring Cloud Alibaba RocketMQ Binder + +### RocketMQ 介绍 + +https://rocketmq.apache.org[RocketMQ] 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。 + +具有以下特点: + +* 能够保证严格的消息顺序 + +* 提供丰富的消息拉取模式 + +* 高效的订阅者水平扩展能力 + +* 实时的消息订阅机制 + +* 亿级消息堆积能力 + +### RocketMQ 基本使用 + +* 下载 RocketMQ + +下载 https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip[RocketMQ最新的二进制文件],并解压 + +解压后的目录结构如下: + +``` +apache-rocketmq +├── LICENSE +├── NOTICE +├── README.md +├── benchmark +├── bin +├── conf +└── lib +``` + +* 启动 NameServer + +```bash +nohup sh bin/mqnamesrv & +tail -f ~/logs/rocketmqlogs/namesrv.log +``` + +* 启动 Broker + +```bash +nohup sh bin/mqbroker -n localhost:9876 & +tail -f ~/logs/rocketmqlogs/broker.log +``` + +* 发送、接收消息 + +发送消息: + +```bash +sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer +``` + +发送成功后显示:`SendResult [sendStatus=SEND_OK, msgId= ...` + +接收消息: + +```bash +sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer +``` + +接收成功后显示:`ConsumeMessageThread_%d Receive New Messages: [MessageExt...` + +* 关闭 Server + +```bash +sh bin/mqshutdown broker +sh bin/mqshutdown namesrv +``` + +### Spring Cloud Stream 介绍 + +Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 `Spring Integration` 与 Broker 进行连接。 + +Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。 + +Spring Cloud Stream 内部有两个概念:Binder 和 Binding。 + +* Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。 + +比如 `Kafka` 的实现 `KafkaMessageChannelBinder`,`RabbitMQ` 的实现 `RabbitMessageChannelBinder` 以及 `RocketMQ` 的实现 `RocketMQMessageChannelBinder`。 + +* Binding: 包括 Input Binding 和 Output Binding。 + +Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。 + +.Spring Cloud Stream +image::https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png[] + +使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码: + +```java +MessageChannel messageChannel = new DirectChannel(); + +// 消息订阅 +((SubscribableChannel) messageChannel).subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + System.out.println("receive msg: " + message.getPayload()); + } +}); + +// 消息发送 +messageChannel.send(MessageBuilder.withPayload("simple msg").build()); +``` + +这段代码所有的消息类都是 `spring-messaging` 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。 + +**Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。** + +### Spring Cloud Alibaba RocketMQ Binder 实现原理 + +.RocketMQ Binder处理流程 +image::https://cdn.nlark.com/lark/0/2018/png/64647/1543560843558-24525bf4-1d0e-4e10-be5f-bdde7127f6e6.png[] + + +RocketMQ Binder 的核心主要就是这3个类:`RocketMQMessageChannelBinder`,`RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 + +`RocketMQMessageChannelBinder` 是个标准的 Binder 实现,其内部构建 `RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 + +`RocketMQMessageHandler` 用于 RocketMQ `Producer` 的启动以及消息的发送,其内部会根据 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类,去创建 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`。 + +在构造 `org.apache.rocketmq.common.message.Message` 的过程中会根据 `org.springframework.messaging.Message` 的 Header 构造成 `RocketMQMessageHeaderAccessor`。然后再根据 `RocketMQMessageHeaderAccessor` 中的一些属性,比如 tags、keys、flag等属性设置到 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message` 中。 + +`RocketMQInboundChannelAdapter` 用于 RocketMQ `Consumer` 的启动以及消息的接收。其内部还支持 https://github.com/spring-projects/spring-retry[spring-retry] 的使用。 + +在消费消息的时候可以从 Header 中获取 `Acknowledgement` 并进行一些设置。 + +比如使用 `MessageListenerConcurrently` 进行异步消费的时候,可以设置延迟消费: + +```java +@StreamListener("input") +public void receive(Message message) { + RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); + Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); + acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER); + acknowledgement.setConsumeConcurrentlyDelayLevel(1); +} +``` + +比如使用 `MessageListenerOrderly` 进行顺序消费的时候,可以设置延迟消费: + +```java +@StreamListener("input") +public void receive(Message message) { + RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); + Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); + acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT); + acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000); +} +``` + +Provider端支持的配置: + +:frame: topbot +[width="60%",options="header"] +|==== +^|配置项 ^|含义 ^| 默认值 +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效,RocketMQ 默认值为4M = 1024 * 1024 * 4) +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`| +|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`| +|==== + +Consumer端支持的配置: + +:frame: topbot +[width="60%",options="header"] +|==== +^|配置项 ^|含义| 默认值 +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤,订阅所有消息)| +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高)| +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false +|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false +|==== + +### Endpoint支持 + +在使用Endpoint特性之前需要在 Maven 中添加 `spring-boot-starter-actuator` 依赖,并在配置中允许 Endpoints 的访问。 + +* Spring Boot 1.x 中添加配置 `management.security.enabled=false`。暴露的 endpoint 路径为 `/rocketmq_binder` +* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`。暴露的 endpoint 路径为 `/actuator/rocketmq-binder` + +Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。 + +```json +{ + "runtime": { + "lastSend.timestamp": 1542786623915 + }, + "metrics": { + "scs-rocketmq.consumer.test-topic.totalConsumed": { + "count": 11 + }, + "scs-rocketmq.consumer.test-topic.totalConsumedFailures": { + "count": 0 + }, + "scs-rocketmq.producer.test-topic.totalSentFailures": { + "count": 0 + }, + "scs-rocketmq.consumer.test-topic.consumedPerSecond": { + "count": 11, + "fifteenMinuteRate": 0.012163847780107841, + "fiveMinuteRate": 0.03614605351360527, + "meanRate": 0.3493213353657594, + "oneMinuteRate": 0.17099243039490175 + }, + "scs-rocketmq.producer.test-topic.totalSent": { + "count": 5 + }, + "scs-rocketmq.producer.test-topic.sentPerSecond": { + "count": 5, + "fifteenMinuteRate": 0.005540151995103271, + "fiveMinuteRate": 0.01652854617838251, + "meanRate": 0.10697493212602836, + "oneMinuteRate": 0.07995558537067671 + }, + "scs-rocketmq.producer.test-topic.sentFailuresPerSecond": { + "count": 0, + "fifteenMinuteRate": 0.0, + "fiveMinuteRate": 0.0, + "meanRate": 0.0, + "oneMinuteRate": 0.0 + }, + "scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": { + "count": 0, + "fifteenMinuteRate": 0.0, + "fiveMinuteRate": 0.0, + "meanRate": 0.0, + "oneMinuteRate": 0.0 + } + } +} +``` + +注意:要想查看统计数据需要在pom里加上 https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core[metrics-core依赖]。如若不加,endpoint 将会显示 warning 信息而不会显示统计信息: + +```json +{ + "warning": "please add metrics-core dependency, we use it for metrics" +} +``` \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index 9c8b5221..a54d2c1c 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -21,7 +21,7 @@ package org.springframework.cloud.stream.binder.rocketmq; */ public interface RocketMQBinderConstants { - String ENDPOINT_ID = "rocketmq_binder"; + String ENDPOINT_ID = "rocketmq_binder"; /** * Header key @@ -32,6 +32,8 @@ public interface RocketMQBinderConstants { String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; + String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG"; + String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; /** @@ -39,23 +41,23 @@ public interface RocketMQBinderConstants { */ String LASTSEND_TIMESTAMP = "lastSend.timestamp"; - interface Metrics { - interface Producer { - String PREFIX = "scs-rocketmq.producer."; - String TOTAL_SENT = "totalSent"; - String TOTAL_SENT_FAILURES = "totalSentFailures"; - String SENT_PER_SECOND = "sentPerSecond"; - String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond"; - } + interface Metrics { + interface Producer { + String PREFIX = "scs-rocketmq.producer."; + String TOTAL_SENT = "totalSent"; + String TOTAL_SENT_FAILURES = "totalSentFailures"; + String SENT_PER_SECOND = "sentPerSecond"; + String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond"; + } - interface Consumer { - String GROUP_PREFIX = "scs-rocketmq.consumerGroup."; - String PREFIX = "scs-rocketmq.consumer."; - String TOTAL_CONSUMED = "totalConsumed"; - String CONSUMED_PER_SECOND = "consumedPerSecond"; - String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures"; - String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond"; - } - } + interface Consumer { + String GROUP_PREFIX = "scs-rocketmq.consumerGroup."; + String PREFIX = "scs-rocketmq.consumer."; + String TOTAL_CONSUMED = "totalConsumed"; + String CONSUMED_PER_SECOND = "consumedPerSecond"; + String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures"; + String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond"; + } + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 03009b43..4982ffa1 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,6 +16,9 @@ package org.springframework.cloud.stream.binder.rocketmq; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; @@ -36,6 +39,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.util.ClassUtils; /** * @author Timur Valiev @@ -71,9 +75,24 @@ public class RocketMQMessageChannelBinder extends ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - return new RocketMQMessageHandler(destination.getName(), - producerProperties.getExtension(), + RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( + destination.getName(), producerProperties, rocketBinderConfigurationProperties, instrumentationManager); + if (producerProperties.getExtension().getTransactional()) { + // transaction message check LocalTransactionExecuter + messageHandler.setLocalTransactionExecuter( + getClassConfiguration(destination.getName(), + producerProperties.getExtension().getExecuter(), + LocalTransactionExecuter.class)); + // transaction message check TransactionCheckListener + messageHandler.setTransactionCheckListener( + getClassConfiguration(destination.getName(), + producerProperties.getExtension() + .getTransactionCheckListener(), + TransactionCheckListener.class)); + } + + return messageHandler; } else { throw new RuntimeException("Binding for channel " + destination.getName() @@ -120,4 +139,46 @@ public class RocketMQMessageChannelBinder extends public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { return extendedBindingProperties.getExtendedProducerProperties(channelName); } + + private T getClassConfiguration(String destName, String className, + Class interfaceClass) { + if (StringUtils.isEmpty(className)) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, should set " + + interfaceClass.getSimpleName() + " configuration" + + interfaceClass.getSimpleName() + " should be set, like " + + "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour" + + interfaceClass.getSimpleName() + "'"); + } + else if (StringUtils.isNotEmpty(className)) { + Class fieldClass; + // check class exists + try { + fieldClass = ClassUtils.forName(className, + RocketMQMessageChannelBinder.class.getClassLoader()); + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " class is not found"); + } + // check interface incompatible + if (!interfaceClass.isAssignableFrom(fieldClass)) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " is incompatible with " + interfaceClass.getSimpleName() + + " interface"); + } + try { + return (T) fieldClass.newInstance(); + } + catch (Exception e) { + throw new RuntimeException("Binding for channel " + destName + + " using transactional message, but " + className + + " instance error", e); + } + } + return null; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java index fa480681..23671b60 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java @@ -16,6 +16,12 @@ package org.springframework.cloud.stream.binder.rocketmq; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG; + import java.util.HashMap; import java.util.Map; @@ -29,95 +35,105 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageHeaderAccessor; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; - /** * @author Timur Valiev * @author Jim */ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor { - public RocketMQMessageHeaderAccessor() { - super(); - } + public RocketMQMessageHeaderAccessor() { + super(); + } - public RocketMQMessageHeaderAccessor(Message message) { - super(message); - } + public RocketMQMessageHeaderAccessor(Message message) { + super(message); + } - public Acknowledgement getAcknowledgement(Message message) { - return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class); - } + public Acknowledgement getAcknowledgement(Message message) { + return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class); + } - public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) { - setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment); - return this; - } + public RocketMQMessageHeaderAccessor withAcknowledgment( + Acknowledgement acknowledgment) { + setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment); + return this; + } - public String getTags() { - return getMessageHeaders().get(MessageConst.PROPERTY_TAGS) == null ? "" : (String) getMessageHeaders().get(MessageConst.PROPERTY_TAGS); - } + public String getTags() { + return getMessageHeaders().get(MessageConst.PROPERTY_TAGS) == null ? "" + : (String) getMessageHeaders().get(MessageConst.PROPERTY_TAGS); + } - public RocketMQMessageHeaderAccessor withTags(String tag) { - setHeader(MessageConst.PROPERTY_TAGS, tag); - return this; - } + public RocketMQMessageHeaderAccessor withTags(String tag) { + setHeader(MessageConst.PROPERTY_TAGS, tag); + return this; + } - public String getKeys() { - return getMessageHeaders().get(MessageConst.PROPERTY_KEYS) == null ? "" : (String) getMessageHeaders().get(MessageConst.PROPERTY_KEYS); - } + public String getKeys() { + return getMessageHeaders().get(MessageConst.PROPERTY_KEYS) == null ? "" + : (String) getMessageHeaders().get(MessageConst.PROPERTY_KEYS); + } - public RocketMQMessageHeaderAccessor withKeys(String keys) { - setHeader(MessageConst.PROPERTY_KEYS, keys); - return this; - } + public RocketMQMessageHeaderAccessor withKeys(String keys) { + setHeader(MessageConst.PROPERTY_KEYS, keys); + return this; + } - public MessageExt getRocketMessage() { - return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class); - } + public MessageExt getRocketMessage() { + return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class); + } - public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) { - setHeader(ORIGINAL_ROCKET_MESSAGE, message); - return this; - } + public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) { + setHeader(ORIGINAL_ROCKET_MESSAGE, message); + return this; + } - public Integer getDelayTimeLevel() { - return NumberUtils.toInt((String)getMessageHeaders().get(MessageConst.PROPERTY_DELAY_TIME_LEVEL), 0); - } + public Integer getDelayTimeLevel() { + return NumberUtils.toInt( + (String) getMessageHeaders().get(MessageConst.PROPERTY_DELAY_TIME_LEVEL), + 0); + } - public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) { - setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); - return this; - } + public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) { + setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); + return this; + } - public Integer getFlag() { - return NumberUtils.toInt((String)getMessageHeaders().get(ROCKET_FLAG), 0); - } + public Integer getFlag() { + return NumberUtils.toInt((String) getMessageHeaders().get(ROCKET_FLAG), 0); + } - public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) { - setHeader(ROCKET_FLAG, delayTimeLevel); - return this; - } + public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) { + setHeader(ROCKET_FLAG, delayTimeLevel); + return this; + } - public SendResult getSendResult() { - return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); - } + public Object getTransactionalArg() { + return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG); + } - public static void putSendResult(MutableMessage message, SendResult sendResult) { - message.getHeaders().put(ROCKET_SEND_RESULT, sendResult); - } + public Object withTransactionalArg(Object arg) { + setHeader(ROCKET_TRANSACTIONAL_ARG, arg); + return this; + } - public Map getUserProperties() { - Map result = new HashMap<>(); - for (Map.Entry entry : this.toMap().entrySet()) { - if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry - .getKey().equals(MessageHeaders.CONTENT_TYPE)) { - result.put(entry.getKey(), (String)entry.getValue()); - } - } - return result; - } + public SendResult getSendResult() { + return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); + } + + public static void putSendResult(MutableMessage message, SendResult sendResult) { + message.getHeaders().put(ROCKET_SEND_RESULT, sendResult); + } + + public Map getUserProperties() { + Map result = new HashMap<>(); + for (Map.Entry entry : this.toMap().entrySet()) { + if (entry.getValue() instanceof String + && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) + && !entry.getKey().equals(MessageHeaders.CONTENT_TYPE)) { + result.put(entry.getKey(), (String) entry.getValue()); + } + } + return result; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 0ab7386c..331df01b 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -21,10 +21,14 @@ import java.util.Map; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -47,16 +51,20 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private InstrumentationManager instrumentationManager; - private final RocketMQProducerProperties producerProperties; + private LocalTransactionExecuter localTransactionExecuter; + + private TransactionCheckListener transactionCheckListener; + + private final ExtendedProducerProperties producerProperties; private final String destination; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - protected volatile boolean running = false; + private volatile boolean running = false; public RocketMQMessageHandler(String destination, - RocketMQProducerProperties producerProperties, + ExtendedProducerProperties producerProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, InstrumentationManager instrumentationManager) { this.destination = destination; @@ -67,7 +75,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li @Override public void start() { - producer = new DefaultMQProducer(destination); + if (producerProperties.getExtension().getTransactional()) { + producer = new TransactionMQProducer(destination); + if (transactionCheckListener != null) { + ((TransactionMQProducer) producer) + .setTransactionCheckListener(transactionCheckListener); + } + } + else { + producer = new DefaultMQProducer(destination); + } if (instrumentationManager != null) { producerInstrumentation = instrumentationManager @@ -77,8 +94,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - if (producerProperties.getMaxMessageSize() > 0) { - producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); + if (producerProperties.getExtension().getMaxMessageSize() > 0) { + producer.setMaxMessageSize( + producerProperties.getExtension().getMaxMessageSize()); } try { @@ -139,7 +157,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li toSend.putUserProperty(entry.getKey(), entry.getValue()); } - SendResult sendRes = producer.send(toSend); + SendResult sendRes; + if (producerProperties.getExtension().getTransactional()) { + sendRes = producer.sendMessageInTransaction(toSend, + localTransactionExecuter, headerAccessor.getTransactionalArg()); + } + else { + sendRes = producer.send(toSend); + } if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { throw new MQClientException("message hasn't been sent", null); @@ -167,4 +192,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li } + public void setLocalTransactionExecuter( + LocalTransactionExecuter localTransactionExecuter) { + this.localTransactionExecuter = localTransactionExecuter; + } + + public void setTransactionCheckListener( + TransactionCheckListener transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } + } \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java index 8c252834..1a05ad50 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -17,6 +17,8 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.TransactionCheckListener; /** * @author Timur Valiev @@ -24,28 +26,62 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; */ public class RocketMQProducerProperties { - private Boolean enabled = true; + private Boolean enabled = true; - /** - * Maximum allowed message size in bytes - * {@link DefaultMQProducer#maxMessageSize} - */ - private Integer maxMessageSize = 0; + /** + * Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize} + */ + private Integer maxMessageSize = 0; - public Boolean getEnabled() { - return enabled; - } + private Boolean transactional = false; - public void setEnabled(Boolean enabled) { - this.enabled = enabled; - } + /** + * full class name of {@link LocalTransactionExecuter} + */ + private String executer; - public Integer getMaxMessageSize() { - return maxMessageSize; - } + /** + * full class name of {@link TransactionCheckListener} + */ + private String transactionCheckListener; - public void setMaxMessageSize(Integer maxMessageSize) { - this.maxMessageSize = maxMessageSize; - } + public Boolean getEnabled() { + return enabled; + } + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + public Integer getMaxMessageSize() { + return maxMessageSize; + } + + public void setMaxMessageSize(Integer maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + public Boolean getTransactional() { + return transactional; + } + + public void setTransactional(Boolean transactional) { + this.transactional = transactional; + } + + public String getExecuter() { + return executer; + } + + public void setExecuter(String executer) { + this.executer = executer; + } + + public String getTransactionCheckListener() { + return transactionCheckListener; + } + + public void setTransactionCheckListener(String transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } } From b0238ac88d76928d96680760e6b41e696aaf5732 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 12 Dec 2018 16:10:19 +0800 Subject: [PATCH 3/5] rocketmq binder update example for 1.x branch --- .../examples/MyTransactionCheckListener.java | 18 ++++++++ .../cloud/examples/MyTransactionExecuter.java | 20 +++++++++ .../cloud/examples/ReceiveService.java | 5 +++ .../cloud/examples/RocketMQApplication.java | 41 +++++++++++++++++-- .../alibaba/cloud/examples/SenderService.java | 20 ++++++--- .../src/main/resources/application.properties | 17 +++++++- 6 files changed, 111 insertions(+), 10 deletions(-) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java new file mode 100644 index 00000000..65d72662 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java @@ -0,0 +1,18 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.common.message.MessageExt; + +/** + * @author Jim + */ +public class MyTransactionCheckListener implements TransactionCheckListener { + + @Override + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + System.out.println("TransactionCheckListener: " + new String(msg.getBody())); + return LocalTransactionState.COMMIT_MESSAGE; + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java new file mode 100644 index 00000000..752d4e5f --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java @@ -0,0 +1,20 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.common.message.Message; + +/** + * @author Jim + */ +public class MyTransactionExecuter implements LocalTransactionExecuter { + @Override + public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { + if ("1".equals(msg.getUserProperty("test"))) { + System.out.println(new String(msg.getBody()) + " rollback"); + return LocalTransactionState.ROLLBACK_MESSAGE; + } + System.out.println(new String(msg.getBody()) + " commit"); + return LocalTransactionState.COMMIT_MESSAGE; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java index f251902d..d590aa28 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java @@ -30,4 +30,9 @@ public class ReceiveService { System.out.println("input1 receive again: " + receiveMsg); } + @StreamListener("input4") + public void receiveTransactionalMsg(String transactionMsg) { + System.out.println("input4 receive transaction msg: " + transactionMsg); + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java index e4a8b1ab..87da880a 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java @@ -5,17 +5,19 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * @author Jim */ @SpringBootApplication -@EnableBinding({ Source.class, MySink.class }) +@EnableBinding({ MySource.class, MySink.class }) public class RocketMQApplication { public interface MySink { @@ -29,17 +31,33 @@ public class RocketMQApplication { @Input("input3") SubscribableChannel input3(); + @Input("input4") + SubscribableChannel input4(); + + } + + public interface MySource { + @Output("output1") + MessageChannel output1(); + + @Output("output2") + MessageChannel output2(); } public static void main(String[] args) { SpringApplication.run(RocketMQApplication.class, args); - } + } @Bean public CustomRunner customRunner() { return new CustomRunner(); } + @Bean + public CustomRunnerWithTransactional customRunnerWithTransactional() { + return new CustomRunnerWithTransactional(); + } + public static class CustomRunner implements CommandLineRunner { @Autowired private SenderService senderService; @@ -62,4 +80,21 @@ public class RocketMQApplication { } } + public static class CustomRunnerWithTransactional implements CommandLineRunner { + @Autowired + private SenderService senderService; + + @Override + public void run(String... args) throws Exception { + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg1", false); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg2", true); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg3", true); + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg4", false); + } + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java index 1c886d01..9614cc5d 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java @@ -5,7 +5,7 @@ import java.util.Map; import org.apache.rocketmq.common.message.MessageConst; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -19,17 +19,17 @@ import org.springframework.util.MimeTypeUtils; public class SenderService { @Autowired - private Source source; + private MySource source; public void send(String msg) throws Exception { - source.output().send(MessageBuilder.withPayload(msg).build()); + source.output1().send(MessageBuilder.withPayload(msg).build()); } public void sendWithTags(T msg, String tag) throws Exception { Map map = new HashMap<>(); map.put(MessageConst.PROPERTY_TAGS, tag); Message message = MessageBuilder.createMessage(msg, new MessageHeaders(map)); - source.output().send(message); + source.output1().send(message); } public void sendObject(T msg, String tag) throws Exception { @@ -37,7 +37,17 @@ public class SenderService { .setHeader(MessageConst.PROPERTY_TAGS, tag) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); - source.output().send(message); + source.output1().send(message); + } + + public void sendTransactionalMsg(T msg, boolean error) throws Exception { + MessageBuilder builder = MessageBuilder.withPayload(msg) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); + if (error) { + builder.setHeader("test", "1"); + } + Message message = builder.build(); + source.output2().send(message); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties index 0ba6a54e..54bf902b 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties @@ -2,8 +2,14 @@ spring.cloud.stream.default-binder=rocketmq spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 -spring.cloud.stream.bindings.output.destination=test-topic -spring.cloud.stream.bindings.output.content-type=application/json +spring.cloud.stream.bindings.output1.destination=test-topic +spring.cloud.stream.bindings.output1.content-type=application/json + +spring.cloud.stream.bindings.output2.destination=TransactionTopic +spring.cloud.stream.bindings.output2.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter +spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain @@ -26,5 +32,12 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj spring.cloud.stream.bindings.input3.consumer.concurrency=20 spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 +spring.cloud.stream.bindings.input4.destination=TransactionTopic +spring.cloud.stream.bindings.input4.content-type=text/plain +spring.cloud.stream.bindings.input4.group=transaction-group +spring.cloud.stream.bindings.input4.consumer.concurrency=210 + +spring.application.name=rocketmq-example + server.port=28081 management.security.enabled=false \ No newline at end of file From 2bb73f4524cdf981c4a78161c4fc818bb0c6f38d Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Fri, 14 Dec 2018 18:14:32 +0800 Subject: [PATCH 4/5] check ParamFlowRule --- .../custom/SentinelDataSourceHandler.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/custom/SentinelDataSourceHandler.java b/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/custom/SentinelDataSourceHandler.java index 4121d198..436a9ff5 100644 --- a/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/custom/SentinelDataSourceHandler.java +++ b/spring-cloud-alibaba-sentinel/src/main/java/org/springframework/cloud/alibaba/sentinel/custom/SentinelDataSourceHandler.java @@ -51,6 +51,8 @@ import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleManager; import com.alibaba.csp.sentinel.slots.system.SystemRule; import com.alibaba.csp.sentinel.slots.system.SystemRuleManager; @@ -71,7 +73,8 @@ public class SentinelDataSourceHandler { private List dataTypeList = Arrays.asList("json", "xml"); private List> rulesList = Arrays.asList(FlowRule.class, - DegradeRule.class, SystemRule.class, AuthorityRule.class); + DegradeRule.class, SystemRule.class, AuthorityRule.class, + ParamFlowRule.class); private List dataSourceBeanNameList = Collections .synchronizedList(new ArrayList()); @@ -163,9 +166,12 @@ public class SentinelDataSourceHandler { else if (ruleType == SystemRule.class) { SystemRuleManager.register2Property(sentinelProperty); } - else { + else if (ruleType == AuthorityRule.class) { AuthorityRuleManager.register2Property(sentinelProperty); } + else { + ParamFlowRuleManager.register2Property(sentinelProperty); + } } } } @@ -337,14 +343,15 @@ public class SentinelDataSourceHandler { } private boolean checkAllRuleTypeSame(List convertedRuleList) { - int matchCount = 0; - for(Object rule : convertedRuleList) { - if(rulesList.contains(rule.getClass()) && rule.getClass() == convertedRuleList.get(0).getClass()) { - matchCount ++; - } - } - return matchCount == convertedRuleList.size(); - } + int matchCount = 0; + for (Object rule : convertedRuleList) { + if (rulesList.contains(rule.getClass()) + && rule.getClass() == convertedRuleList.get(0).getClass()) { + matchCount++; + } + } + return matchCount == convertedRuleList.size(); + } public List getDataSourceBeanNameList() { return dataSourceBeanNameList; From 2c06acc9962dee196431c0e9202775470837dded Mon Sep 17 00:00:00 2001 From: xiaolongzuo <150349407@qq.com> Date: Fri, 14 Dec 2018 18:34:43 +0800 Subject: [PATCH 5/5] Add SchedulerX module for branch 1.x. --- pom.xml | 1 + spring-cloud-alibaba-dependencies/pom.xml | 19 ++- .../src/main/asciidoc-zh/schedulerx.adoc | 131 ++++++++++++++++++ .../asciidoc-zh/spring-cloud-alibaba.adoc | 1 + .../src/main/asciidoc/schedulerx.adoc | 0 .../main/asciidoc/spring-cloud-alibaba.adoc | 1 + spring-cloud-alibaba-examples/pom.xml | 1 + .../schedulerx-simple-task-example/pom.xml | 37 +++++ .../readme-zh.md | 47 +++++++ .../cloud/examples/ScxApplication.java | 32 +++++ .../alibaba/cloud/examples/SimpleTask.java | 35 +++++ .../src/main/resources/application.properties | 3 + spring-cloud-alicloud-context/pom.xml | 6 + .../nacos/NacosParameterInitListener.java | 7 +- .../scx/ScxContextAutoConfiguration.java | 71 ++++++++++ .../alicloud/context/scx/ScxProperties.java | 51 +++++++ .../statistics/StatisticsTaskStarter.java | 108 +++++++++++++++ .../main/resources/META-INF/spring.factories | 4 +- .../AliCloudSpringApplicationTests.java | 4 +- .../context/scx/ScxPropertiesLoadTests.java | 50 +++++++ .../alicloud/scx/ScxAutoConfiguration.java | 23 +++ spring-cloud-alicloud-schedulerx/pom.xml | 48 +++++++ .../alicloud/scx/ScxAutoConfiguration.java | 29 ++++ .../alicloud/scx/endpoint/ScxEndpoint.java | 58 ++++++++ .../ScxEndpointAutoConfiguration.java | 37 +++++ .../main/resources/META-INF/spring.factories | 3 + spring-cloud-starter-alicloud/pom.xml | 3 +- .../pom.xml | 20 +++ 28 files changed, 823 insertions(+), 7 deletions(-) create mode 100644 spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc create mode 100644 spring-cloud-alibaba-docs/src/main/asciidoc/schedulerx.adoc create mode 100644 spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/readme-zh.md create mode 100644 spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ScxApplication.java create mode 100644 spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SimpleTask.java create mode 100644 spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/resources/application.properties create mode 100644 spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxContextAutoConfiguration.java create mode 100644 spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxProperties.java create mode 100644 spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/statistics/StatisticsTaskStarter.java create mode 100644 spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/scx/ScxPropertiesLoadTests.java create mode 100644 spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java create mode 100644 spring-cloud-alicloud-schedulerx/pom.xml create mode 100644 spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java create mode 100644 spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/endpoint/ScxEndpoint.java create mode 100644 spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/endpoint/ScxEndpointAutoConfiguration.java create mode 100644 spring-cloud-alicloud-schedulerx/src/main/resources/META-INF/spring.factories create mode 100644 spring-cloud-starter-alicloud/spring-cloud-starter-alicloud-schedulerx/pom.xml diff --git a/pom.xml b/pom.xml index 053e6e20..34abc02b 100644 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,7 @@ spring-cloud-alicloud-context spring-cloud-alicloud-acm spring-cloud-alicloud-ans + spring-cloud-alicloud-schedulerx diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index f438ccf0..cd17626b 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -22,9 +22,10 @@ 1.0.8 0.1.1 4.0.1 - 1.0.0 + 1.0.2 2.16.0 4.3.1 + 2.1.6 @@ -61,6 +62,11 @@ acm-sdk ${acm.version} + + com.alibaba.edas + schedulerX-client + ${schedulerX.client.version} + com.alibaba.nacos nacos-client @@ -172,6 +178,11 @@ spring-cloud-alicloud-ans ${project.version} + + org.springframework.cloud + spring-cloud-alicloud-schedulerx + ${project.version} + org.springframework.cloud spring-cloud-alicloud-context @@ -219,6 +230,12 @@ ${project.version} + + org.springframework.cloud + spring-cloud-starter-alicloud-schedulerx + ${project.version} + + org.springframework.cloud spring-cloud-starter-stream-rocketmq diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc new file mode 100644 index 00000000..0c33b407 --- /dev/null +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/schedulerx.adoc @@ -0,0 +1,131 @@ +== Spring Cloud Alibaba Cloud SchedulerX + +SchedulerX(分布式任务调度) 是隶属于阿里云EDAS产品的组件, Spring Cloud AliCloud SchedulerX 提供了在Spring Cloud的配置规范下,分布式任务调度的功能支持。SchedulerX可提供秒级、精准、高可靠、高可用的定时任务调度服务,并支持多种类型的任务调度,如简单单机任务、简单多机任务、脚本任务以及网格任务。 + +=== 如何引入 Spring Cloud AliCloud SchedulerX + +Spring Cloud Alibaba 已经发布了0.1.1版本,需要首先导入依赖管理POM。 + +[source,xml] +---- + + + + org.springframework.cloud + spring-cloud-alibaba-dependencies + 0.1.1.RELEASE + pom + import + + + +---- + +接下来引入 Spring Cloud AliCloud SchedulerX Starter 即可。 + +[source,xml] +---- + + org.springframework.cloud + spring-cloud-starter-alicloud-schedulerX + +---- + +=== 启动SchedulerX任务调度 + +当客户端引入了 Spring Cloud AliCloud SchedulerX Starter 以后,只需要进行一些简单的配置,就可以自动初始化SchedulerX的任务调度服务。 + +以下是一个简单的应用示例。 + +[source,java] +---- +@SpringBootApplication +public class ScxApplication { + + public static void main(String[] args) { + SpringApplication.run(ScxApplication.class, args); + } + +} +---- + +在application.properties中,需要加上以下配置。 + +[source,properties] +---- +server.port=18033 +# 其中cn-test是SchedulerX的测试区域 +spring.cloud.alicloud.scx.group-id=*** +spring.cloud.alicloud.edas.namespace=cn-test +---- + +在获取group-id之前,需要首先 https://account.aliyun.com/register/register.htm?spm=5176.8142029.388261.26.e9396d3eEIv28g&oauth_callback=https%3A%2F%2Fwww.aliyun.com%2F[注册阿里云账号] ,然后 https://common-buy.aliyun.com/?spm=5176.11451019.0.0.6f5965c0Uq5tue&commodityCode=edaspostpay#/buy[开通EDAS服务] ,并 https://edas.console.aliyun.com/#/edasTools[开通分布式任务管理组件] 。 + +其中group-id的获取,请参考 https://help.aliyun.com/document_detail/98784.html?spm=a2c4g.11186623.2.17.23c87da9P2F3tG[这里]。 + +NOTE: 在创建group的时候,要选择"测试"区域。 + +=== 编写一个简单任务 + +简单任务是最常用的任务类型,只需要实现 ScxSimpleJobProcessor 接口即可。 + +以下是一个简单的单机类型任务示例。 + +[source,java] +---- +public class SimpleTask implements ScxSimpleJobProcessor { + + @Override + public ProcessResult process(ScxSimpleJobContext context) { + System.out.println("-----------Hello world---------------"); + ProcessResult processResult = new ProcessResult(true); + return processResult; + } + +} +---- + +=== 对任务进行调度 + +进入 https://edas.console.aliyun.com/#/edasSchedulerXJob?regionNo=cn-test[SchedulerX任务列表] 页面,选择上方"测试"区域,点击右上角"新建Job",创建一个Job,即如下所示。 + +[source,text] +---- +Job分组:测试——***-*-*-**** +Job处理接口:org.springframework.cloud.alibaba.cloud.examples.SimpleTask +类型:简单Job单机版 +定时表达式:默认选项——0 * * * * ? +Job描述:无 +自定义参数:无 +---- + +以上任务类型选择了"简单Job单机版",并且制定了Cron表达式为"0 * * * * ?",这意味着,每过一分钟,任务将会被执行且只执行一次。 + +更多任务类型,请参考 https://help.aliyun.com/document_detail/43136.html[SchedulerX官方文档]。 + +=== 生产环境使用 + +以上使用的都是SchedulerX的"测试"区域,主要用于本地调试和测试。 + +在生产级别,除了上面的group-id和namespace以外,还需要一些额外的配置,如下所示。 + +[source,properties] +---- +server.port=18033 +# 其中cn-test是SchedulerX的测试区域 +spring.cloud.alicloud.scx.group-id=*** +spring.cloud.alicloud.edas.namespace=*** +# 当应用运行在EDAS上时,以下配置不需要手动配置。 +spring.cloud.alicloud.access-key=*** +spring.cloud.alicloud.secret-key=*** +# 以下配置不是必须的,请参考SchedulerX文档 +spring.cloud.alicloud.scx.domain-name=*** +---- + +其中group-id与之前的获取方式一样,namespace则是从EDAS控制台左侧"命名空间"列表中获取命名空间ID。 + +NOTE: group-id必须创建在namespace当中。 + +access-key以及secret-key为阿里云账号的AK/SK信息,如果应用在EDAS上部署,则不需要填写这两项信息,否则请前往 https://usercenter.console.aliyun.com/#/manage/ak[安全信息管理]获取。 + +domain-name并不是必须的,具体请参考 https://help.aliyun.com/document_detail/35359.html[SchedulerX官方文档]。 diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/spring-cloud-alibaba.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/spring-cloud-alibaba.adoc index 6a05ccde..5cde376e 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/spring-cloud-alibaba.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/spring-cloud-alibaba.adoc @@ -29,3 +29,4 @@ include::acm.adoc[] include::oss.adoc[] +include::schedulerx.adoc[] \ No newline at end of file diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc/schedulerx.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc/schedulerx.adoc new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc/spring-cloud-alibaba.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc/spring-cloud-alibaba.adoc index a8c52ae2..294e493b 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc/spring-cloud-alibaba.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc/spring-cloud-alibaba.adoc @@ -29,3 +29,4 @@ include::acm.adoc[] include::oss.adoc[] +include::schedulerx.adoc[] \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index 98b392f8..9c207c18 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -28,6 +28,7 @@ ans-example/ans-provider-example acm-example/acm-local-example rocketmq-example + schedulerx-example/schedulerx-simple-task-example diff --git a/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/pom.xml b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/pom.xml new file mode 100644 index 00000000..5850143a --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/pom.xml @@ -0,0 +1,37 @@ + + + + spring-cloud-alibaba-examples + org.springframework.cloud + 0.1.1.BUILD-SNAPSHOT + ../../pom.xml + + 4.0.0 + schedulerx-simple-task-example + + + + org.springframework.cloud + spring-cloud-starter-alicloud-schedulerx + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/readme-zh.md b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/readme-zh.md new file mode 100644 index 00000000..9e9b6f02 --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/readme-zh.md @@ -0,0 +1,47 @@ +# SchedulerX Simple Task Example + +## 项目说明 + +本项目展示了,在Spring Cloud体系中,如何快如接入SchedulerX,使用任务调度服务。 + +SchedulerX 是阿里中间件团队开发的一款分布式任务调度产品。它为您提供秒级、精准、高可靠、高可用的定时(基于 Cron 表达式)任务调度服务。同时提供分布式的任务执行模型,如网格任务。网格任务支持海量子任务均匀分配到所有 Worker(schedulerx-client)上执行。 + +## 示例 + +### 准备工作 + +1. 请先[注册阿里云账号](https://account.aliyun.com/register/register.htm?spm=5176.8142029.388261.26.e9396d3eEIv28g&oauth_callback=https%3A%2F%2Fwww.aliyun.com%2F) + +2. SchedulerX集成到了EDAS组件中心,因此需要[开通EDAS服务](https://common-buy.aliyun.com/?spm=5176.11451019.0.0.6f5965c0Uq5tue&commodityCode=edaspostpay#/buy) + +3. 到[EDAS组件中心](https://edas.console.aliyun.com/#/edasTools)开通SchedulerX组件,即分布式任务管理。 + +4. 进入[SchedulerX分组管理](https://edas.console.aliyun.com/#/schedulerXGroup?regionNo=cn-test)页面,选择上方"测试"区域,点击右上角"新建分组",创建一个分组。 + +5. 将"分组ID"的值填写到`application.properties`文件中`key`为`spring.cloud.alicloud.scx.group-id`对应的value值,即如下所示。 + + spring.cloud.alicloud.scx.group-id=111-1-1-1111 + +6. 进入[SchedulerX任务列表](https://edas.console.aliyun.com/#/edasSchedulerXJob?regionNo=cn-test)页面,选择上方"测试"区域,点击右上角"新建Job",创建一个Job,即如下所示。 + + Job分组:测试——111-1-1-1111 + Job处理接口:org.springframework.cloud.alibaba.cloud.examples.SimpleTask + 类型:简单Job单机版 + 定时表达式:默认选项——0 * * * * ? + Job描述:无 + 自定义参数:无 + +### 启动应用 + +直接运行main class,即`ScxApplication`。 + +### 查看效果 + +观察应用的控制台日志输出,可以看到每一分钟会打印一次如下日志。 + +``` + -----------Hello world--------------- +``` + +如果您对 Spring Cloud SchedulerX Starter 有任何建议或想法,欢迎提交 issue 中或者通过其他社区渠道向我们反馈。 + diff --git a/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ScxApplication.java b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ScxApplication.java new file mode 100644 index 00000000..baccfd4f --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ScxApplication.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alibaba.cloud.examples; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author xiaolongzuo + */ +@SpringBootApplication +public class ScxApplication { + + public static void main(String[] args) { + SpringApplication.run(ScxApplication.class, args); + } + +} diff --git a/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SimpleTask.java b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SimpleTask.java new file mode 100644 index 00000000..e49c0a05 --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SimpleTask.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alibaba.cloud.examples; + +import com.alibaba.edas.schedulerx.ProcessResult; +import com.alibaba.edas.schedulerx.ScxSimpleJobContext; +import com.alibaba.edas.schedulerx.ScxSimpleJobProcessor; + +/** + * @author xiaolongzuo + */ +public class SimpleTask implements ScxSimpleJobProcessor { + + @Override + public ProcessResult process(ScxSimpleJobContext context) { + System.out.println("-----------Hello world---------------"); + ProcessResult processResult = new ProcessResult(true); + return processResult; + } + +} diff --git a/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/resources/application.properties new file mode 100644 index 00000000..a995fa3f --- /dev/null +++ b/spring-cloud-alibaba-examples/schedulerx-example/schedulerx-simple-task-example/src/main/resources/application.properties @@ -0,0 +1,3 @@ +server.port=18033 +spring.cloud.alicloud.scx.group-id=*** +spring.cloud.alicloud.edas.namespace=cn-test diff --git a/spring-cloud-alicloud-context/pom.xml b/spring-cloud-alicloud-context/pom.xml index 891f3650..68aa20a9 100644 --- a/spring-cloud-alicloud-context/pom.xml +++ b/spring-cloud-alicloud-context/pom.xml @@ -50,6 +50,12 @@ provided + + com.alibaba.edas + schedulerX-client + provided + + com.alibaba.edas.acm acm-sdk diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/nacos/NacosParameterInitListener.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/nacos/NacosParameterInitListener.java index 6b5a68bd..e3e5eae5 100644 --- a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/nacos/NacosParameterInitListener.java +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/nacos/NacosParameterInitListener.java @@ -1,12 +1,13 @@ package org.springframework.cloud.alicloud.context.nacos; -import com.alibaba.cloud.context.edas.EdasChangeOrderConfiguration; -import com.alibaba.cloud.context.edas.EdasChangeOrderConfigurationFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent; import org.springframework.context.ApplicationListener; +import com.alibaba.cloud.context.edas.EdasChangeOrderConfiguration; +import com.alibaba.cloud.context.edas.EdasChangeOrderConfigurationFactory; + /** * A listener that prepare initialize the nacos configuration for aliyun acm * @@ -24,7 +25,7 @@ public class NacosParameterInitListener private void preparedNacosConfiguration() { EdasChangeOrderConfiguration edasChangeOrderConfiguration = EdasChangeOrderConfigurationFactory - .buildEdasChangeOrderConfiguration(); + .getEdasChangeOrderConfiguration(); log.info("Initialize Nacos Parameter from edas change order,is edas managed {}.", edasChangeOrderConfiguration.isEdasManaged()); if (!edasChangeOrderConfiguration.isEdasManaged()) { diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxContextAutoConfiguration.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxContextAutoConfiguration.java new file mode 100644 index 00000000..075538e5 --- /dev/null +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxContextAutoConfiguration.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.context.scx; + +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.alicloud.context.AliCloudProperties; +import org.springframework.cloud.alicloud.context.edas.EdasContextAutoConfiguration; +import org.springframework.cloud.alicloud.context.edas.EdasProperties; +import org.springframework.context.annotation.Configuration; + +import com.alibaba.cloud.context.edas.AliCloudEdasSdk; +import com.alibaba.cloud.context.scx.AliCloudScxInitializer; +import com.alibaba.dts.common.exception.InitException; + +/** + * @author xiaolongzuo + */ +@Configuration +@ConditionalOnClass(name = "org.springframework.cloud.alicloud.scx.ScxAutoConfiguration") +@EnableConfigurationProperties(ScxProperties.class) +@ImportAutoConfiguration(EdasContextAutoConfiguration.class) +public class ScxContextAutoConfiguration { + + private static final Logger log = LoggerFactory + .getLogger(ScxContextAutoConfiguration.class); + + @Autowired + private AliCloudProperties aliCloudProperties; + + @Autowired + private EdasProperties edasProperties; + + @Autowired + private ScxProperties scxProperties; + + @Autowired + private AliCloudEdasSdk aliCloudEdasSdk; + + @PostConstruct + public void initAcmProperties() { + try { + AliCloudScxInitializer.initialize(aliCloudProperties, edasProperties, + scxProperties, aliCloudEdasSdk); + } + catch (InitException e) { + log.error("Init SchedulerX failed.", e); + throw new RuntimeException(e); + } + } +} diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxProperties.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxProperties.java new file mode 100644 index 00000000..db6a00d6 --- /dev/null +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/scx/ScxProperties.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.context.scx; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import com.alibaba.cloud.context.scx.ScxConfiguration; + +/** + * @author xiaolongzuo + */ +@ConfigurationProperties("spring.cloud.alicloud.scx") +public class ScxProperties implements ScxConfiguration { + + private String groupId; + + private String domainName; + + @Override + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + @Override + public String getDomainName() { + return domainName; + } + + public void setDomainName(String domainName) { + this.domainName = domainName; + } + +} diff --git a/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/statistics/StatisticsTaskStarter.java b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/statistics/StatisticsTaskStarter.java new file mode 100644 index 00000000..41c3591b --- /dev/null +++ b/spring-cloud-alicloud-context/src/main/java/org/springframework/cloud/alicloud/context/statistics/StatisticsTaskStarter.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.context.statistics; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.cloud.alicloud.context.acm.AcmContextBootstrapConfiguration; +import org.springframework.cloud.alicloud.context.acm.AcmProperties; +import org.springframework.cloud.alicloud.context.ans.AnsContextAutoConfiguration; +import org.springframework.cloud.alicloud.context.ans.AnsProperties; +import org.springframework.cloud.alicloud.context.edas.EdasProperties; +import org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration; +import org.springframework.cloud.alicloud.context.oss.OssProperties; +import org.springframework.cloud.alicloud.context.scx.ScxContextAutoConfiguration; +import org.springframework.cloud.alicloud.context.scx.ScxProperties; +import org.springframework.context.annotation.Configuration; + +import com.alibaba.cloud.context.AliCloudServerMode; +import com.alibaba.cloud.context.edas.AliCloudEdasSdk; +import com.alibaba.cloud.context.statistics.StatisticsTask; + +/** + * @author xiaolongzuo + */ +@Configuration +@AutoConfigureAfter({ ScxContextAutoConfiguration.class, + OssContextAutoConfiguration.class, AnsContextAutoConfiguration.class, + AcmContextBootstrapConfiguration.class }) +public class StatisticsTaskStarter implements InitializingBean { + + @Autowired(required = false) + private AliCloudEdasSdk aliCloudEdasSdk; + + @Autowired(required = false) + private EdasProperties edasProperties; + + @Autowired(required = false) + private ScxProperties scxProperties; + + @Autowired(required = false) + private OssProperties ossProperties; + + @Autowired(required = false) + private AnsProperties ansProperties; + + @Autowired(required = false) + private AcmProperties acmProperties; + + @Autowired(required = false) + private ScxContextAutoConfiguration scxContextAutoConfiguration; + + @Autowired(required = false) + private OssContextAutoConfiguration ossContextAutoConfiguration; + + @Autowired(required = false) + private AnsContextAutoConfiguration ansContextAutoConfiguration; + + @Autowired(required = false) + private AcmContextBootstrapConfiguration acmContextBootstrapConfiguration; + + @Override + public void afterPropertiesSet() { + StatisticsTask statisticsTask = new StatisticsTask(aliCloudEdasSdk, + edasProperties, getComponents()); + statisticsTask.start(); + } + + private List getComponents() { + List components = new ArrayList<>(); + if (scxContextAutoConfiguration != null && scxProperties != null) { + components.add("SC-SCX"); + } + if (ossContextAutoConfiguration != null && ossProperties != null) { + components.add("SC-OSS"); + } + boolean edasEnabled = edasProperties != null && edasProperties.isEnabled(); + boolean ansEnableEdas = edasEnabled || (ansProperties != null + && ansProperties.getServerMode() == AliCloudServerMode.EDAS); + if (ansContextAutoConfiguration != null && ansEnableEdas) { + components.add("SC-ANS"); + } + boolean acmEnableEdas = edasEnabled || (acmProperties != null + && acmProperties.getServerMode() == AliCloudServerMode.EDAS); + if (acmContextBootstrapConfiguration != null && acmEnableEdas) { + components.add("SC-ACM"); + } + return components; + } + +} diff --git a/spring-cloud-alicloud-context/src/main/resources/META-INF/spring.factories b/spring-cloud-alicloud-context/src/main/resources/META-INF/spring.factories index b5ed9c32..fb9957b3 100644 --- a/spring-cloud-alicloud-context/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-alicloud-context/src/main/resources/META-INF/spring.factories @@ -4,7 +4,9 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.alicloud.context.AliCloudContextAutoConfiguration,\ org.springframework.cloud.alicloud.context.edas.EdasContextAutoConfiguration,\ org.springframework.cloud.alicloud.context.ans.AnsContextAutoConfiguration,\ - org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration + org.springframework.cloud.alicloud.context.oss.OssContextAutoConfiguration,\ + org.springframework.cloud.alicloud.context.scx.ScxContextAutoConfiguration,\ + org.springframework.cloud.alicloud.context.statistics.StatisticsTaskStarter org.springframework.context.ApplicationListener=\ org.springframework.cloud.alicloud.context.ans.AnsContextApplicationListener,\ org.springframework.cloud.alicloud.context.nacos.NacosParameterInitListener \ No newline at end of file diff --git a/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/AliCloudSpringApplicationTests.java b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/AliCloudSpringApplicationTests.java index 636adf64..471ffbb9 100644 --- a/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/AliCloudSpringApplicationTests.java +++ b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/AliCloudSpringApplicationTests.java @@ -33,7 +33,9 @@ import org.springframework.test.context.junit4.SpringRunner; "spring.application.name=myapp", "spring.cloud.alicloud.edas.application.name=myapp", "spring.cloud.alicloud.access-key=ak", "spring.cloud.alicloud.secret-key=sk", - "spring.cloud.alicloud.oss.endpoint=test" }, webEnvironment = RANDOM_PORT) + "spring.cloud.alicloud.oss.endpoint=test", + "spring.cloud.alicloud.scx.group-id=1-2-3-4", + "spring.cloud.alicloud.edas.namespace=cn-test" }, webEnvironment = RANDOM_PORT) @DirtiesContext public class AliCloudSpringApplicationTests { diff --git a/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/scx/ScxPropertiesLoadTests.java b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/scx/ScxPropertiesLoadTests.java new file mode 100644 index 00000000..78e6f394 --- /dev/null +++ b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/context/scx/ScxPropertiesLoadTests.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.context.scx; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.alicloud.context.edas.EdasProperties; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author xiaolongzuo + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = ScxContextAutoConfiguration.class, properties = { + "spring.cloud.alicloud.scx.group-id=1-2-3-4", + "spring.cloud.alicloud.edas.namespace=cn-test" }) +public class ScxPropertiesLoadTests { + + @Autowired + private EdasProperties edasProperties; + + @Autowired + private ScxProperties scxProperties; + + @Test + public void testSxcProperties() { + assertThat(scxProperties.getGroupId()).isEqualTo("1-2-3-4"); + assertThat(edasProperties.getNamespace()).isEqualTo("cn-test"); + assertThat(scxProperties.getDomainName()).isNull(); + } + +} diff --git a/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java new file mode 100644 index 00000000..a1128b92 --- /dev/null +++ b/spring-cloud-alicloud-context/src/test/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.scx; + +/** + * @author xiaolongzuo + */ +public class ScxAutoConfiguration { +} diff --git a/spring-cloud-alicloud-schedulerx/pom.xml b/spring-cloud-alicloud-schedulerx/pom.xml new file mode 100644 index 00000000..a2a5e117 --- /dev/null +++ b/spring-cloud-alicloud-schedulerx/pom.xml @@ -0,0 +1,48 @@ + + + + spring-cloud-alibaba + org.springframework.cloud + 0.1.1.BUILD-SNAPSHOT + + 4.0.0 + spring-cloud-alicloud-schedulerx + Spring Cloud Alibaba Cloud SchedulerX + + + + org.springframework.cloud + spring-cloud-alicloud-context + + + org.slf4j + slf4j-api + + + com.alibaba.edas + schedulerX-client + + + com.aliyun + aliyun-java-sdk-core + + + com.aliyun + aliyun-java-sdk-edas + + + org.springframework.boot + spring-boot-autoconfigure + provided + true + + + org.springframework.boot + spring-boot-actuator + true + + + + \ No newline at end of file diff --git a/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java b/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java new file mode 100644 index 00000000..77669f50 --- /dev/null +++ b/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/ScxAutoConfiguration.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.scx; + +import org.springframework.context.annotation.Configuration; + +/** + * placeholder configuration + * + * @author xiaolongzuo + */ +@Configuration +public class ScxAutoConfiguration { + +} diff --git a/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/endpoint/ScxEndpoint.java b/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/endpoint/ScxEndpoint.java new file mode 100644 index 00000000..96c31901 --- /dev/null +++ b/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/endpoint/ScxEndpoint.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.scx.endpoint; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.actuate.endpoint.AbstractEndpoint; +import org.springframework.cloud.alicloud.context.edas.EdasProperties; +import org.springframework.cloud.alicloud.context.scx.ScxProperties; + +/** + * @author xiaolongzuo + */ +public class ScxEndpoint extends AbstractEndpoint> { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScxEndpoint.class); + + private ScxProperties scxProperties; + + private EdasProperties edasProperties; + + public ScxEndpoint(EdasProperties edasProperties, ScxProperties scxProperties) { + super("scx", false); + this.edasProperties = edasProperties; + this.scxProperties = scxProperties; + } + + /** + * @return scx endpoint + */ + @Override + public Map invoke() { + Map scxEndpoint = new HashMap<>(); + LOGGER.info("SCX endpoint invoke, scxProperties is {}", scxProperties); + scxEndpoint.put("namespace", + edasProperties == null ? "" : edasProperties.getNamespace()); + scxEndpoint.put("scxProperties", scxProperties); + return scxEndpoint; + } + +} diff --git a/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/endpoint/ScxEndpointAutoConfiguration.java b/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/endpoint/ScxEndpointAutoConfiguration.java new file mode 100644 index 00000000..1d54e464 --- /dev/null +++ b/spring-cloud-alicloud-schedulerx/src/main/java/org/springframework/cloud/alicloud/scx/endpoint/ScxEndpointAutoConfiguration.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alicloud.scx.endpoint; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; +import org.springframework.cloud.alicloud.context.edas.EdasProperties; +import org.springframework.cloud.alicloud.context.scx.ScxProperties; +import org.springframework.context.annotation.Bean; + +/** + * @author xiaolongzuo + */ +@ConditionalOnWebApplication +@ConditionalOnClass(name = "org.springframework.boot.actuate.endpoint.AbstractEndpoint") +public class ScxEndpointAutoConfiguration { + + @Bean + public ScxEndpoint scxEndpoint(EdasProperties edasProperties, + ScxProperties scxProperties) { + return new ScxEndpoint(edasProperties, scxProperties); + } +} diff --git a/spring-cloud-alicloud-schedulerx/src/main/resources/META-INF/spring.factories b/spring-cloud-alicloud-schedulerx/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000..5003685d --- /dev/null +++ b/spring-cloud-alicloud-schedulerx/src/main/resources/META-INF/spring.factories @@ -0,0 +1,3 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.springframework.cloud.alicloud.scx.endpoint.ScxEndpointAutoConfiguration,\ + org.springframework.cloud.alicloud.scx.ScxAutoConfiguration \ No newline at end of file diff --git a/spring-cloud-starter-alicloud/pom.xml b/spring-cloud-starter-alicloud/pom.xml index 75f0a9b2..c7e11ca8 100644 --- a/spring-cloud-starter-alicloud/pom.xml +++ b/spring-cloud-starter-alicloud/pom.xml @@ -11,11 +11,12 @@ spring-cloud-starter-alicloud pom Spring Cloud Alibaba Cloud Starters - Spring Cloud Alibaba Cloud Starters + Spring Cloud Alibaba Cloud Starters spring-cloud-starter-alicloud-oss spring-cloud-starter-alicloud-acm spring-cloud-starter-alicloud-ans + spring-cloud-starter-alicloud-schedulerx diff --git a/spring-cloud-starter-alicloud/spring-cloud-starter-alicloud-schedulerx/pom.xml b/spring-cloud-starter-alicloud/spring-cloud-starter-alicloud-schedulerx/pom.xml new file mode 100644 index 00000000..c9d15bb8 --- /dev/null +++ b/spring-cloud-starter-alicloud/spring-cloud-starter-alicloud-schedulerx/pom.xml @@ -0,0 +1,20 @@ + + 4.0.0 + + + org.springframework.cloud + spring-cloud-starter-alicloud + 0.1.1.BUILD-SNAPSHOT + + spring-cloud-starter-alicloud-schedulerx + Spring Cloud Starter Alibaba Cloud SchedulerX + + + + org.springframework.cloud + spring-cloud-alicloud-schedulerx + + + +