mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
Compare commits
51 Commits
2020.0.RC1
...
master
Author | SHA1 | Date | |
---|---|---|---|
|
196f06bde9 | ||
|
e4f2f4150c | ||
|
2a8647c263 | ||
|
4786725e8e | ||
|
139d793302 | ||
|
a498f39c19 | ||
|
a2b2c2f8c1 | ||
|
480fa336bf | ||
|
df2bced6f2 | ||
|
479540c835 | ||
|
235e9ee494 | ||
|
a0ad31f11c | ||
|
28d2100f5c | ||
|
da26be5bdf | ||
|
4ffbe22c8c | ||
|
91805b2695 | ||
|
66bdd10ac9 | ||
|
c3d393b09e | ||
|
0612c9c726 | ||
|
39d6a3e1ae | ||
|
55b3336b6c | ||
|
048d1ceddf | ||
|
3f039ad099 | ||
|
cf71b7aa26 | ||
|
f816114122 | ||
|
3d5eaefe1c | ||
|
223dd559a9 | ||
|
e4a8a14b1b | ||
|
4bb797407a | ||
|
c9984f5880 | ||
|
c76f407038 | ||
|
3ad550f211 | ||
|
8894793ce1 | ||
|
d653a14476 | ||
|
a869ef7ed6 | ||
|
eaab60ebdb | ||
|
139b03e652 | ||
|
51a8b32ff3 | ||
|
a42363878b | ||
|
fd1deaa475 | ||
|
884399d73b | ||
|
11b95f444f | ||
|
70c7a9c022 | ||
|
2701624da2 | ||
|
9d164f9a0a | ||
|
4817ca53b8 | ||
|
15df0bff6b | ||
|
7684fe71ca | ||
|
441bcb9124 | ||
|
1ecf526b5c | ||
|
770382b36a |
15
README-zh.md
15
README-zh.md
@ -48,15 +48,16 @@ Spring Cloud Alibaba 致力于提供微服务开发的一站式解决方案。
|
||||
更多组件请参考 [Roadmap](https://github.com/alibaba/spring-cloud-alibaba/blob/master/Roadmap-zh.md)。
|
||||
|
||||
## 如何构建
|
||||
|
||||
* master 分支对应的是 Spring Cloud Greenwich,最低支持 JDK 1.8。
|
||||
* 2020.0 分支对应的是 Spring Cloud 2020,最低支持 JDK 1.8。
|
||||
* master 分支对应的是 Spring Cloud Hoxton,最低支持 JDK 1.8。
|
||||
* greenwich 分支对应的是 Spring Cloud Greenwich,最低支持 JDK 1.8。
|
||||
* finchley 分支对应的是 Spring Cloud Finchley,最低支持 JDK 1.8。
|
||||
* 1.x 分支对应的是 Spring Cloud Edgware,最低支持 JDK 1.7。
|
||||
|
||||
Spring Cloud 使用 Maven 来构建,最快的使用方式是将本项目 clone 到本地,然后执行以下命令:
|
||||
|
||||
```bash
|
||||
./mvnw install
|
||||
|
||||
```
|
||||
执行完毕后,项目将被安装到本地 Maven 仓库。
|
||||
|
||||
## 如何使用
|
||||
@ -64,7 +65,7 @@ Spring Cloud 使用 Maven 来构建,最快的使用方式是将本项目 clone
|
||||
### 如何引入依赖
|
||||
|
||||
如果需要使用已发布的版本,在 `dependencyManagement` 中添加如下配置。
|
||||
|
||||
```xml
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@ -76,7 +77,7 @@ Spring Cloud 使用 Maven 来构建,最快的使用方式是将本项目 clone
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
```
|
||||
然后在 `dependencies` 中添加自己所需使用的依赖即可使用。
|
||||
|
||||
## 演示 Demo
|
||||
@ -111,7 +112,7 @@ Example 列表:
|
||||
* 2.0.x 版本适用于 Spring Boot 2.0.x
|
||||
* 2.1.x 版本适用于 Spring Boot 2.1.x
|
||||
* 2.2.x 版本适用于 Spring Boot 2.2.x
|
||||
|
||||
* 2021.x 版本适用于 Spring Boot 2.4.x
|
||||
|
||||
## 社区交流
|
||||
|
||||
|
16
README.md
16
README.md
@ -51,22 +51,23 @@ For more features, please refer to [Roadmap](https://github.com/alibaba/spring-c
|
||||
For more features please refer to [Roadmap](https://github.com/alibaba/spring-cloud-alibaba/blob/master/Roadmap.md).
|
||||
|
||||
## How to build
|
||||
|
||||
* **master branch**: Corresponds to Spring Cloud Greenwich & Spring Boot 2.x. JDK 1.8 or later versions are supported.
|
||||
* **finchley branch**: Corresponds to Spring Cloud Finchley & Spring Boot 2.x. JDK 1.8 or later versions are supported.
|
||||
* **2020.0 branch**: Corresponds to Spring Cloud 2020 & Spring Boot 2.4.x. JDK 1.8 or later versions are supported.
|
||||
* **master branch**: Corresponds to Spring Cloud Hoxton & Spring Boot 2.2.x. JDK 1.8 or later versions are supported.
|
||||
* **greenwich branch**: Corresponds to Spring Cloud Greenwich & Spring Boot 2.1.x. JDK 1.8 or later versions are supported.
|
||||
* **finchley branch**: Corresponds to Spring Cloud Finchley & Spring Boot 2.0.x. JDK 1.8 or later versions are supported.
|
||||
* **1.x branch**: Corresponds to Spring Cloud Edgware & Spring Boot 1.x, JDK 1.7 or later versions are supported.
|
||||
|
||||
Spring Cloud uses Maven for most build-related activities, and you should be able to get off the ground quite quickly by cloning the project you are interested in and typing:
|
||||
|
||||
```bash
|
||||
./mvnw install
|
||||
|
||||
```
|
||||
|
||||
## How to Use
|
||||
|
||||
### Add maven dependency
|
||||
|
||||
These artifacts are available from Maven Central and Spring Release repository via BOM:
|
||||
|
||||
```xml
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@ -78,7 +79,7 @@ These artifacts are available from Maven Central and Spring Release repository v
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
```
|
||||
add the module in `dependencies`.
|
||||
|
||||
|
||||
@ -118,6 +119,7 @@ As the interfaces and annotations of Spring Boot 1 and Spring Boot 2 have been c
|
||||
* 2.0.x for Spring Boot 2.0.x
|
||||
* 2.1.x for Spring Boot 2.1.x
|
||||
* 2.2.x for Spring Boot 2.2.x
|
||||
* 2020.x for Spring Boot 2.4.x
|
||||
|
||||
## Code of Conduct
|
||||
This project is a sub-project of Spring Cloud, it adheres to the Contributor Covenant [code of conduct](https://github.com/spring-cloud/spring-cloud-build/blob/master/docs/src/main/asciidoc/code-of-conduct.adoc). By participating, you are expected to uphold this code. Please report unacceptable behavior to spring-code-of-conduct@pivotal.io.
|
||||
|
@ -19,9 +19,9 @@
|
||||
|
||||
<properties>
|
||||
<revision>2.2.6-SNAPSHOT</revision>
|
||||
<sentinel.version>1.8.0</sentinel.version>
|
||||
<sentinel.version>1.8.1</sentinel.version>
|
||||
<seata.version>1.3.0</seata.version>
|
||||
<nacos.client.version>1.4.1</nacos.client.version>
|
||||
<nacos.client.version>1.4.2</nacos.client.version>
|
||||
<nacos.config.version>0.8.0</nacos.config.version>
|
||||
<spring.context.support.version>1.0.10</spring.context.support.version>
|
||||
|
||||
|
@ -260,7 +260,7 @@ Nacos 内部有 https://nacos.io/zh-cn/docs/concepts.html[Namespace 的概念]:
|
||||
[quote]
|
||||
用于进行租户粒度的配置隔离。不同的命名空间下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用场景之一是不同环境的配置的区分隔离,例如开发测试环境和生产环境的资源(如配置、服务)隔离等。
|
||||
|
||||
在没有明确指定 `${spring.cloud.nacos.config.namespace}` 配置的情况下, 默认使用的是 Nacos 上 Public 这个namespae。如果需要使用自定义的命名空间,可以通过以下配置来实现:
|
||||
在没有明确指定 `${spring.cloud.nacos.config.namespace}` 配置的情况下, 默认使用的是 Nacos 上 Public 这个namespace。如果需要使用自定义的命名空间,可以通过以下配置来实现:
|
||||
[source,properties]
|
||||
----
|
||||
spring.cloud.nacos.config.namespace=b3404bc0-d7dc-4855-b519-570ed34b62d7
|
||||
@ -348,7 +348,13 @@ Nacos Config 目前提供了三种配置能力从 Nacos 拉取相关的配置
|
||||
* B: 通过 `spring.cloud.nacos.config.ext-config[n].data-id` 的方式支持多个扩展 Data Id 的配置
|
||||
* C: 通过内部相关规则(应用名、应用名+ Profile )自动生成相关的 Data Id 配置
|
||||
|
||||
当三种方式共同使用时,他们的一个优先级关系是:A < B < C
|
||||
当三种方式共同使用时,他们的一个优先级关系是:
|
||||
|
||||
->A为优先级最高的
|
||||
|
||||
->B的优先级低于A,
|
||||
|
||||
->C的优先级是最低的
|
||||
|
||||
=== Nacos Config 对外暴露的 Endpoint
|
||||
|
||||
|
@ -57,7 +57,7 @@ Send messages:
|
||||
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
|
||||
```
|
||||
|
||||
Output when the message is successfuly sent: `SendResult [sendStatus=SEND_OK, msgId= ...`
|
||||
Output when the message is successfully sent: `SendResult [sendStatus=SEND_OK, msgId= ...`
|
||||
|
||||
Receive messages:
|
||||
|
||||
|
@ -82,7 +82,7 @@ Job Description: Empty
|
||||
Custom Parameters: Empty
|
||||
----
|
||||
|
||||
The job above is a “Simple Single-Server Job”, and speficied a Cron expression of "0 * * * * ?" . This means that the job will be executed once and once only in every minute.
|
||||
The job above is a “Simple Single-Server Job”, and specified a Cron expression of "0 * * * * ?" . This means that the job will be executed once and once only in every minute.
|
||||
|
||||
For more job types, refer to https://help.aliyun.com/document_detail/43136.html[SchedulerX Documentation].
|
||||
|
||||
|
@ -18,18 +18,18 @@ spring.cloud.nacos.password=nacos
|
||||
#spring.cloud.nacos.config.shared-data-ids=common.properties,base-common.properties
|
||||
|
||||
## recommended.
|
||||
spring.cloud.nacos.config.shared-configs[0].data-id= test2.yaml
|
||||
spring.cloud.nacos.config.shared-configs[0].data-id=test2.yaml
|
||||
spring.cloud.nacos.config.shared-configs[0].refresh=true
|
||||
## the default value is 'DEFAULT_GROUP' , if not specified.
|
||||
spring.cloud.nacos.config.shared-configs[0].group= GROUP_APP1
|
||||
spring.cloud.nacos.config.shared-configs[0].group=GROUP_APP1
|
||||
|
||||
## not recommended.
|
||||
#spring.cloud.nacos.config.ext-config[0]=ext.properties
|
||||
## recommended.
|
||||
spring.cloud.nacos.config.extension-configs[0].data-id= extension1.properties
|
||||
spring.cloud.nacos.config.extension-configs[0].data-id=extension1.properties
|
||||
spring.cloud.nacos.config.extension-configs[0].refresh=true
|
||||
spring.cloud.nacos.config.extension-configs[1].data-id= test1.yml
|
||||
spring.cloud.nacos.config.extension-configs[1].refresh= true
|
||||
spring.cloud.nacos.config.extension-configs[1].data-id=test1.yml
|
||||
spring.cloud.nacos.config.extension-configs[1].refresh=true
|
||||
|
||||
|
||||
|
||||
|
@ -2,6 +2,7 @@ spring.application.name=service-consumer
|
||||
server.port=18083
|
||||
management.endpoints.web.exposure.include=*
|
||||
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
|
||||
spring.cloud.nacos.discovery.fail-fast=true
|
||||
|
||||
spring.cloud.nacos.username=nacos
|
||||
spring.cloud.nacos.password=nacos
|
||||
|
@ -73,7 +73,7 @@ Provider端在application.properties文件中定义dubbo相关的配置,比如
|
||||
|
||||
定义具体的服务:
|
||||
|
||||
@Service(
|
||||
@DubboService(
|
||||
version = "${foo.service.version}",
|
||||
application = "${dubbo.application.id}",
|
||||
protocol = "${dubbo.protocol.id}",
|
||||
@ -111,7 +111,7 @@ Consumer端在服务调用之前,先定义限流规则。
|
||||
|
||||
根据Provider端中发布的定义,使用Dubbo的@Reference注解注入服务对应的Bean:
|
||||
|
||||
@Reference(version = "${foo.service.version}", application = "${dubbo.application.id}",
|
||||
@DubboReference(version = "${foo.service.version}", application = "${dubbo.application.id}",
|
||||
path = "dubbo://localhost:12345", timeout = 30000)
|
||||
private FooService fooService;
|
||||
|
||||
|
@ -16,14 +16,14 @@
|
||||
|
||||
package com.alibaba.cloud.examples;
|
||||
|
||||
import org.apache.dubbo.config.annotation.Reference;
|
||||
import org.apache.dubbo.config.annotation.DubboReference;
|
||||
|
||||
/**
|
||||
* @author fangjian
|
||||
*/
|
||||
public class FooServiceConsumer {
|
||||
|
||||
@Reference(version = "${foo.service.version}",
|
||||
@DubboReference(version = "${foo.service.version}",
|
||||
application = "${dubbo.application.id}",
|
||||
url = "dubbo://localhost:12345?version=1.0.0", timeout = 30000)
|
||||
private FooService fooService;
|
||||
|
@ -16,12 +16,12 @@
|
||||
|
||||
package com.alibaba.cloud.examples;
|
||||
|
||||
import org.apache.dubbo.config.annotation.Service;
|
||||
import org.apache.dubbo.config.annotation.DubboService;
|
||||
|
||||
/**
|
||||
* @author fangjian
|
||||
*/
|
||||
@Service(version = "${foo.service.version}", application = "${dubbo.application.id}",
|
||||
@DubboService(version = "${foo.service.version}", application = "${dubbo.application.id}",
|
||||
protocol = "${dubbo.protocol.id}", registry = "${dubbo.registry.id}")
|
||||
public class FooServiceImpl implements FooService {
|
||||
|
||||
|
@ -114,6 +114,8 @@ public class EchoController {
|
||||
|
||||
- 启动nacos 注册中心
|
||||
|
||||
- 启动sentinel
|
||||
|
||||
- 启动服务提供方:
|
||||
|
||||
1. IDE直接启动:找到主类 `ProviderApplication`,执行 main 方法启动应用。
|
||||
@ -123,3 +125,5 @@ public class EchoController {
|
||||
|
||||
1. IDE直接启动:找到主类 `ConsumerApplication`,执行 main 方法启动应用。
|
||||
2. 打包编译后启动:首先执行 `mvn clean package` 将工程编译打包,然后执行 `java -jar sentinel-feign-consumer-example.jar`启动应用。
|
||||
|
||||
- 启动之后,Sentinel Dashboard可能看不见service-consumer服务的详细信息,多请求几次接口即可。
|
||||
|
@ -8,6 +8,9 @@ spring:
|
||||
nacos:
|
||||
discovery:
|
||||
server-addr: 127.0.0.1:8848
|
||||
sentinel:
|
||||
transport:
|
||||
dashboard: 127.0.0.1:8081
|
||||
|
||||
feign:
|
||||
sentinel:
|
||||
|
@ -10,7 +10,6 @@
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-dubbo-client-sample</artifactId>
|
||||
<name>Spring Cloud Dubbo Client Sample</name>
|
||||
|
||||
|
@ -12,7 +12,11 @@ spring:
|
||||
allow-bean-definition-overriding: true
|
||||
cloud:
|
||||
nacos:
|
||||
username: nacos
|
||||
password: nacos
|
||||
discovery:
|
||||
server-addr: 127.0.0.1:8848
|
||||
username: nacos
|
||||
password: nacos
|
||||
server-addr: 127.0.0.1:8848
|
||||
namespace: public
|
||||
|
||||
server:
|
||||
port: 8080
|
@ -1,4 +1,6 @@
|
||||
dubbo:
|
||||
cloud:
|
||||
subscribed-services: ${spring.application.name}
|
||||
scan:
|
||||
base-packages: com.alibaba.cloud.dubbo.bootstrap
|
||||
protocol:
|
||||
|
@ -132,7 +132,6 @@ public class DataSourcePropertiesConfiguration {
|
||||
if (!ObjectUtils.isEmpty(field.get(this))) {
|
||||
return field.getName();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
// won't happen
|
||||
|
@ -86,9 +86,8 @@ public abstract class SentinelConverter<T extends Object>
|
||||
});
|
||||
|
||||
for (Object obj : sourceArray) {
|
||||
String item = null;
|
||||
try {
|
||||
item = objectMapper.writeValueAsString(obj);
|
||||
String item = objectMapper.writeValueAsString(obj);
|
||||
Optional.ofNullable(convertRule(item))
|
||||
.ifPresent(convertRule -> ruleCollection.add(convertRule));
|
||||
}
|
||||
|
@ -32,6 +32,16 @@ public class NacosConfigHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
private final ConfigService configService;
|
||||
|
||||
/**
|
||||
* status up .
|
||||
*/
|
||||
private final String STATUS_UP = "UP";
|
||||
|
||||
/**
|
||||
* status down .
|
||||
*/
|
||||
private final String STATUS_DOWN = "DOWN";
|
||||
|
||||
public NacosConfigHealthIndicator(ConfigService configService) {
|
||||
this.configService = configService;
|
||||
}
|
||||
@ -43,10 +53,10 @@ public class NacosConfigHealthIndicator extends AbstractHealthIndicator {
|
||||
// Set the status to Builder
|
||||
builder.status(status);
|
||||
switch (status) {
|
||||
case "UP":
|
||||
case STATUS_UP:
|
||||
builder.up();
|
||||
break;
|
||||
case "DOWN":
|
||||
case STATUS_DOWN:
|
||||
builder.down();
|
||||
break;
|
||||
default:
|
||||
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.nacos.logging;
|
||||
|
||||
import com.alibaba.nacos.client.logging.NacosLogging;
|
||||
|
||||
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.event.GenericApplicationListener;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.ResolvableType;
|
||||
|
||||
/**
|
||||
* Reload nacos log configuration file, after
|
||||
* {@link org.springframework.boot.context.logging.LoggingApplicationListener}.
|
||||
*
|
||||
* @author mai.jh
|
||||
*/
|
||||
public class NacosLoggingListener implements GenericApplicationListener {
|
||||
|
||||
@Override
|
||||
public boolean supportsEventType(ResolvableType resolvableType) {
|
||||
Class<?> type = resolvableType.getRawClass();
|
||||
if (type != null) {
|
||||
return ApplicationEnvironmentPreparedEvent.class.isAssignableFrom(type);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationEvent applicationEvent) {
|
||||
NacosLogging.getInstance().loadConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.HIGHEST_PRECEDENCE + 21;
|
||||
}
|
||||
|
||||
}
|
@ -7,4 +7,6 @@ org.springframework.boot.diagnostics.FailureAnalyzer=\
|
||||
com.alibaba.cloud.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer
|
||||
org.springframework.boot.env.PropertySourceLoader=\
|
||||
com.alibaba.cloud.nacos.parser.NacosJsonPropertySourceLoader,\
|
||||
com.alibaba.cloud.nacos.parser.NacosXmlPropertySourceLoader
|
||||
com.alibaba.cloud.nacos.parser.NacosXmlPropertySourceLoader
|
||||
org.springframework.context.ApplicationListener=\
|
||||
com.alibaba.cloud.nacos.logging.NacosLoggingListener
|
@ -207,6 +207,12 @@ public class NacosDiscoveryProperties {
|
||||
*/
|
||||
private boolean ephemeral = true;
|
||||
|
||||
/**
|
||||
* Throw exceptions during service registration if true, otherwise, log error
|
||||
* (defaults to true).
|
||||
*/
|
||||
private boolean failFast = true;
|
||||
|
||||
@Autowired
|
||||
private InetUtils inetUtils;
|
||||
|
||||
@ -486,6 +492,14 @@ public class NacosDiscoveryProperties {
|
||||
this.ephemeral = ephemeral;
|
||||
}
|
||||
|
||||
public boolean isFailFast() {
|
||||
return failFast;
|
||||
}
|
||||
|
||||
public void setFailFast(boolean failFast) {
|
||||
this.failFast = failFast;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
@ -510,6 +524,7 @@ public class NacosDiscoveryProperties {
|
||||
&& Objects.equals(secretKey, that.secretKey)
|
||||
&& Objects.equals(heartBeatInterval, that.heartBeatInterval)
|
||||
&& Objects.equals(heartBeatTimeout, that.heartBeatTimeout)
|
||||
&& Objects.equals(failFast, that.failFast)
|
||||
&& Objects.equals(ipDeleteTimeout, that.ipDeleteTimeout);
|
||||
}
|
||||
|
||||
@ -519,7 +534,7 @@ public class NacosDiscoveryProperties {
|
||||
watchDelay, logName, service, weight, clusterName, group,
|
||||
namingLoadCacheAtStart, registerEnabled, ip, networkInterface, port,
|
||||
secure, accessKey, secretKey, heartBeatInterval, heartBeatTimeout,
|
||||
ipDeleteTimeout, instanceEnabled, ephemeral);
|
||||
ipDeleteTimeout, instanceEnabled, ephemeral, failFast);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -535,7 +550,7 @@ public class NacosDiscoveryProperties {
|
||||
+ ", port=" + port + ", secure=" + secure + ", accessKey='" + accessKey
|
||||
+ '\'' + ", secretKey='" + secretKey + '\'' + ", heartBeatInterval="
|
||||
+ heartBeatInterval + ", heartBeatTimeout=" + heartBeatTimeout
|
||||
+ ", ipDeleteTimeout=" + ipDeleteTimeout + '}';
|
||||
+ ", ipDeleteTimeout=" + ipDeleteTimeout + ", failFast=" + failFast + '}';
|
||||
}
|
||||
|
||||
public void overrideFromEnv(Environment env) {
|
||||
|
@ -31,6 +31,16 @@ import org.springframework.boot.actuate.health.HealthIndicator;
|
||||
*/
|
||||
public class NacosDiscoveryHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
/**
|
||||
* status up.
|
||||
*/
|
||||
private static final String STATUS_UP = "UP";
|
||||
|
||||
/**
|
||||
* status down.
|
||||
*/
|
||||
private static final String STATUS_DOWN = "DOWN";
|
||||
|
||||
private final NamingService namingService;
|
||||
|
||||
public NacosDiscoveryHealthIndicator(NamingService namingService) {
|
||||
@ -44,10 +54,10 @@ public class NacosDiscoveryHealthIndicator extends AbstractHealthIndicator {
|
||||
// Set the status to Builder
|
||||
builder.status(status);
|
||||
switch (status) {
|
||||
case "UP":
|
||||
case STATUS_UP:
|
||||
builder.up();
|
||||
break;
|
||||
case "DOWN":
|
||||
case STATUS_DOWN:
|
||||
builder.down();
|
||||
break;
|
||||
default:
|
||||
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.nacos.discovery.logging;
|
||||
|
||||
import com.alibaba.nacos.client.logging.NacosLogging;
|
||||
|
||||
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.event.GenericApplicationListener;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.ResolvableType;
|
||||
|
||||
/**
|
||||
* Reload nacos log configuration file, after
|
||||
* {@link org.springframework.boot.context.logging.LoggingApplicationListener}.
|
||||
*
|
||||
* @author mai.jh
|
||||
*/
|
||||
public class NacosLoggingListener implements GenericApplicationListener {
|
||||
|
||||
@Override
|
||||
public boolean supportsEventType(ResolvableType resolvableType) {
|
||||
Class<?> type = resolvableType.getRawClass();
|
||||
if (type != null) {
|
||||
return ApplicationEnvironmentPreparedEvent.class.isAssignableFrom(type);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationEvent applicationEvent) {
|
||||
NacosLogging.getInstance().loadConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.HIGHEST_PRECEDENCE + 21;
|
||||
}
|
||||
|
||||
}
|
@ -76,11 +76,15 @@ public class NacosServiceRegistry implements ServiceRegistry<Registration> {
|
||||
instance.getIp(), instance.getPort());
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("nacos registry, {} register failed...{},", serviceId,
|
||||
registration.toString(), e);
|
||||
// rethrow a RuntimeException if the registration is failed.
|
||||
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
|
||||
rethrowRuntimeException(e);
|
||||
if (nacosDiscoveryProperties.isFailFast()) {
|
||||
log.error("nacos registry, {} register failed...{},", serviceId,
|
||||
registration.toString(), e);
|
||||
rethrowRuntimeException(e);
|
||||
}
|
||||
else {
|
||||
log.warn("Failfast is false. {} register failed...{},", serviceId,
|
||||
registration.toString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -155,8 +159,9 @@ public class NacosServiceRegistry implements ServiceRegistry<Registration> {
|
||||
public Object getStatus(Registration registration) {
|
||||
|
||||
String serviceName = registration.getServiceId();
|
||||
String group = nacosDiscoveryProperties.getGroup();
|
||||
try {
|
||||
List<Instance> instances = namingService().getAllInstances(serviceName);
|
||||
List<Instance> instances = namingService().getAllInstances(serviceName, group);
|
||||
for (Instance instance : instances) {
|
||||
if (instance.getIp().equalsIgnoreCase(nacosDiscoveryProperties.getIp())
|
||||
&& instance.getPort() == nacosDiscoveryProperties.getPort()) {
|
||||
|
@ -9,3 +9,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
|
||||
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
|
||||
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
|
||||
org.springframework.context.ApplicationListener=\
|
||||
com.alibaba.cloud.nacos.discovery.logging.NacosLoggingListener
|
||||
|
@ -25,7 +25,7 @@ import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
|
||||
import com.alibaba.csp.sentinel.heartbeat.HeartbeatSenderProvider;
|
||||
import com.alibaba.csp.sentinel.transport.HeartbeatSender;
|
||||
import com.alibaba.csp.sentinel.transport.config.TransportConfig;
|
||||
import com.alibaba.csp.sentinel.util.function.Tuple2;
|
||||
import com.alibaba.csp.sentinel.transport.endpoint.Endpoint;
|
||||
|
||||
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
|
||||
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
|
||||
@ -84,8 +84,7 @@ public class SentinelHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
// Check health of Dashboard
|
||||
boolean dashboardUp = true;
|
||||
List<Tuple2<String, Integer>> consoleServerList = TransportConfig
|
||||
.getConsoleServerList();
|
||||
List<Endpoint> consoleServerList = TransportConfig.getConsoleServerList();
|
||||
if (CollectionUtils.isEmpty(consoleServerList)) {
|
||||
// If Dashboard isn't configured, it's OK and mark the status of Dashboard
|
||||
// with UNKNOWN.
|
||||
|
@ -31,7 +31,8 @@ import com.alibaba.csp.sentinel.slots.block.RuleConstant;
|
||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
|
||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
|
||||
import com.alibaba.csp.sentinel.transport.config.TransportConfig;
|
||||
import com.alibaba.csp.sentinel.util.function.Tuple2;
|
||||
import com.alibaba.csp.sentinel.transport.endpoint.Endpoint;
|
||||
import com.alibaba.csp.sentinel.transport.endpoint.Protocol;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@ -134,8 +135,10 @@ public class SentinelAutoConfigurationTests {
|
||||
Map<String, Object> map = sentinelEndpoint.invoke();
|
||||
|
||||
assertThat(map.get("logUsePid")).isEqualTo(Boolean.TRUE);
|
||||
assertThat(map.get("consoleServer").toString()).isEqualTo(
|
||||
Arrays.asList(Tuple2.of("localhost", 8080), Tuple2.of("localhost", 8081))
|
||||
assertThat(map.get("consoleServer").toString())
|
||||
.isEqualTo(Arrays
|
||||
.asList(new Endpoint(Protocol.HTTP, "localhost", 8080),
|
||||
new Endpoint(Protocol.HTTP, "localhost", 8081))
|
||||
.toString());
|
||||
assertThat(map.get("clientPort")).isEqualTo("9999");
|
||||
assertThat(map.get("heartbeatIntervalMs")).isEqualTo(20000L);
|
||||
@ -185,8 +188,10 @@ public class SentinelAutoConfigurationTests {
|
||||
@Test
|
||||
public void testSentinelSystemProperties() {
|
||||
assertThat(LogBase.isLogNameUsePid()).isEqualTo(true);
|
||||
assertThat(TransportConfig.getConsoleServerList().toString()).isEqualTo(
|
||||
Arrays.asList(Tuple2.of("localhost", 8080), Tuple2.of("localhost", 8081))
|
||||
assertThat(TransportConfig.getConsoleServerList().toString())
|
||||
.isEqualTo(Arrays
|
||||
.asList(new Endpoint(Protocol.HTTP, "localhost", 8080),
|
||||
new Endpoint(Protocol.HTTP, "localhost", 8081))
|
||||
.toString());
|
||||
assertThat(TransportConfig.getPort()).isEqualTo("9999");
|
||||
assertThat(TransportConfig.getHeartbeatIntervalMs().longValue())
|
||||
|
@ -51,6 +51,10 @@ public class DubboCloudProperties {
|
||||
|
||||
private String registryType = DUBBO_CLOUD_REGISTRY_PROPERTY_VALUE;
|
||||
|
||||
private int maxReSubscribeMetadataTimes = 1000;
|
||||
|
||||
private int reSubscribeMetadataIntervial = 5;
|
||||
|
||||
public String getSubscribedServices() {
|
||||
return subscribedServices;
|
||||
}
|
||||
@ -91,4 +95,20 @@ public class DubboCloudProperties {
|
||||
this.registryType = registryType;
|
||||
}
|
||||
|
||||
public int getMaxReSubscribeMetadataTimes() {
|
||||
return maxReSubscribeMetadataTimes;
|
||||
}
|
||||
|
||||
public void setMaxReSubscribeMetadataTimes(int maxReSubscribeMetadataTimes) {
|
||||
this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes;
|
||||
}
|
||||
|
||||
public int getReSubscribeMetadataIntervial() {
|
||||
return reSubscribeMetadataIntervial;
|
||||
}
|
||||
|
||||
public void setReSubscribeMetadataIntervial(int reSubscribeMetadataIntervial) {
|
||||
this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.metadata;
|
||||
|
||||
import org.apache.dubbo.common.extension.Activate;
|
||||
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
|
||||
import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
|
||||
import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
|
||||
import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY;
|
||||
import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
|
||||
import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
|
||||
import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
|
||||
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
|
||||
import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
|
||||
import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
|
||||
|
||||
/**
|
||||
* Copy from org.apache.dubbo.metadata.DefaultMetadataParamsFilter.
|
||||
*
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
@Activate
|
||||
public class DefaultMetadataParamsFilter implements MetadataParamsFilter {
|
||||
|
||||
@Override
|
||||
public String[] serviceParamsIncluded() {
|
||||
return new String[] { CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY,
|
||||
CONNECTIONS_KEY, DEPRECATED_KEY, GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY,
|
||||
PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY, WEIGHT_KEY,
|
||||
DUBBO_VERSION_KEY, RELEASE_KEY };
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.metadata;
|
||||
|
||||
import org.apache.dubbo.common.extension.SPI;
|
||||
|
||||
/**
|
||||
* Copy from org.apache.dubbo.metadata.MetadataParamsFilter.
|
||||
*
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
@SPI
|
||||
public interface MetadataParamsFilter {
|
||||
|
||||
/**
|
||||
* params that need to be sent to metadata center.
|
||||
* @return arrays of keys
|
||||
*/
|
||||
String[] serviceParamsIncluded();
|
||||
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.metadata;
|
||||
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
|
||||
import org.apache.dubbo.common.logger.Logger;
|
||||
import org.apache.dubbo.common.logger.LoggerFactory;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
/**
|
||||
* Copy from org.apache.dubbo.metadata.RevisionResolver.
|
||||
*
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public final class RevisionResolver {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RevisionResolver.class);
|
||||
|
||||
private static final String EMPTY_REVISION = "0";
|
||||
|
||||
private static final char[] hexDigits = { '0', '1', '2', '3', '4', '5', '6', '7', '8',
|
||||
'9', 'A', 'B', 'C', 'D', 'E', 'F' };
|
||||
|
||||
private static MessageDigest mdInst;
|
||||
|
||||
static {
|
||||
try {
|
||||
mdInst = MessageDigest.getInstance("MD5");
|
||||
}
|
||||
catch (NoSuchAlgorithmException e) {
|
||||
logger.error("Failed to calculate metadata revision", e);
|
||||
}
|
||||
}
|
||||
|
||||
private RevisionResolver() {
|
||||
|
||||
}
|
||||
|
||||
public static String getEmptyRevision() {
|
||||
return EMPTY_REVISION;
|
||||
}
|
||||
|
||||
public static String calRevision(String metadata) {
|
||||
mdInst.update(metadata.getBytes(UTF_8));
|
||||
byte[] md5 = mdInst.digest();
|
||||
|
||||
int j = md5.length;
|
||||
char[] str = new char[j * 2];
|
||||
int k = 0;
|
||||
for (byte byte0 : md5) {
|
||||
str[k++] = hexDigits[byte0 >>> 4 & 0xf];
|
||||
str[k++] = hexDigits[byte0 & 0xf];
|
||||
}
|
||||
return new String(str);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,351 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.metadata;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.common.compiler.support.ClassUtils;
|
||||
import org.apache.dubbo.common.extension.ExtensionLoader;
|
||||
import org.apache.dubbo.common.utils.ArrayUtils;
|
||||
import org.apache.dubbo.common.utils.StringUtils;
|
||||
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
|
||||
|
||||
/**
|
||||
* Copy from org.apache.dubbo.metadata.MetadataInfo.ServiceInfo.
|
||||
*
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class ServiceInfo implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -258557978718735302L;
|
||||
|
||||
private static ExtensionLoader<MetadataParamsFilter> loader = ExtensionLoader
|
||||
.getExtensionLoader(MetadataParamsFilter.class);
|
||||
|
||||
private String name;
|
||||
|
||||
private String group;
|
||||
|
||||
private String version;
|
||||
|
||||
private String protocol;
|
||||
|
||||
private String path; // most of the time, path is the same with the interface name.
|
||||
|
||||
private Map<String, String> params;
|
||||
|
||||
// params configured on consumer side,
|
||||
private transient Map<String, String> consumerParams;
|
||||
|
||||
// cached method params
|
||||
private transient Map<String, Map<String, String>> methodParams;
|
||||
|
||||
private transient Map<String, Map<String, String>> consumerMethodParams;
|
||||
|
||||
// cached numbers
|
||||
private transient Map<String, Number> numbers;
|
||||
|
||||
private transient Map<String, Map<String, Number>> methodNumbers;
|
||||
|
||||
// service + group + version
|
||||
private transient String serviceKey;
|
||||
|
||||
// service + group + version + protocol
|
||||
private transient String matchKey;
|
||||
|
||||
private transient URL url;
|
||||
|
||||
public ServiceInfo() {
|
||||
}
|
||||
|
||||
public ServiceInfo(URL url) {
|
||||
this(url.getServiceInterface(), url.getParameter(GROUP_KEY),
|
||||
url.getParameter(VERSION_KEY), url.getProtocol(), url.getPath(), null);
|
||||
|
||||
this.url = url;
|
||||
Map<String, String> params = new HashMap<>();
|
||||
List<MetadataParamsFilter> filters = loader.getActivateExtension(url,
|
||||
"params-filter");
|
||||
for (MetadataParamsFilter filter : filters) {
|
||||
String[] paramsIncluded = filter.serviceParamsIncluded();
|
||||
if (ArrayUtils.isNotEmpty(paramsIncluded)) {
|
||||
for (String p : paramsIncluded) {
|
||||
String value = url.getParameter(p);
|
||||
if (StringUtils.isNotEmpty(value) && params.get(p) == null) {
|
||||
params.put(p, value);
|
||||
}
|
||||
String[] methods = url.getParameter(METHODS_KEY, (String[]) null);
|
||||
if (methods != null) {
|
||||
for (String method : methods) {
|
||||
String mValue = getMethodParameterStrict(url, method, p);
|
||||
if (StringUtils.isNotEmpty(mValue)) {
|
||||
params.put(method + DOT_SEPARATOR + p, mValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
public String getMethodParameterStrict(URL url, String method, String key) {
|
||||
Map<String, String> keyMap = url.getMethodParameters().get(method);
|
||||
String value = null;
|
||||
if (keyMap != null) {
|
||||
value = keyMap.get(key);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public ServiceInfo(String name, String group, String version, String protocol,
|
||||
String path, Map<String, String> params) {
|
||||
this.name = name;
|
||||
this.group = group;
|
||||
this.version = version;
|
||||
this.protocol = protocol;
|
||||
this.path = path;
|
||||
this.params = params == null ? new HashMap<>() : params;
|
||||
|
||||
this.serviceKey = URL.buildKey(name, group, version);
|
||||
this.matchKey = buildMatchKey();
|
||||
}
|
||||
|
||||
public String getMatchKey() {
|
||||
if (matchKey != null) {
|
||||
return matchKey;
|
||||
}
|
||||
buildMatchKey();
|
||||
return matchKey;
|
||||
}
|
||||
|
||||
private String buildMatchKey() {
|
||||
matchKey = getServiceKey();
|
||||
if (StringUtils.isNotEmpty(protocol)) {
|
||||
matchKey = getServiceKey() + GROUP_CHAR_SEPARATOR + protocol;
|
||||
}
|
||||
return matchKey;
|
||||
}
|
||||
|
||||
public String getServiceKey() {
|
||||
if (serviceKey != null) {
|
||||
return serviceKey;
|
||||
}
|
||||
this.serviceKey = URL.buildKey(name, group, version);
|
||||
return serviceKey;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getGroup() {
|
||||
return group;
|
||||
}
|
||||
|
||||
public void setGroup(String group) {
|
||||
this.group = group;
|
||||
}
|
||||
|
||||
public String getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public void setVersion(String version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public void setPath(String path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
public Map<String, String> getParams() {
|
||||
if (params == null) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
public void setParams(Map<String, String> params) {
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
public Map<String, String> getAllParams() {
|
||||
if (consumerParams != null) {
|
||||
Map<String, String> allParams = new HashMap<>(
|
||||
(int) ((params.size() + consumerParams.size()) / 0.75f + 1));
|
||||
allParams.putAll(params);
|
||||
allParams.putAll(consumerParams);
|
||||
return allParams;
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
public String getParameter(String key) {
|
||||
if (consumerParams != null) {
|
||||
String value = consumerParams.get(key);
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return params.get(key);
|
||||
}
|
||||
|
||||
public String getMethodParameter(String method, String key, String defaultValue) {
|
||||
if (methodParams == null) {
|
||||
methodParams = URL.toMethodParameters(params);
|
||||
consumerMethodParams = URL.toMethodParameters(consumerParams);
|
||||
}
|
||||
|
||||
String value = getMethodParameter(method, key, consumerMethodParams);
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
value = getMethodParameter(method, key, methodParams);
|
||||
return value == null ? defaultValue : value;
|
||||
}
|
||||
|
||||
private String getMethodParameter(String method, String key,
|
||||
Map<String, Map<String, String>> map) {
|
||||
Map<String, String> keyMap = map.get(method);
|
||||
String value = null;
|
||||
if (keyMap != null) {
|
||||
value = keyMap.get(key);
|
||||
}
|
||||
if (StringUtils.isEmpty(value)) {
|
||||
value = getParameter(key);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public boolean hasMethodParameter(String method, String key) {
|
||||
String value = this.getMethodParameter(method, key, (String) null);
|
||||
return StringUtils.isNotEmpty(value);
|
||||
}
|
||||
|
||||
public boolean hasMethodParameter(String method) {
|
||||
if (methodParams == null) {
|
||||
methodParams = URL.toMethodParameters(params);
|
||||
consumerMethodParams = URL.toMethodParameters(consumerParams);
|
||||
}
|
||||
|
||||
return consumerMethodParams.containsKey(method)
|
||||
|| methodParams.containsKey(method);
|
||||
}
|
||||
|
||||
public String toDescString() {
|
||||
return this.getMatchKey() + getMethodSignaturesString() + getParams();
|
||||
}
|
||||
|
||||
private String getMethodSignaturesString() {
|
||||
SortedSet<String> methodStrings = new TreeSet();
|
||||
|
||||
Method[] methods = ClassUtils.forName(name).getMethods();
|
||||
for (Method method : methods) {
|
||||
methodStrings.add(method.toString());
|
||||
}
|
||||
return methodStrings.toString();
|
||||
}
|
||||
|
||||
public void addParameter(String key, String value) {
|
||||
if (consumerParams != null) {
|
||||
this.consumerParams.put(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
public void addParameterIfAbsent(String key, String value) {
|
||||
if (consumerParams != null) {
|
||||
this.consumerParams.putIfAbsent(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
public void addConsumerParams(Map<String, String> params) {
|
||||
// copy once for one service subscription
|
||||
if (consumerParams == null) {
|
||||
consumerParams = new HashMap<>(params);
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, Number> getNumbers() {
|
||||
// concurrent initialization is tolerant
|
||||
if (numbers == null) {
|
||||
numbers = new ConcurrentHashMap<>();
|
||||
}
|
||||
return numbers;
|
||||
}
|
||||
|
||||
public Map<String, Map<String, Number>> getMethodNumbers() {
|
||||
if (methodNumbers == null) { // concurrent initialization is tolerant
|
||||
methodNumbers = new ConcurrentHashMap<>();
|
||||
}
|
||||
return methodNumbers;
|
||||
}
|
||||
|
||||
public URL getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof ServiceInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ServiceInfo serviceInfo = (ServiceInfo) obj;
|
||||
return this.getMatchKey().equals(serviceInfo.getMatchKey())
|
||||
&& this.getParams().equals(serviceInfo.getParams());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getMatchKey(), getParams());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "service{" + "name='" + name + "'," + "group='" + group + "',"
|
||||
+ "version='" + version + "'," + "protocol='" + protocol + "',"
|
||||
+ "params=" + params + "," + "consumerParams=" + consumerParams + "}";
|
||||
}
|
||||
|
||||
}
|
@ -16,9 +16,11 @@
|
||||
|
||||
package com.alibaba.cloud.dubbo.metadata.repository;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@ -31,6 +33,8 @@ import com.alibaba.cloud.dubbo.env.DubboCloudProperties;
|
||||
import com.alibaba.cloud.dubbo.http.matcher.RequestMetadataMatcher;
|
||||
import com.alibaba.cloud.dubbo.metadata.DubboRestServiceMetadata;
|
||||
import com.alibaba.cloud.dubbo.metadata.RequestMetadata;
|
||||
import com.alibaba.cloud.dubbo.metadata.RevisionResolver;
|
||||
import com.alibaba.cloud.dubbo.metadata.ServiceInfo;
|
||||
import com.alibaba.cloud.dubbo.metadata.ServiceRestMetadata;
|
||||
import com.alibaba.cloud.dubbo.registry.event.SubscribedServicesChangedEvent;
|
||||
import com.alibaba.cloud.dubbo.service.DubboMetadataService;
|
||||
@ -41,6 +45,8 @@ import com.alibaba.cloud.dubbo.util.JSONUtils;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.common.constants.CommonConstants;
|
||||
import org.apache.dubbo.common.utils.CollectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -94,6 +100,12 @@ public class DubboServiceMetadataRepository
|
||||
@Deprecated
|
||||
public static final String DUBBO_METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_URLS_PROPERTY_NAME;
|
||||
|
||||
/**
|
||||
* The key of dubbo metadata revision. copyed from
|
||||
* ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME.
|
||||
*/
|
||||
public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision";
|
||||
|
||||
/**
|
||||
* The {@link String#format(String, Object...) pattern} of dubbo protocols port.
|
||||
*/
|
||||
@ -115,14 +127,11 @@ public class DubboServiceMetadataRepository
|
||||
*/
|
||||
private final MultiValueMap<String, URL> allExportedURLs = new LinkedMultiValueMap<>();
|
||||
|
||||
// =================================== Registration
|
||||
// =================================== //
|
||||
// ======================== Registration ======================== //
|
||||
|
||||
// ====================================================================================
|
||||
// //
|
||||
// ============================================================== //
|
||||
|
||||
// =================================== Subscription
|
||||
// =================================== //
|
||||
// ======================== Subscription ======================== //
|
||||
/**
|
||||
* A Map to store REST metadata temporary, its' key is the special service name for a
|
||||
* Dubbo service, the value is a JSON content of JAX-RS or Spring MVC REST metadata
|
||||
@ -132,11 +141,9 @@ public class DubboServiceMetadataRepository
|
||||
|
||||
private ApplicationEventPublisher applicationEventPublisher;
|
||||
|
||||
// ====================================================================================
|
||||
// //
|
||||
// =============================================================== //
|
||||
|
||||
// =================================== REST Metadata
|
||||
// ================================== //
|
||||
// ======================== REST Metadata ======================== //
|
||||
private volatile Set<String> subscribedServices = emptySet();
|
||||
|
||||
/**
|
||||
@ -145,11 +152,9 @@ public class DubboServiceMetadataRepository
|
||||
*/
|
||||
private Map<String, Map<RequestMetadataMatcher, DubboRestServiceMetadata>> dubboRestServiceMetadataRepository = newHashMap();
|
||||
|
||||
// ====================================================================================
|
||||
// //
|
||||
// =============================================================== //
|
||||
|
||||
// =================================== Dependencies
|
||||
// =================================== //
|
||||
// ======================== Dependencies ========================= //
|
||||
|
||||
@Autowired
|
||||
private DubboCloudProperties dubboCloudProperties;
|
||||
@ -178,8 +183,7 @@ public class DubboServiceMetadataRepository
|
||||
@Autowired
|
||||
private DubboMetadataServiceExporter dubboMetadataServiceExporter;
|
||||
|
||||
// ====================================================================================
|
||||
// //
|
||||
// =============================================================== //
|
||||
|
||||
private static <K, V> Map<K, V> getMap(Map<String, Map<K, V>> repository,
|
||||
String key) {
|
||||
@ -187,12 +191,7 @@ public class DubboServiceMetadataRepository
|
||||
}
|
||||
|
||||
private static <K, V> V getOrDefault(Map<K, V> source, K key, V defaultValue) {
|
||||
V value = source.get(key);
|
||||
if (value == null) {
|
||||
value = defaultValue;
|
||||
source.put(key, value);
|
||||
}
|
||||
return value;
|
||||
return source.computeIfAbsent(key, k -> defaultValue);
|
||||
}
|
||||
|
||||
private static <K, V> Map<K, V> newHashMap() {
|
||||
@ -237,9 +236,6 @@ public class DubboServiceMetadataRepository
|
||||
dispatchEvent(new SubscribedServicesChangedEvent(this, oldSubscribedServices,
|
||||
newSubscribedServices));
|
||||
|
||||
// clear old one, help GC
|
||||
oldSubscribedServices.clear();
|
||||
|
||||
return newSubscribedServices.stream();
|
||||
}
|
||||
|
||||
@ -249,13 +245,14 @@ public class DubboServiceMetadataRepository
|
||||
|
||||
@Override
|
||||
public void afterSingletonsInstantiated() {
|
||||
initializeMetadata();
|
||||
// inited by DubboCloudRegistry.preInit() @theonefx
|
||||
// initializeMetadata();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the metadata.
|
||||
*/
|
||||
private void initializeMetadata() {
|
||||
public void initializeMetadata() {
|
||||
doGetSubscribedServices().forEach(this::initializeMetadata);
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("The metadata of Dubbo services has been initialized");
|
||||
@ -302,12 +299,36 @@ public class DubboServiceMetadataRepository
|
||||
|
||||
addDubboMetadataServiceURLsMetadata(metadata, dubboMetadataServiceURLs);
|
||||
addDubboProtocolsPortMetadata(metadata);
|
||||
addRevision(metadata);
|
||||
|
||||
return Collections.unmodifiableMap(metadata);
|
||||
}
|
||||
|
||||
private void addRevision(Map<String, String> metadata) {
|
||||
metadata.put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, calAndGetRevision());
|
||||
}
|
||||
|
||||
public String calAndGetRevision() {
|
||||
if (CollectionUtils.isEmptyMap(allExportedURLs)) {
|
||||
return RevisionResolver.getEmptyRevision();
|
||||
}
|
||||
else {
|
||||
List<String> descs = new ArrayList<>(allExportedURLs.size());
|
||||
for (Map.Entry<String, List<URL>> entry : allExportedURLs.entrySet()) {
|
||||
entry.getValue().stream().map(ServiceInfo::new)
|
||||
.map(ServiceInfo::toDescString).forEach(descs::add);
|
||||
}
|
||||
|
||||
descs.sort(String::compareTo);
|
||||
|
||||
return RevisionResolver.calRevision(descs.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private void removeDubboMetadataServiceURLs(List<URL> dubboMetadataServiceURLs) {
|
||||
dubboMetadataServiceURLs.forEach(this::unexportURL);
|
||||
dubboMetadataServiceURLs.stream().map(URL::getServiceKey).distinct()
|
||||
.forEach(allExportedURLs::remove);
|
||||
// dubboMetadataServiceURLs.forEach(this::unexportURL);
|
||||
}
|
||||
|
||||
private void addDubboMetadataServiceURLsMetadata(Map<String, String> metadata,
|
||||
@ -407,8 +428,34 @@ public class DubboServiceMetadataRepository
|
||||
|
||||
public List<URL> getExportedURLs(String serviceInterface, String group,
|
||||
String version) {
|
||||
String serviceKey = URL.buildKey(serviceInterface, group, version);
|
||||
return allExportedURLs.getOrDefault(serviceKey, Collections.emptyList());
|
||||
if (group != null) {
|
||||
List<URL> urls = new LinkedList<>();
|
||||
if (CommonConstants.ANY_VALUE.equals(group)) {
|
||||
String serviceKey = URL.buildKey(serviceInterface, group, version);
|
||||
String expectKey = serviceKey.substring(2);
|
||||
for (String key : allExportedURLs.keySet()) {
|
||||
if (key.endsWith(expectKey)) {
|
||||
urls.addAll(allExportedURLs.get(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
String[] groups = group.split(CommonConstants.COMMA_SEPARATOR);
|
||||
for (String expectKey : groups) {
|
||||
String serviceKey = URL.buildKey(serviceInterface, expectKey,
|
||||
version);
|
||||
List<URL> urlList = allExportedURLs.get(serviceKey);
|
||||
if (urlList != null) {
|
||||
urls.addAll(urlList);
|
||||
}
|
||||
}
|
||||
}
|
||||
return urls;
|
||||
}
|
||||
else {
|
||||
String serviceKey = URL.buildKey(serviceInterface, null, version);
|
||||
return allExportedURLs.getOrDefault(serviceKey, Collections.emptyList());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.registry.NotifyListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.dubbo.common.URLBuilder.from;
|
||||
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
|
||||
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
|
||||
import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public abstract class AbstractServiceSubscribeHandler {
|
||||
|
||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
protected final URL url;
|
||||
|
||||
protected final NotifyListener listener;
|
||||
|
||||
protected final DubboCloudRegistry registry;
|
||||
|
||||
public AbstractServiceSubscribeHandler(URL url, NotifyListener listener,
|
||||
DubboCloudRegistry registry) {
|
||||
this.url = url;
|
||||
this.listener = listener;
|
||||
this.registry = registry;
|
||||
}
|
||||
|
||||
protected void notifyAllSubscribedURLs(URL url, List<URL> subscribedURLs,
|
||||
NotifyListener listener) {
|
||||
|
||||
if (isEmpty(subscribedURLs)) {
|
||||
// Add the EMPTY_PROTOCOL URL
|
||||
listener.notify(Collections.singletonList(emptyURL(url)));
|
||||
// if (isDubboMetadataServiceURL(url)) {
|
||||
// if meta service change, and serviceInstances is zero, will clean up
|
||||
// information about this client
|
||||
// String serviceName = url.getParameter(GROUP_KEY);
|
||||
// repository.removeMetadataAndInitializedService(serviceName, url);
|
||||
// }
|
||||
}
|
||||
else {
|
||||
// Notify all
|
||||
listener.notify(subscribedURLs);
|
||||
}
|
||||
}
|
||||
|
||||
private URL emptyURL(URL url) {
|
||||
// issue : When the last service provider is closed, the client still periodically
|
||||
// connects to the last provider.n
|
||||
// fix https://github.com/alibaba/spring-cloud-alibaba/issues/1259
|
||||
return from(url).setProtocol(EMPTY_PROTOCOL).removeParameter(CATEGORY_KEY)
|
||||
.build();
|
||||
}
|
||||
|
||||
private final AtomicBoolean inited = new AtomicBoolean(false);
|
||||
|
||||
public void init() {
|
||||
if (inited.compareAndSet(false, true)) {
|
||||
doInit();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doInit();
|
||||
|
||||
}
|
@ -16,25 +16,27 @@
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.alibaba.cloud.commons.lang.StringUtils;
|
||||
import com.alibaba.cloud.dubbo.metadata.RevisionResolver;
|
||||
import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository;
|
||||
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
|
||||
import com.alibaba.cloud.dubbo.service.DubboGenericServiceFactory;
|
||||
import com.alibaba.cloud.dubbo.service.DubboMetadataService;
|
||||
import com.alibaba.cloud.dubbo.service.DubboMetadataServiceProxy;
|
||||
import com.alibaba.cloud.dubbo.util.DubboMetadataUtils;
|
||||
import com.alibaba.cloud.dubbo.util.JSONUtils;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.common.URLBuilder;
|
||||
import org.apache.dubbo.registry.NotifyListener;
|
||||
import org.apache.dubbo.registry.support.FailbackRegistry;
|
||||
import org.slf4j.Logger;
|
||||
@ -44,25 +46,15 @@ import org.springframework.cloud.client.ServiceInstance;
|
||||
import org.springframework.cloud.client.discovery.DiscoveryClient;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import static java.lang.String.format;
|
||||
import static com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.stream.StreamSupport.stream;
|
||||
import static org.apache.dubbo.common.URLBuilder.from;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
|
||||
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
|
||||
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
|
||||
import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
|
||||
import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
|
||||
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URLS_PROPERTY_NAME;
|
||||
import static org.springframework.util.StringUtils.hasText;
|
||||
@ -71,22 +63,14 @@ import static org.springframework.util.StringUtils.hasText;
|
||||
* Dubbo Cloud {@link FailbackRegistry} is based on Spring Cloud {@link DiscoveryClient}.
|
||||
*
|
||||
* @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class DubboCloudRegistry extends FailbackRegistry {
|
||||
|
||||
/**
|
||||
* The parameter name of {@link #servicesLookupInterval}.
|
||||
*/
|
||||
public static final String SERVICES_LOOKUP_INTERVAL_PARAM_NAME = "dubbo.services.lookup.interval";
|
||||
public class DubboCloudRegistry extends FailbackRegistry
|
||||
implements ApplicationListener<ServiceInstancesChangedEvent> {
|
||||
|
||||
protected static final String DUBBO_METADATA_SERVICE_CLASS_NAME = DubboMetadataService.class
|
||||
.getName();
|
||||
|
||||
/**
|
||||
* Caches the IDs of {@link ApplicationListener}.
|
||||
*/
|
||||
private static final Set<String> REGISTER_LISTENERS = new HashSet<>();
|
||||
|
||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private final DiscoveryClient discoveryClient;
|
||||
@ -97,275 +81,373 @@ public class DubboCloudRegistry extends FailbackRegistry {
|
||||
|
||||
private final JSONUtils jsonUtils;
|
||||
|
||||
private final DubboGenericServiceFactory dubboGenericServiceFactory;
|
||||
|
||||
private final DubboMetadataUtils dubboMetadataUtils;
|
||||
|
||||
/**
|
||||
* The interval in second of lookup service names(only for Dubbo-OPS).
|
||||
*/
|
||||
private final long servicesLookupInterval;
|
||||
|
||||
private final ConfigurableApplicationContext applicationContext;
|
||||
|
||||
private final String currentApplicationName;
|
||||
private final ReSubscribeManager reSubscribeManager;
|
||||
|
||||
private final AtomicBoolean inited = new AtomicBoolean(false);
|
||||
|
||||
/**
|
||||
* {subscribedURL : ServiceSubscribeHandler}.
|
||||
*/
|
||||
private final Map<URL, GenearalServiceSubscribeHandler> urlSubscribeHandlerMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* {appName: MetadataServiceSubscribeHandler}.
|
||||
*/
|
||||
private final Map<String, MetadataServiceSubscribeHandler> metadataSubscribeHandlerMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* {appName : {revision: [instances]}}.
|
||||
*/
|
||||
private final Map<String, Map<String, List<ServiceInstance>>> serviceRevisionInstanceMap = new ConcurrentHashMap<>();
|
||||
|
||||
public DubboCloudRegistry(URL url, DiscoveryClient discoveryClient,
|
||||
DubboServiceMetadataRepository repository,
|
||||
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy,
|
||||
JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory,
|
||||
ConfigurableApplicationContext applicationContext) {
|
||||
JSONUtils jsonUtils, ConfigurableApplicationContext applicationContext) {
|
||||
|
||||
super(url);
|
||||
this.servicesLookupInterval = url
|
||||
.getParameter(SERVICES_LOOKUP_INTERVAL_PARAM_NAME, 60L);
|
||||
this.discoveryClient = discoveryClient;
|
||||
this.repository = repository;
|
||||
this.dubboMetadataConfigServiceProxy = dubboMetadataConfigServiceProxy;
|
||||
this.jsonUtils = jsonUtils;
|
||||
this.dubboGenericServiceFactory = dubboGenericServiceFactory;
|
||||
this.applicationContext = applicationContext;
|
||||
this.dubboMetadataUtils = getBean(DubboMetadataUtils.class);
|
||||
this.currentApplicationName = dubboMetadataUtils.getCurrentApplicationName();
|
||||
this.reSubscribeManager = new ReSubscribeManager(this);
|
||||
}
|
||||
|
||||
private <T> T getBean(Class<T> beanClass) {
|
||||
private void preInit() {
|
||||
if (inited.compareAndSet(false, true)) {
|
||||
Set<String> subscribeApps = getServices(null);
|
||||
|
||||
for (String appName : subscribeApps) {
|
||||
List<ServiceInstance> instances = discoveryClient.getInstances(appName);
|
||||
|
||||
Map<String, List<ServiceInstance>> map = serviceRevisionInstanceMap
|
||||
.computeIfAbsent(appName, k -> new HashMap<>());
|
||||
|
||||
for (ServiceInstance instance : instances) {
|
||||
String revision = getRevision(instance);
|
||||
List<ServiceInstance> list = map.computeIfAbsent(revision,
|
||||
k -> new ArrayList<>());
|
||||
list.add(instance);
|
||||
}
|
||||
|
||||
if (map.size() == 0) {
|
||||
logger.debug("APP {} preInited, instance siez is zero!!", appName);
|
||||
}
|
||||
else {
|
||||
map.forEach((revision, list) -> logger.debug(
|
||||
"APP {} revision {} preInited, instance size = {}", appName,
|
||||
revision, list.size()));
|
||||
}
|
||||
}
|
||||
|
||||
metadataSubscribeHandlerMap.forEach((url, handler) -> handler.init());
|
||||
urlSubscribeHandlerMap.forEach((url, handler) -> handler.init());
|
||||
repository.initializeMetadata();
|
||||
|
||||
// meke sure everything prepared, then can listening
|
||||
// ServiceInstanceChangeEvent
|
||||
applicationContext.addApplicationListener(this);
|
||||
|
||||
logger.info("DubboCloudRegistry preInit Done.");
|
||||
}
|
||||
}
|
||||
|
||||
protected <T> T getBean(Class<T> beanClass) {
|
||||
return this.applicationContext.getBean(beanClass);
|
||||
}
|
||||
|
||||
protected boolean shouldRegister(URL url) {
|
||||
protected boolean shouldNotRegister(URL url) {
|
||||
String side = url.getParameter(SIDE_KEY);
|
||||
|
||||
boolean should = PROVIDER_SIDE.equals(side); // Only register the Provider.
|
||||
|
||||
if (!should) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("The URL[{}] should not be registered.", url.toString());
|
||||
if (logger.isDebugEnabled()) {
|
||||
if (!should) {
|
||||
logger.debug("The URL should NOT!! be registered & unregistered [{}] .",
|
||||
url);
|
||||
}
|
||||
else {
|
||||
logger.debug("The URL should be registered & unregistered [{}] .", url);
|
||||
}
|
||||
}
|
||||
|
||||
return should;
|
||||
return !should;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void doRegister(URL url) {
|
||||
if (!shouldRegister(url)) {
|
||||
return;
|
||||
synchronized (this) {
|
||||
preInit();
|
||||
if (shouldNotRegister(url)) {
|
||||
return;
|
||||
}
|
||||
repository.exportURL(url);
|
||||
}
|
||||
repository.exportURL(url);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void doUnregister(URL url) {
|
||||
if (!shouldRegister(url)) {
|
||||
return;
|
||||
synchronized (this) {
|
||||
preInit();
|
||||
if (shouldNotRegister(url)) {
|
||||
return;
|
||||
}
|
||||
repository.unexportURL(url);
|
||||
}
|
||||
repository.unexportURL(url);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void doSubscribe(URL url, NotifyListener listener) {
|
||||
|
||||
if (isAdminURL(url)) {
|
||||
// TODO in future
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("This feature about admin will be supported in the future.");
|
||||
synchronized (this) {
|
||||
preInit();
|
||||
if (isAdminURL(url)) {
|
||||
// TODO in future
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"This feature about admin will be supported in the future.");
|
||||
}
|
||||
}
|
||||
else if (isDubboMetadataServiceURL(url) && containsProviderCategory(url)) {
|
||||
// for DubboMetadataService
|
||||
String appName = getServiceName(url);
|
||||
MetadataServiceSubscribeHandler handler = new MetadataServiceSubscribeHandler(
|
||||
appName, url, listener, this, dubboMetadataUtils);
|
||||
if (inited.get()) {
|
||||
handler.init();
|
||||
}
|
||||
metadataSubscribeHandlerMap.put(appName, handler);
|
||||
}
|
||||
else if (isConsumerServiceURL(url)) {
|
||||
// for general Dubbo Services
|
||||
GenearalServiceSubscribeHandler handler = new GenearalServiceSubscribeHandler(
|
||||
url, listener, this, repository, jsonUtils,
|
||||
dubboMetadataConfigServiceProxy);
|
||||
if (inited.get()) {
|
||||
handler.init();
|
||||
}
|
||||
urlSubscribeHandlerMap.put(url, handler);
|
||||
}
|
||||
}
|
||||
else if (isDubboMetadataServiceURL(url)) { // for DubboMetadataService
|
||||
subscribeDubboMetadataServiceURLs(url, listener);
|
||||
}
|
||||
else { // for general Dubbo Services
|
||||
subscribeURLs(url, listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void subscribeURLs(URL url, NotifyListener listener) {
|
||||
|
||||
// Sync subscription
|
||||
subscribeURLs(url, getServices(url), listener);
|
||||
|
||||
// Async subscription
|
||||
registerServiceInstancesChangedListener(url,
|
||||
|
||||
new ApplicationListener<ServiceInstancesChangedEvent>() {
|
||||
|
||||
private final URL url2subscribe = url;
|
||||
|
||||
@Override
|
||||
@Order
|
||||
public void onApplicationEvent(ServiceInstancesChangedEvent event) {
|
||||
Set<String> serviceNames = getServices(url);
|
||||
|
||||
String serviceName = event.getServiceName();
|
||||
|
||||
if (serviceNames.contains(serviceName)) {
|
||||
subscribeURLs(url, serviceNames, listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ServiceInstancesChangedEventListener:"
|
||||
+ url.getServiceKey();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void subscribeURLs(URL url, Set<String> serviceNames,
|
||||
NotifyListener listener) {
|
||||
|
||||
List<URL> subscribedURLs = new LinkedList<>();
|
||||
|
||||
serviceNames.forEach(serviceName -> {
|
||||
|
||||
subscribeURLs(url, subscribedURLs, serviceName,
|
||||
() -> getServiceInstances(serviceName));
|
||||
|
||||
});
|
||||
|
||||
// Notify all
|
||||
notifyAllSubscribedURLs(url, subscribedURLs, listener);
|
||||
}
|
||||
|
||||
private void registerServiceInstancesChangedListener(URL url,
|
||||
ApplicationListener<ServiceInstancesChangedEvent> listener) {
|
||||
String listenerId = generateId(url);
|
||||
if (REGISTER_LISTENERS.add(listenerId)) {
|
||||
applicationContext.addApplicationListener(listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void subscribeURLs(URL subscribedURL, List<URL> subscribedURLs,
|
||||
String serviceName,
|
||||
Supplier<List<ServiceInstance>> serviceInstancesSupplier) {
|
||||
List<ServiceInstance> serviceInstances = serviceInstancesSupplier.get();
|
||||
subscribeURLs(subscribedURL, subscribedURLs, serviceName, serviceInstances);
|
||||
}
|
||||
|
||||
private void subscribeURLs(URL subscribedURL, List<URL> subscribedURLs,
|
||||
String serviceName, List<ServiceInstance> serviceInstances) {
|
||||
|
||||
if (CollectionUtils.isEmpty(serviceInstances)) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(format("There is no instance in service[name : %s]",
|
||||
serviceName));
|
||||
}
|
||||
}
|
||||
|
||||
List<URL> exportedURLs = getExportedURLs(subscribedURL, serviceName,
|
||||
serviceInstances);
|
||||
|
||||
/**
|
||||
* Add the exported URLs from {@link MetadataService}
|
||||
*/
|
||||
subscribedURLs.addAll(exportedURLs);
|
||||
}
|
||||
|
||||
private List<URL> getExportedURLs(URL subscribedURL, String serviceName,
|
||||
List<ServiceInstance> serviceInstances) {
|
||||
|
||||
List<ServiceInstance> validServiceInstances = filter(serviceInstances);
|
||||
|
||||
// If there is no valid ServiceInstance, return empty result
|
||||
if (isEmpty(validServiceInstances)) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"There is no instance from service[name : {}], and then Dubbo Service[key : {}] will not be "
|
||||
+ "available , please make sure the further impact",
|
||||
serviceName, subscribedURL.getServiceKey());
|
||||
}
|
||||
return emptyList();
|
||||
}
|
||||
|
||||
List<URL> subscribedURLs = cloneExportedURLs(subscribedURL, serviceInstances);
|
||||
|
||||
// clear local service instances, help GC
|
||||
validServiceInstances.clear();
|
||||
|
||||
return subscribedURLs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone the subscribed URLs based on the template URLs.
|
||||
* @param subscribedURL the URL to be subscribed
|
||||
* @param serviceInstances the list of {@link ServiceInstance service instances}
|
||||
* @return non-null
|
||||
* Process ServiceInstanceChangedEvent, refresh dubbo reference and metadata info.
|
||||
*/
|
||||
private List<URL> cloneExportedURLs(URL subscribedURL,
|
||||
List<ServiceInstance> serviceInstances) {
|
||||
@Override
|
||||
public void onApplicationEvent(ServiceInstancesChangedEvent event) {
|
||||
|
||||
List<URL> clonedExportedURLs = new LinkedList<>();
|
||||
String appName = event.getServiceName();
|
||||
|
||||
serviceInstances.forEach(serviceInstance -> {
|
||||
List<ServiceInstance> instances = filter(
|
||||
event.getServiceInstances() != null ? event.getServiceInstances()
|
||||
: Collections.emptyList());
|
||||
|
||||
String host = serviceInstance.getHost();
|
||||
Set<String> subscribedServiceNames = getServices(null);
|
||||
|
||||
getTemplateExportedURLs(subscribedURL, serviceInstances).stream()
|
||||
.map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
|
||||
.map(templateURL -> templateURL.removeParameter(PID_KEY))
|
||||
.map(templateURL -> {
|
||||
String protocol = templateURL.getProtocol();
|
||||
Integer port = repository.getDubboProtocolPort(serviceInstance,
|
||||
protocol);
|
||||
if (Objects.equals(templateURL.getHost(), host)
|
||||
&& Objects.equals(templateURL.getPort(), port)) { // use
|
||||
// templateURL
|
||||
// if
|
||||
// equals
|
||||
return templateURL;
|
||||
}
|
||||
if (!subscribedServiceNames.contains(appName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (port == null) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"The protocol[{}] port of Dubbo service instance[host : {}] "
|
||||
+ "can't be resolved",
|
||||
protocol, host);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
URLBuilder clonedURLBuilder = from(templateURL) // remove the
|
||||
// parameters from
|
||||
// the template
|
||||
// URL
|
||||
.setHost(host) // reset the host
|
||||
.setPort(port); // reset the port
|
||||
|
||||
return clonedURLBuilder.build();
|
||||
}
|
||||
|
||||
}).filter(Objects::nonNull).forEach(clonedExportedURLs::add);
|
||||
});
|
||||
return clonedExportedURLs;
|
||||
}
|
||||
|
||||
private List<URL> getTemplateExportedURLs(URL subscribedURL,
|
||||
List<ServiceInstance> serviceInstances) {
|
||||
|
||||
DubboMetadataService dubboMetadataService = getProxy(serviceInstances);
|
||||
|
||||
List<URL> templateExportedURLs = emptyList();
|
||||
|
||||
if (dubboMetadataService != null) {
|
||||
templateExportedURLs = getExportedURLs(dubboMetadataService, subscribedURL);
|
||||
if (instances.size() == 0) {
|
||||
logger.warn("APP {} instance changed, size changed zero!!!", appName);
|
||||
}
|
||||
else {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"The metadata of Dubbo service[key : {}] still can't be found, it could effect the further "
|
||||
+ "Dubbo service invocation",
|
||||
subscribedURL.getServiceKey());
|
||||
logger.info("APP {} instance changed, size changed to {}", appName,
|
||||
instances.size());
|
||||
}
|
||||
// group by revision
|
||||
Map<String, List<ServiceInstance>> newGroup = instances.stream()
|
||||
.collect(Collectors.groupingBy(this::getRevision));
|
||||
|
||||
synchronized (this) {
|
||||
|
||||
Map<String, List<ServiceInstance>> oldGroup = serviceRevisionInstanceMap
|
||||
.computeIfAbsent(appName, k -> new HashMap<>());
|
||||
|
||||
if (serviceInstanceNotChanged(oldGroup, newGroup)) {
|
||||
logger.debug("APP {} instance changed, but nothing different", appName);
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
try {
|
||||
|
||||
return templateExportedURLs;
|
||||
// ensure that the service metadata is correct
|
||||
refreshServiceMetadataInfo(appName, instances);
|
||||
|
||||
// then , refresh general service associated with current application
|
||||
refreshGeneralServiceInfo(appName, oldGroup, newGroup);
|
||||
|
||||
// mark process successful
|
||||
reSubscribeManager.onRefreshSuccess(event);
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error(String.format(
|
||||
"APP %s instance changed, handler faild, try resubscribe",
|
||||
appName), e);
|
||||
reSubscribeManager.onRefreshFail(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private DubboMetadataService getProxy(List<ServiceInstance> serviceInstances) {
|
||||
return dubboMetadataConfigServiceProxy.getProxy(serviceInstances);
|
||||
private void refreshGeneralServiceInfo(String appName,
|
||||
Map<String, List<ServiceInstance>> oldGroup,
|
||||
Map<String, List<ServiceInstance>> newGroup) {
|
||||
|
||||
Set<URL> urls2refresh = new HashSet<>();
|
||||
|
||||
// compare with local
|
||||
for (String revision : oldGroup.keySet()) {
|
||||
|
||||
if (!newGroup.containsKey(revision)) {
|
||||
// all instances of this list with revision has losted
|
||||
urlSubscribeHandlerMap.forEach((url, handler) -> {
|
||||
if (handler.relatedWith(appName, revision)) {
|
||||
handler.removeAppNameWithRevision(appName, revision);
|
||||
urls2refresh.add(url);
|
||||
}
|
||||
});
|
||||
logger.debug("Subscription app {} revision {} has all losted", appName,
|
||||
revision);
|
||||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<String, List<ServiceInstance>> entry : newGroup.entrySet()) {
|
||||
String revision = entry.getKey();
|
||||
List<ServiceInstance> instanceList = entry.getValue();
|
||||
|
||||
if (!oldGroup.containsKey(revision)) {
|
||||
// this instance list of revision not exists
|
||||
// should acquire urls
|
||||
urlSubscribeHandlerMap.forEach(
|
||||
(url, handler) -> handler.init(appName, revision, instanceList));
|
||||
}
|
||||
|
||||
urlSubscribeHandlerMap.forEach((url, handler) -> {
|
||||
if (handler.relatedWith(appName, revision)) {
|
||||
urls2refresh.add(url);
|
||||
}
|
||||
});
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Subscription app {} revision {} changed, instance list {}",
|
||||
appName, revision,
|
||||
instanceList.stream().map(
|
||||
instance -> instance.getHost() + ":" + instance.getPort())
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
serviceRevisionInstanceMap.put(appName, newGroup);
|
||||
|
||||
if (urls2refresh.size() == 0) {
|
||||
logger.debug("Subscription app {}, no urls will be refreshed", appName);
|
||||
}
|
||||
else {
|
||||
logger.debug("Subscription app {}, the following url will be refresh:{}",
|
||||
appName, urls2refresh.stream().map(URL::getServiceKey)
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
for (URL url : urls2refresh) {
|
||||
GenearalServiceSubscribeHandler handler = urlSubscribeHandlerMap.get(url);
|
||||
if (handler == null) {
|
||||
logger.warn("Subscription app {}, can't find handler for service {}",
|
||||
appName, url.getServiceKey());
|
||||
continue;
|
||||
}
|
||||
handler.refresh();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void refreshServiceMetadataInfo(String serviceName,
|
||||
List<ServiceInstance> serviceInstances) {
|
||||
MetadataServiceSubscribeHandler handler = metadataSubscribeHandlerMap
|
||||
.get(serviceName);
|
||||
|
||||
if (handler == null) {
|
||||
logger.warn("Subscription app {}, can't find metadata handler", serviceName);
|
||||
return;
|
||||
}
|
||||
handler.refresh(serviceInstances);
|
||||
}
|
||||
|
||||
private boolean serviceInstanceNotChanged(Map<String, List<ServiceInstance>> oldGroup,
|
||||
Map<String, List<ServiceInstance>> newGroup) {
|
||||
if (newGroup.size() != oldGroup.size()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (Map.Entry<String, List<ServiceInstance>> entry : newGroup.entrySet()) {
|
||||
String appName = entry.getKey();
|
||||
List<ServiceInstance> newInstances = entry.getValue();
|
||||
|
||||
if (!oldGroup.containsKey(appName)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
List<ServiceInstance> oldInstances = oldGroup.get(appName);
|
||||
if (newInstances.size() != oldInstances.size()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean matched = newInstances.stream().allMatch(newInstance -> {
|
||||
|
||||
for (ServiceInstance oldInstance : oldInstances) {
|
||||
if (instanceSame(newInstance, oldInstance)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
});
|
||||
if (!matched) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean instanceSame(ServiceInstance newInstance,
|
||||
ServiceInstance oldInstance) {
|
||||
if (!StringUtils.equals(newInstance.getInstanceId(),
|
||||
oldInstance.getInstanceId())) {
|
||||
return false;
|
||||
}
|
||||
if (!StringUtils.equals(newInstance.getHost(), oldInstance.getHost())) {
|
||||
return false;
|
||||
}
|
||||
if (!StringUtils.equals(newInstance.getServiceId(), oldInstance.getServiceId())) {
|
||||
return false;
|
||||
}
|
||||
if (!StringUtils.equals(newInstance.getScheme(), oldInstance.getScheme())) {
|
||||
return false;
|
||||
}
|
||||
if (oldInstance.getPort() != newInstance.getPort()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!oldInstance.getMetadata().equals(newInstance.getMetadata())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
String getRevision(ServiceInstance instance) {
|
||||
Map<String, String> metadata = instance.getMetadata();
|
||||
String revision = metadata.get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME);
|
||||
|
||||
if (revision == null) {
|
||||
revision = RevisionResolver.getEmptyRevision();
|
||||
}
|
||||
return revision;
|
||||
}
|
||||
|
||||
private List<ServiceInstance> filter(Collection<ServiceInstance> serviceInstances) {
|
||||
@ -380,40 +462,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
|
||||
|
||||
private Set<String> getServices(URL url) {
|
||||
Set<String> subscribedServices = repository.getSubscribedServices();
|
||||
if (subscribedServices.contains("*")) {
|
||||
subscribedServices = new HashSet<>(discoveryClient.getServices());
|
||||
}
|
||||
// TODO Add the filter feature
|
||||
return subscribedServices;
|
||||
}
|
||||
|
||||
private void notifyAllSubscribedURLs(URL url, List<URL> subscribedURLs,
|
||||
NotifyListener listener) {
|
||||
|
||||
if (isEmpty(subscribedURLs)) {
|
||||
// Add the EMPTY_PROTOCOL URL
|
||||
subscribedURLs.add(emptyURL(url));
|
||||
|
||||
// if (isDubboMetadataServiceURL(url)) {
|
||||
// if meta service change, and serviceInstances is zero, will clean up
|
||||
// information about this client
|
||||
// String serviceName = url.getParameter(GROUP_KEY);
|
||||
// repository.removeMetadataAndInitializedService(serviceName, url);
|
||||
// }
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("The subscribed URL[{}] will notify all URLs : {}", url,
|
||||
subscribedURLs);
|
||||
}
|
||||
|
||||
// Notify all
|
||||
listener.notify(subscribedURLs);
|
||||
}
|
||||
|
||||
private List<ServiceInstance> getServiceInstances(Iterable<String> serviceNames) {
|
||||
return stream(serviceNames.spliterator(), false).map(this::getServiceInstances)
|
||||
.flatMap(Collection::stream).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private List<ServiceInstance> getServiceInstances(String serviceName) {
|
||||
List<ServiceInstance> getServiceInstances(String serviceName) {
|
||||
return hasText(serviceName) ? doGetServiceInstances(serviceName) : emptyList();
|
||||
}
|
||||
|
||||
@ -430,106 +486,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
|
||||
return serviceInstances;
|
||||
}
|
||||
|
||||
private String generateId(URL url) {
|
||||
return url.toString();
|
||||
}
|
||||
|
||||
private URL emptyURL(URL url) {
|
||||
// issue : When the last service provider is closed, the client still periodically
|
||||
// connects to the last provider.n
|
||||
// fix https://github.com/alibaba/spring-cloud-alibaba/issues/1259
|
||||
return from(url).setProtocol(EMPTY_PROTOCOL).removeParameter(CATEGORY_KEY)
|
||||
.build();
|
||||
}
|
||||
|
||||
private List<URL> getExportedURLs(DubboMetadataService dubboMetadataService,
|
||||
URL subscribedURL) {
|
||||
String serviceInterface = subscribedURL.getServiceInterface();
|
||||
String group = subscribedURL.getParameter(GROUP_KEY);
|
||||
String version = subscribedURL.getParameter(VERSION_KEY);
|
||||
// The subscribed protocol may be null
|
||||
String subscribedProtocol = subscribedURL.getParameter(PROTOCOL_KEY);
|
||||
String exportedURLsJSON = dubboMetadataService.getExportedURLs(serviceInterface,
|
||||
group, version);
|
||||
return jsonUtils.toURLs(exportedURLsJSON).stream()
|
||||
.filter(exportedURL -> subscribedProtocol == null
|
||||
|| subscribedProtocol.equalsIgnoreCase(exportedURL.getProtocol()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
|
||||
NotifyListener listener) {
|
||||
|
||||
// Sync subscription
|
||||
subscribeDubboMetadataServiceURLs(subscribedURL, listener,
|
||||
getServiceName(subscribedURL));
|
||||
|
||||
// Sync subscription
|
||||
if (containsProviderCategory(subscribedURL)) {
|
||||
registerServiceInstancesChangedListener(subscribedURL,
|
||||
new ApplicationListener<ServiceInstancesChangedEvent>() {
|
||||
|
||||
private final URL url2subscribe = subscribedURL;
|
||||
|
||||
@Override
|
||||
@Order(Ordered.LOWEST_PRECEDENCE - 1)
|
||||
public void onApplicationEvent(
|
||||
ServiceInstancesChangedEvent event) {
|
||||
String sourceServiceName = event.getServiceName();
|
||||
String serviceName = getServiceName(subscribedURL);
|
||||
|
||||
if (Objects.equals(sourceServiceName, serviceName)) {
|
||||
subscribeDubboMetadataServiceURLs(subscribedURL, listener,
|
||||
sourceServiceName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ServiceInstancesChangedEventListener:"
|
||||
+ subscribedURL.getServiceKey();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// the group of DubboMetadataService is current application name
|
||||
private String getServiceName(URL subscribedURL) {
|
||||
return subscribedURL.getParameter(GROUP_KEY);
|
||||
}
|
||||
|
||||
private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
|
||||
NotifyListener listener, String serviceName) {
|
||||
|
||||
String serviceInterface = subscribedURL.getServiceInterface();
|
||||
String version = subscribedURL.getParameter(VERSION_KEY);
|
||||
String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
|
||||
|
||||
List<ServiceInstance> serviceInstances = getServiceInstances(serviceName);
|
||||
|
||||
List<URL> urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
|
||||
serviceInterface, version, protocol);
|
||||
|
||||
notifyAllSubscribedURLs(subscribedURL, urls, listener);
|
||||
}
|
||||
|
||||
// private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
|
||||
// NotifyListener listener, Set<String> serviceNames) {
|
||||
//
|
||||
// String serviceInterface = subscribedURL.getServiceInterface();
|
||||
// String version = subscribedURL.getParameter(VERSION_KEY);
|
||||
// String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
|
||||
//
|
||||
// List<ServiceInstance> serviceInstances = getServiceInstances(serviceNames);
|
||||
//
|
||||
// List<URL> urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
|
||||
// serviceInterface, version, protocol);
|
||||
//
|
||||
// notifyAllSubscribedURLs(subscribedURL, urls, listener);
|
||||
// }
|
||||
|
||||
private boolean containsProviderCategory(URL subscribedURL) {
|
||||
String category = subscribedURL.getParameter(CATEGORY_KEY);
|
||||
return category == null ? false : category.contains(PROVIDER);
|
||||
return category != null && category.contains(PROVIDER);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -550,4 +514,32 @@ public class DubboCloudRegistry extends FailbackRegistry {
|
||||
return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface());
|
||||
}
|
||||
|
||||
protected boolean isConsumerServiceURL(URL url) {
|
||||
return CONSUMER.equals(url.getProtocol());
|
||||
}
|
||||
|
||||
public List<ServiceInstance> getServiceInstances(Map<String, Set<String>> providers) {
|
||||
List<ServiceInstance> instances = new ArrayList<>();
|
||||
|
||||
providers.forEach((appName, revisions) -> {
|
||||
Map<String, List<ServiceInstance>> revisionMap = serviceRevisionInstanceMap
|
||||
.get(appName);
|
||||
if (revisionMap == null) {
|
||||
return;
|
||||
}
|
||||
for (String revision : revisions) {
|
||||
List<ServiceInstance> list = revisionMap.get(revision);
|
||||
if (list != null) {
|
||||
instances.addAll(list);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return instances;
|
||||
}
|
||||
|
||||
public Map<String, Map<String, List<ServiceInstance>>> getServiceRevisionInstanceMap() {
|
||||
return serviceRevisionInstanceMap;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,270 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository;
|
||||
import com.alibaba.cloud.dubbo.service.DubboMetadataService;
|
||||
import com.alibaba.cloud.dubbo.service.DubboMetadataServiceProxy;
|
||||
import com.alibaba.cloud.dubbo.util.JSONUtils;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.common.URLBuilder;
|
||||
import org.apache.dubbo.registry.NotifyListener;
|
||||
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.apache.dubbo.common.URLBuilder.from;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class GenearalServiceSubscribeHandler extends AbstractServiceSubscribeHandler {
|
||||
|
||||
private final Map<String, Set<String>> providers = new HashMap<>();
|
||||
|
||||
private final Map<String, URL> urlTemplateMap = new HashMap<>();
|
||||
|
||||
private final JSONUtils jsonUtils;
|
||||
|
||||
private final DubboServiceMetadataRepository repository;
|
||||
|
||||
private final DubboMetadataServiceProxy dubboMetadataConfigServiceProxy;
|
||||
|
||||
public GenearalServiceSubscribeHandler(URL url, NotifyListener listener,
|
||||
DubboCloudRegistry registry, DubboServiceMetadataRepository repository,
|
||||
JSONUtils jsonUtils,
|
||||
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy) {
|
||||
super(url, listener, registry);
|
||||
this.repository = repository;
|
||||
this.jsonUtils = jsonUtils;
|
||||
this.dubboMetadataConfigServiceProxy = dubboMetadataConfigServiceProxy;
|
||||
}
|
||||
|
||||
public boolean relatedWith(String appName, String revision) {
|
||||
Set<String> list = providers.get(appName);
|
||||
if (list != null && list.size() > 0) {
|
||||
if (list.contains(revision)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void removeAppNameWithRevision(String appName, String revision) {
|
||||
Set<String> list = providers.get(appName);
|
||||
if (list != null) {
|
||||
list.remove(revision);
|
||||
if (list.size() == 0) {
|
||||
providers.remove(appName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void addAppNameWithRevision(String appName, String revision) {
|
||||
Set<String> set = providers.computeIfAbsent(appName, k -> new HashSet<>());
|
||||
set.add(revision);
|
||||
}
|
||||
|
||||
public synchronized void doInit() {
|
||||
logger.debug("Subscription interface {}, GenearalServiceSubscribeHandler init",
|
||||
url.getServiceKey());
|
||||
Map<String, Map<String, List<ServiceInstance>>> map = registry
|
||||
.getServiceRevisionInstanceMap();
|
||||
for (Map.Entry<String, Map<String, List<ServiceInstance>>> entry : map
|
||||
.entrySet()) {
|
||||
String appName = entry.getKey();
|
||||
Map<String, List<ServiceInstance>> revisionMap = entry.getValue();
|
||||
|
||||
for (Map.Entry<String, List<ServiceInstance>> revisionEntity : revisionMap
|
||||
.entrySet()) {
|
||||
String revision = revisionEntity.getKey();
|
||||
List<ServiceInstance> instances = revisionEntity.getValue();
|
||||
init(appName, revision, instances);
|
||||
}
|
||||
}
|
||||
refresh();
|
||||
}
|
||||
|
||||
public void init(String appName, String revision,
|
||||
List<ServiceInstance> instanceList) {
|
||||
List<URL> urls = getTemplateExportedURLs(url, instanceList);
|
||||
if (urls != null && urls.size() > 0) {
|
||||
addAppNameWithRevision(appName, revision);
|
||||
setUrlTemplate(appName, revision, urls);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void refresh() {
|
||||
List<URL> urls = getProviderURLs();
|
||||
notifyAllSubscribedURLs(url, urls, listener);
|
||||
}
|
||||
|
||||
private List<URL> getProviderURLs() {
|
||||
List<ServiceInstance> instances = registry.getServiceInstances(providers);
|
||||
|
||||
logger.debug("Subscription interfece {}, providers {}, total {}",
|
||||
url.getServiceKey(), providers, instances.size());
|
||||
|
||||
if (instances.size() == 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return cloneExportedURLs(instances);
|
||||
}
|
||||
|
||||
void setUrlTemplate(String appName, String revision, List<URL> urls) {
|
||||
if (urls == null || urls.size() == 0) {
|
||||
return;
|
||||
}
|
||||
String key = getAppRevisionKey(appName, revision);
|
||||
if (urlTemplateMap.containsKey(key)) {
|
||||
return;
|
||||
}
|
||||
urlTemplateMap.put(key, urls.get(0));
|
||||
}
|
||||
|
||||
private String getAppRevisionKey(String appName, String revision) {
|
||||
return appName + "@" + revision;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone the subscribed URLs based on the template URLs.
|
||||
* @param serviceInstances the list of
|
||||
* {@link org.springframework.cloud.client.ServiceInstance service instances}
|
||||
* @return
|
||||
*/
|
||||
List<URL> cloneExportedURLs(List<ServiceInstance> serviceInstances) {
|
||||
|
||||
List<URL> urlsCloneTo = new ArrayList<>();
|
||||
serviceInstances.forEach(serviceInstance -> {
|
||||
|
||||
String host = serviceInstance.getHost();
|
||||
String appName = serviceInstance.getServiceId();
|
||||
String revision = registry.getRevision(serviceInstance);
|
||||
|
||||
URL template = urlTemplateMap.get(getAppRevisionKey(appName, revision));
|
||||
|
||||
Stream.of(template)
|
||||
.map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
|
||||
.map(templateURL -> templateURL.removeParameter(PID_KEY))
|
||||
.map(templateURL -> {
|
||||
String protocol = templateURL.getProtocol();
|
||||
Integer port = repository.getDubboProtocolPort(serviceInstance,
|
||||
protocol);
|
||||
|
||||
// reserve tag
|
||||
String tag = null;
|
||||
List<URL> urls = jsonUtils.toURLs(serviceInstance.getMetadata()
|
||||
.get("dubbo.metadata-service.urls"));
|
||||
if (urls != null && urls.size() > 0) {
|
||||
Map<String, String> parameters = urls.get(0).getParameters();
|
||||
tag = parameters.get("dubbo.tag");
|
||||
}
|
||||
|
||||
if (Objects.equals(templateURL.getHost(), host)
|
||||
&& Objects.equals(templateURL.getPort(), port)) { // use
|
||||
// templateURL
|
||||
// if
|
||||
// equals
|
||||
return templateURL;
|
||||
}
|
||||
|
||||
if (port == null) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"The protocol[{}] port of Dubbo service instance[host : {}] "
|
||||
+ "can't be resolved",
|
||||
protocol, host);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
URLBuilder clonedURLBuilder = from(templateURL) // remove the
|
||||
// parameters from
|
||||
// the template
|
||||
// URL
|
||||
.setHost(host) // reset the host
|
||||
.setPort(port) // reset the port
|
||||
.addParameter("dubbo.tag", tag); // reset the tag
|
||||
|
||||
return clonedURLBuilder.build();
|
||||
}
|
||||
|
||||
}).filter(Objects::nonNull).forEach(urlsCloneTo::add);
|
||||
});
|
||||
return urlsCloneTo;
|
||||
}
|
||||
|
||||
private List<URL> getTemplateExportedURLs(URL subscribedURL,
|
||||
List<ServiceInstance> serviceInstances) {
|
||||
|
||||
DubboMetadataService dubboMetadataService = getProxy(serviceInstances);
|
||||
|
||||
List<URL> templateExportedURLs = emptyList();
|
||||
|
||||
if (dubboMetadataService != null) {
|
||||
templateExportedURLs = getExportedURLs(dubboMetadataService, subscribedURL);
|
||||
}
|
||||
else {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"The metadata of Dubbo service[key : {}] still can't be found, it could effect the further "
|
||||
+ "Dubbo service invocation",
|
||||
subscribedURL.getServiceKey());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return templateExportedURLs;
|
||||
}
|
||||
|
||||
private DubboMetadataService getProxy(List<ServiceInstance> serviceInstances) {
|
||||
return dubboMetadataConfigServiceProxy.getProxy(serviceInstances);
|
||||
}
|
||||
|
||||
private List<URL> getExportedURLs(DubboMetadataService dubboMetadataService,
|
||||
URL subscribedURL) {
|
||||
String serviceInterface = subscribedURL.getServiceInterface();
|
||||
String group = subscribedURL.getParameter(GROUP_KEY);
|
||||
String version = subscribedURL.getParameter(VERSION_KEY);
|
||||
// The subscribed protocol may be null
|
||||
String subscribedProtocol = subscribedURL.getParameter(PROTOCOL_KEY);
|
||||
String exportedURLsJSON = dubboMetadataService.getExportedURLs(serviceInterface,
|
||||
group, version);
|
||||
return jsonUtils.toURLs(exportedURLsJSON).stream()
|
||||
.filter(exportedURL -> subscribedProtocol == null
|
||||
|| subscribedProtocol.equalsIgnoreCase(exportedURL.getProtocol()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.cloud.dubbo.util.DubboMetadataUtils;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.registry.NotifyListener;
|
||||
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class MetadataServiceSubscribeHandler extends AbstractServiceSubscribeHandler {
|
||||
|
||||
private final String appName;
|
||||
|
||||
private final DubboMetadataUtils dubboMetadataUtils;
|
||||
|
||||
public MetadataServiceSubscribeHandler(String appName, URL url,
|
||||
NotifyListener listener, DubboCloudRegistry registry,
|
||||
DubboMetadataUtils dubboMetadataUtils) {
|
||||
super(url, listener, registry);
|
||||
this.appName = appName;
|
||||
this.dubboMetadataUtils = dubboMetadataUtils;
|
||||
}
|
||||
|
||||
public void doInit() {
|
||||
logger.debug("Subscription app {} MetadataService handler init", appName);
|
||||
List<ServiceInstance> serviceInstances = registry.getServiceInstances(appName);
|
||||
subscribeDubboMetadataServiceURLs(url, listener, serviceInstances);
|
||||
}
|
||||
|
||||
public void refresh(List<ServiceInstance> serviceInstances) {
|
||||
logger.debug("Subscription app {}, instance changed, new size = {}", appName,
|
||||
serviceInstances.size());
|
||||
subscribeDubboMetadataServiceURLs(url, listener, serviceInstances);
|
||||
}
|
||||
|
||||
private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
|
||||
NotifyListener listener, List<ServiceInstance> serviceInstances) {
|
||||
|
||||
logger.debug("Subscription app {}, service instance changed to size {}", appName,
|
||||
serviceInstances.size());
|
||||
|
||||
String serviceInterface = subscribedURL.getServiceInterface();
|
||||
String version = subscribedURL.getParameter(VERSION_KEY);
|
||||
String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
|
||||
|
||||
List<URL> urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
|
||||
serviceInterface, version, protocol);
|
||||
|
||||
notifyAllSubscribedURLs(subscribedURL, urls, listener);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,125 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.alibaba.cloud.dubbo.env.DubboCloudProperties;
|
||||
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class ReSubscribeManager {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(ReSubscribeManager.class);
|
||||
|
||||
private final Map<String, ReSubscribeMetadataJob> reConnectJobMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor(
|
||||
5);
|
||||
|
||||
private final DubboCloudRegistry registry;
|
||||
|
||||
private final DubboCloudProperties properties;
|
||||
|
||||
public ReSubscribeManager(DubboCloudRegistry registry) {
|
||||
this.registry = registry;
|
||||
this.properties = registry.getBean(DubboCloudProperties.class);
|
||||
|
||||
reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES);
|
||||
reConnectPool.allowCoreThreadTimeOut(true);
|
||||
}
|
||||
|
||||
public void onRefreshSuccess(ServiceInstancesChangedEvent event) {
|
||||
reConnectJobMap.remove(event.getServiceName());
|
||||
}
|
||||
|
||||
public void onRefreshFail(ServiceInstancesChangedEvent event) {
|
||||
String serviceName = event.getServiceName();
|
||||
|
||||
int count = 1;
|
||||
|
||||
if (event instanceof FakeServiceInstancesChangedEvent) {
|
||||
count = ((FakeServiceInstancesChangedEvent) event).getCount() + 1;
|
||||
}
|
||||
|
||||
if (count >= properties.getMaxReSubscribeMetadataTimes()) {
|
||||
logger.error(
|
||||
"reSubscribe failed too many times, serviceName = {}, count = {}",
|
||||
serviceName, count);
|
||||
return;
|
||||
}
|
||||
|
||||
ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, count);
|
||||
reConnectPool.schedule(job, properties.getReSubscribeMetadataIntervial(),
|
||||
TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private final class ReSubscribeMetadataJob implements Runnable {
|
||||
|
||||
private final String serviceName;
|
||||
|
||||
private final int errorCounts;
|
||||
|
||||
private ReSubscribeMetadataJob(String serviceName, int errorCounts) {
|
||||
this.errorCounts = errorCounts;
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (!reConnectJobMap.containsKey(serviceName)
|
||||
|| reConnectJobMap.get(serviceName) != this) {
|
||||
return;
|
||||
}
|
||||
List<ServiceInstance> list = registry.getServiceInstances(serviceName);
|
||||
FakeServiceInstancesChangedEvent event = new FakeServiceInstancesChangedEvent(
|
||||
serviceName, list, errorCounts);
|
||||
registry.onApplicationEvent(event);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final class FakeServiceInstancesChangedEvent
|
||||
extends ServiceInstancesChangedEvent {
|
||||
|
||||
private static final long serialVersionUID = -2832478604601472915L;
|
||||
|
||||
private final int count;
|
||||
|
||||
private FakeServiceInstancesChangedEvent(String serviceName,
|
||||
List<ServiceInstance> serviceInstances, int count) {
|
||||
super(serviceName, serviceInstances);
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public int getCount() {
|
||||
return count;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
|
||||
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.core.Ordered;
|
||||
|
||||
/**
|
||||
* The interface of ServiceInstanceChange event Listener.
|
||||
*
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
* @see ServiceInstancesChangedEvent
|
||||
* @see Ordered
|
||||
* @see ApplicationListener
|
||||
*/
|
||||
public interface ServiceInstanceChangeListener
|
||||
extends ApplicationListener<ServiceInstancesChangedEvent>, Ordered {
|
||||
|
||||
}
|
@ -89,7 +89,7 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
|
||||
DubboCloudProperties dubboCloudProperties = applicationContext
|
||||
.getBean(DubboCloudProperties.class);
|
||||
|
||||
Registry registry = null;
|
||||
Registry registry;
|
||||
|
||||
switch (dubboCloudProperties.getRegistryType()) {
|
||||
case SPRING_CLOUD_REGISTRY_PROPERTY_VALUE:
|
||||
@ -100,7 +100,7 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
|
||||
default:
|
||||
registry = new DubboCloudRegistry(url, discoveryClient,
|
||||
dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
|
||||
jsonUtils, dubboGenericServiceFactory, applicationContext);
|
||||
jsonUtils, applicationContext);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
default=com.alibaba.cloud.dubbo.metadata.DefaultMetadataParamsFilter
|
Loading…
x
Reference in New Issue
Block a user