1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Merge pull request #2071 from theonefx/master

resolve @order not effect when handle ServiceInstanceChangedEvent
This commit is contained in:
Mercy Ma 2021-05-13 18:29:39 +08:00 committed by GitHub
commit 3f039ad099
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 235 additions and 40 deletions

View File

@ -51,6 +51,10 @@ public class DubboCloudProperties {
private String registryType = DUBBO_CLOUD_REGISTRY_PROPERTY_VALUE;
private int maxReSubscribeMetadataTimes = 1000;
private int reSubscribeMetadataIntervial = 5;
public String getSubscribedServices() {
return subscribedServices;
}
@ -91,4 +95,20 @@ public class DubboCloudProperties {
this.registryType = registryType;
}
public int getMaxReSubscribeMetadataTimes() {
return maxReSubscribeMetadataTimes;
}
public void setMaxReSubscribeMetadataTimes(int maxReSubscribeMetadataTimes) {
this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes;
}
public int getReSubscribeMetadataIntervial() {
return reSubscribeMetadataIntervial;
}
public void setReSubscribeMetadataIntervial(int reSubscribeMetadataIntervial) {
this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial;
}
}

View File

@ -23,6 +23,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -45,12 +48,10 @@ import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.util.CollectionUtils;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.stream.StreamSupport.stream;
import static org.apache.dubbo.common.URLBuilder.from;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
@ -110,11 +111,23 @@ public class DubboCloudRegistry extends FailbackRegistry {
private final String currentApplicationName;
private final Map<URL, NotifyListener> urlNotifyListenerMap = new ConcurrentHashMap<>();
private final Map<String, ReSubscribeMetadataJob> reConnectJobMap = new ConcurrentHashMap<>();
private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor(
2);
private final int maxReSubscribeMetadataTimes;
private final int reSubscribeMetadataIntervial;
public DubboCloudRegistry(URL url, DiscoveryClient discoveryClient,
DubboServiceMetadataRepository repository,
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy,
JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory,
ConfigurableApplicationContext applicationContext) {
ConfigurableApplicationContext applicationContext,
int maxReSubscribeMetadataTimes, int reSubscribeMetadataIntervial) {
super(url);
this.servicesLookupInterval = url
@ -127,6 +140,11 @@ public class DubboCloudRegistry extends FailbackRegistry {
this.applicationContext = applicationContext;
this.dubboMetadataUtils = getBean(DubboMetadataUtils.class);
this.currentApplicationName = dubboMetadataUtils.getCurrentApplicationName();
this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes;
this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial;
reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES);
reConnectPool.allowCoreThreadTimeOut(true);
}
private <T> T getBean(Class<T> beanClass) {
@ -177,6 +195,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
}
else { // for general Dubbo Services
subscribeURLs(url, listener);
urlNotifyListenerMap.put(url, listener);
}
}
@ -188,19 +207,34 @@ public class DubboCloudRegistry extends FailbackRegistry {
// Async subscription
registerServiceInstancesChangedListener(url,
new ApplicationListener<ServiceInstancesChangedEvent>() {
private final URL url2subscribe = url;
new ServiceInstanceChangeListener() {
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
@Override
@Order
public void onApplicationEvent(ServiceInstancesChangedEvent event) {
Set<String> serviceNames = getServices(url);
String serviceName = event.getServiceName();
if (serviceNames.contains(serviceName)) {
subscribeURLs(url, serviceNames, listener);
logger.debug(
"handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}",
event.getServiceName(), url.getServiceKey());
try {
subscribeURLs(url, serviceNames, listener);
reConnectJobMap.remove(serviceName);
}
catch (Exception e) {
logger.warn(String.format(
"subscribeURLs failed, serviceName = %s, try reSubscribe again",
serviceName), e);
addReSubscribeMetadataJob(serviceName, 0);
}
}
}
@ -212,8 +246,19 @@ public class DubboCloudRegistry extends FailbackRegistry {
});
}
private void subscribeURLs(URL url, Set<String> serviceNames,
NotifyListener listener) {
void addReSubscribeMetadataJob(String serviceName, int count) {
if (count > maxReSubscribeMetadataTimes) {
logger.error(
"reSubscribe failed too many times, serviceName = {}, count = {}",
serviceName, count);
return;
}
ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, this, count);
reConnectJobMap.put(serviceName, job);
reConnectPool.schedule(job, reSubscribeMetadataIntervial, TimeUnit.SECONDS);
}
void subscribeURLs(URL url, Set<String> serviceNames, NotifyListener listener) {
List<URL> subscribedURLs = new LinkedList<>();
@ -252,6 +297,10 @@ public class DubboCloudRegistry extends FailbackRegistry {
serviceName));
}
}
else {
logger.debug("subscribe from serviceName = {}, size = {}", serviceName,
serviceInstances.size());
}
List<URL> exportedURLs = getExportedURLs(subscribedURL, serviceName,
serviceInstances);
@ -389,7 +438,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
return metadata.containsKey(METADATA_SERVICE_URLS_PROPERTY_NAME);
}
private Set<String> getServices(URL url) {
Set<String> getServices(URL url) {
Set<String> subscribedServices = repository.getSubscribedServices();
// TODO Add the filter feature
return subscribedServices;
@ -419,11 +468,6 @@ public class DubboCloudRegistry extends FailbackRegistry {
listener.notify(subscribedURLs);
}
private List<ServiceInstance> getServiceInstances(Iterable<String> serviceNames) {
return stream(serviceNames.spliterator(), false).map(this::getServiceInstances)
.flatMap(Collection::stream).collect(Collectors.toList());
}
private List<ServiceInstance> getServiceInstances(String serviceName) {
return hasText(serviceName) ? doGetServiceInstances(serviceName) : emptyList();
}
@ -471,27 +515,38 @@ public class DubboCloudRegistry extends FailbackRegistry {
private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
NotifyListener listener) {
// Sync subscription
subscribeDubboMetadataServiceURLs(subscribedURL, listener,
getServiceName(subscribedURL));
// Sync subscription
if (containsProviderCategory(subscribedURL)) {
registerServiceInstancesChangedListener(subscribedURL,
new ApplicationListener<ServiceInstancesChangedEvent>() {
private final URL url2subscribe = subscribedURL;
registerServiceInstancesChangedListener(subscribedURL,
new ServiceInstanceChangeListener() {
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE - 1;
}
@Override
@Order(Ordered.LOWEST_PRECEDENCE - 1)
public void onApplicationEvent(
ServiceInstancesChangedEvent event) {
String sourceServiceName = event.getServiceName();
List<ServiceInstance> serviceInstances = event
.getServiceInstances();
String serviceName = getServiceName(subscribedURL);
if (Objects.equals(sourceServiceName, serviceName)) {
logger.debug(
"handle serviceInstanceChange of metadata service, serviceName = {}, subscribeUrl={}",
event.getServiceName(),
subscribedURL.getServiceKey());
// only update serviceInstances of the specified
// serviceName
subscribeDubboMetadataServiceURLs(subscribedURL, listener,
sourceServiceName);
sourceServiceName, serviceInstances);
}
}
@ -509,34 +564,25 @@ public class DubboCloudRegistry extends FailbackRegistry {
}
private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
NotifyListener listener, String serviceName) {
NotifyListener listener, String serviceName,
List<ServiceInstance> serviceInstances) {
String serviceInterface = subscribedURL.getServiceInterface();
String version = subscribedURL.getParameter(VERSION_KEY);
String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
List<ServiceInstance> serviceInstances = getServiceInstances(serviceName);
List<URL> urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
serviceInterface, version, protocol);
notifyAllSubscribedURLs(subscribedURL, urls, listener);
}
// private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
// NotifyListener listener, Set<String> serviceNames) {
//
// String serviceInterface = subscribedURL.getServiceInterface();
// String version = subscribedURL.getParameter(VERSION_KEY);
// String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
//
// List<ServiceInstance> serviceInstances = getServiceInstances(serviceNames);
//
// List<URL> urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
// serviceInterface, version, protocol);
//
// notifyAllSubscribedURLs(subscribedURL, urls, listener);
// }
private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
NotifyListener listener, String serviceName) {
List<ServiceInstance> serviceInstances = getServiceInstances(serviceName);
subscribeDubboMetadataServiceURLs(subscribedURL, listener, serviceName,
serviceInstances);
}
private boolean containsProviderCategory(URL subscribedURL) {
String category = subscribedURL.getParameter(CATEGORY_KEY);
@ -557,6 +603,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
return ADMIN_PROTOCOL.equals(url.getProtocol());
}
public Map<URL, NotifyListener> getUrlNotifyListenerMap() {
return urlNotifyListenerMap;
}
public Map<String, ReSubscribeMetadataJob> getReConnectJobMap() {
return reConnectJobMap;
}
protected boolean isDubboMetadataServiceURL(URL url) {
return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface());
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.dubbo.registry;
import java.util.Map;
import java.util.Set;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.NotifyListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* For re subscribe URL from provider.
*
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
*/
public class ReSubscribeMetadataJob implements Runnable {
protected final Logger logger = LoggerFactory.getLogger(ReSubscribeMetadataJob.class);
private final String serviceName;
private final DubboCloudRegistry dubboCloudRegistry;
private final int errorCounts;
public ReSubscribeMetadataJob(String serviceName,
DubboCloudRegistry dubboCloudRegistry, int errorCounts) {
this.errorCounts = errorCounts;
this.serviceName = serviceName;
this.dubboCloudRegistry = dubboCloudRegistry;
}
public ReSubscribeMetadataJob(String serviceName,
DubboCloudRegistry dubboCloudRegistry) {
this(serviceName, dubboCloudRegistry, 0);
}
@Override
public void run() {
if (dubboCloudRegistry.getReConnectJobMap().get(serviceName) != this) {
return;
}
try {
logger.info("reSubscribe, serviceName = {}, count = {}", serviceName,
errorCounts);
for (Map.Entry<URL, NotifyListener> entry : dubboCloudRegistry
.getUrlNotifyListenerMap().entrySet()) {
doRun(entry.getKey(), entry.getValue());
}
dubboCloudRegistry.getReConnectJobMap().remove(serviceName);
}
catch (Exception e) {
logger.warn(String.format(
"reSubscribe failed, serviceName = %s, try refresh again",
serviceName), e);
dubboCloudRegistry.addReSubscribeMetadataJob(serviceName, errorCounts + 1);
}
}
private void doRun(URL url, NotifyListener listener) {
Set<String> serviceNames = dubboCloudRegistry.getServices(url);
if (serviceNames.contains(serviceName)) {
dubboCloudRegistry.subscribeURLs(url, serviceNames, listener);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.dubbo.registry;
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
/**
* The interface of ServiceInstanceChange event Listener.
*
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
* @see ServiceInstancesChangedEvent
* @see Ordered
* @see ApplicationListener
*/
public interface ServiceInstanceChangeListener
extends ApplicationListener<ServiceInstancesChangedEvent>, Ordered {
}

View File

@ -100,7 +100,9 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
default:
registry = new DubboCloudRegistry(url, discoveryClient,
dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
jsonUtils, dubboGenericServiceFactory, applicationContext);
jsonUtils, dubboGenericServiceFactory, applicationContext,
dubboCloudProperties.getMaxReSubscribeMetadataTimes(),
dubboCloudProperties.getReSubscribeMetadataIntervial());
break;
}