1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

add service instance change event listener order

This commit is contained in:
theonefx 2021-01-28 19:41:55 +08:00
parent 8542756d90
commit 6924f0c3be
3 changed files with 79 additions and 34 deletions

View File

@ -32,7 +32,8 @@ import org.springframework.context.annotation.PropertySource;
* *
* @author <a href="mailto:mercyblitz@gmail.com">Mercy</a> * @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
*/ */
@ConditionalOnClass(name = "org.springframework.boot.actuate.endpoint.annotation.Endpoint") @ConditionalOnClass(
name = "org.springframework.boot.actuate.endpoint.annotation.Endpoint")
@PropertySource("classpath:/META-INF/dubbo/default/actuator-endpoints.properties") @PropertySource("classpath:/META-INF/dubbo/default/actuator-endpoints.properties")
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
public class DubboMetadataEndpointAutoConfiguration { public class DubboMetadataEndpointAutoConfiguration {

View File

@ -16,6 +16,7 @@
package com.alibaba.cloud.dubbo.actuate.endpoint; package com.alibaba.cloud.dubbo.actuate.endpoint;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -38,7 +39,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
/** /**
* Dubbo Rest Metadata {@link Endpoint}. * Dubbo Registry Directory Metadata {@link DubboCloudRegistry}.
* *
* @author <a href="mailto:chenxilzx1@gmail.com">Theonefx</a> * @author <a href="mailto:chenxilzx1@gmail.com">Theonefx</a>
*/ */
@ -55,23 +56,36 @@ public class DubboDiscoveryEndpoint {
return Collections.emptyMap(); return Collections.emptyMap();
} }
Map<URL, Set<NotifyListener>> map = registry.getSubscribed(); Map<URL, Set<NotifyListener>> subscribeMap = registry.getSubscribed();
Map<String, List<String>> result = new HashMap<>(); Map<String, List<Map<String, Object>>> result = new HashMap<>();
map.forEach((url, listeners) -> { subscribeMap.forEach((url, listeners) -> {
String side = url.getParameter(SIDE_KEY); String side = url.getParameter(SIDE_KEY);
if (!CONSUMER_SIDE.equals(side)) { if (!CONSUMER_SIDE.equals(side)) {
return; return;
} }
List<String> list = listeners.stream() List<Map<String, Object>> pairs = result.computeIfAbsent(url.getServiceKey(),
.filter(l -> l instanceof RegistryDirectory) o -> new ArrayList<>());
.map(l -> (RegistryDirectory<?>) l)
.map(RegistryDirectory::getAllInvokers).flatMap(List::stream)
.map(Invoker::getUrl).map(URL::toServiceString).distinct().sorted()
.collect(Collectors.toList());
result.put(url.getServiceKey(), list); Map<String, Object> pair = new HashMap<>();
List<String> invokerServices = new ArrayList<>();
for (NotifyListener listener : listeners) {
if (!(listener instanceof RegistryDirectory)) {
continue;
}
RegistryDirectory<?> directory = (RegistryDirectory<?>) listener;
List<? extends Invoker<?>> invokers = directory.getAllInvokers();
if (invokers == null) {
continue;
}
invokerServices.addAll(invokers.stream().map(Invoker::getUrl)
.map(URL::toServiceString).collect(Collectors.toList()));
}
pair.put("invokers", invokerServices);
pair.put("subscribeUrl", url.toMap());
pairs.add(pair);
}); });
return result; return result;
} }

View File

@ -44,6 +44,8 @@ import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import static java.lang.String.format; import static java.lang.String.format;
@ -184,16 +186,30 @@ public class DubboCloudRegistry extends FailbackRegistry {
subscribeURLs(url, getServices(url), listener); subscribeURLs(url, getServices(url), listener);
// Async subscription // Async subscription
registerServiceInstancesChangedListener(url, event -> { registerServiceInstancesChangedListener(url,
Set<String> serviceNames = getServices(url); new ApplicationListener<ServiceInstancesChangedEvent>() {
String serviceName = event.getServiceName(); private final URL url2subscribe = url;
if (serviceNames.contains(serviceName)) { @Override
subscribeURLs(url, serviceNames, listener); @Order
} public void onApplicationEvent(ServiceInstancesChangedEvent event) {
}); Set<String> serviceNames = getServices(url);
String serviceName = event.getServiceName();
if (serviceNames.contains(serviceName)) {
subscribeURLs(url, serviceNames, listener);
}
}
@Override
public String toString() {
return "ServiceInstancesChangedEventListener:"
+ url.getServiceKey();
}
});
} }
private void subscribeURLs(URL url, Set<String> serviceNames, private void subscribeURLs(URL url, Set<String> serviceNames,
@ -375,12 +391,12 @@ public class DubboCloudRegistry extends FailbackRegistry {
// Add the EMPTY_PROTOCOL URL // Add the EMPTY_PROTOCOL URL
subscribedURLs.add(emptyURL(url)); subscribedURLs.add(emptyURL(url));
if (isDubboMetadataServiceURL(url)) { // if (isDubboMetadataServiceURL(url)) {
// if meta service change, and serviceInstances is zero, will clean up // if meta service change, and serviceInstances is zero, will clean up
// information about this client // information about this client
String serviceName = url.getParameter(GROUP_KEY); // String serviceName = url.getParameter(GROUP_KEY);
repository.removeMetadataAndInitializedService(serviceName, url); // repository.removeMetadataAndInitializedService(serviceName, url);
} // }
} }
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -415,7 +431,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
} }
private String generateId(URL url) { private String generateId(URL url) {
return url.getServiceKey(); return url.toString();
} }
private URL emptyURL(URL url) { private URL emptyURL(URL url) {
@ -450,16 +466,30 @@ public class DubboCloudRegistry extends FailbackRegistry {
// Sync subscription // Sync subscription
if (containsProviderCategory(subscribedURL)) { if (containsProviderCategory(subscribedURL)) {
registerServiceInstancesChangedListener(subscribedURL, event -> { registerServiceInstancesChangedListener(subscribedURL,
new ApplicationListener<ServiceInstancesChangedEvent>() {
String sourceServiceName = event.getServiceName(); private final URL url2subscribe = subscribedURL;
String serviceName = getServiceName(subscribedURL);
if (Objects.equals(sourceServiceName, serviceName)) { @Override
subscribeDubboMetadataServiceURLs(subscribedURL, listener, @Order(Ordered.LOWEST_PRECEDENCE - 1)
sourceServiceName); public void onApplicationEvent(
} ServiceInstancesChangedEvent event) {
}); String sourceServiceName = event.getServiceName();
String serviceName = getServiceName(subscribedURL);
if (Objects.equals(sourceServiceName, serviceName)) {
subscribeDubboMetadataServiceURLs(subscribedURL, listener,
sourceServiceName);
}
}
@Override
public String toString() {
return "ServiceInstancesChangedEventListener:"
+ subscribedURL.getServiceKey();
}
});
} }
} }