diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java index 577d19e5..5135c5a8 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java @@ -51,6 +51,10 @@ public class DubboCloudProperties { private String registryType = DUBBO_CLOUD_REGISTRY_PROPERTY_VALUE; + private int maxReSubscribeMetadataTimes = 1000; + + private int reSubscribeMetadataIntervial = 5; + public String getSubscribedServices() { return subscribedServices; } @@ -91,4 +95,20 @@ public class DubboCloudProperties { this.registryType = registryType; } + public int getMaxReSubscribeMetadataTimes() { + return maxReSubscribeMetadataTimes; + } + + public void setMaxReSubscribeMetadataTimes(int maxReSubscribeMetadataTimes) { + this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes; + } + + public int getReSubscribeMetadataIntervial() { + return reSubscribeMetadataIntervial; + } + + public void setReSubscribeMetadataIntervial(int reSubscribeMetadataIntervial) { + this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial; + } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java index 04d9c99e..cce75ba3 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java @@ -23,6 +23,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -45,12 +48,10 @@ import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.context.ApplicationListener; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; import org.springframework.util.CollectionUtils; import static java.lang.String.format; import static java.util.Collections.emptyList; -import static java.util.stream.StreamSupport.stream; import static org.apache.dubbo.common.URLBuilder.from; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY; @@ -110,11 +111,23 @@ public class DubboCloudRegistry extends FailbackRegistry { private final String currentApplicationName; + private final Map urlNotifyListenerMap = new ConcurrentHashMap<>(); + + private final Map reConnectJobMap = new ConcurrentHashMap<>(); + + private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor( + 2); + + private final int maxReSubscribeMetadataTimes; + + private final int reSubscribeMetadataIntervial; + public DubboCloudRegistry(URL url, DiscoveryClient discoveryClient, DubboServiceMetadataRepository repository, DubboMetadataServiceProxy dubboMetadataConfigServiceProxy, JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory, - ConfigurableApplicationContext applicationContext) { + ConfigurableApplicationContext applicationContext, + int maxReSubscribeMetadataTimes, int reSubscribeMetadataIntervial) { super(url); this.servicesLookupInterval = url @@ -127,6 +140,11 @@ public class DubboCloudRegistry extends FailbackRegistry { this.applicationContext = applicationContext; this.dubboMetadataUtils = getBean(DubboMetadataUtils.class); this.currentApplicationName = dubboMetadataUtils.getCurrentApplicationName(); + this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes; + this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial; + + reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES); + reConnectPool.allowCoreThreadTimeOut(true); } private T getBean(Class beanClass) { @@ -177,6 +195,7 @@ public class DubboCloudRegistry extends FailbackRegistry { } else { // for general Dubbo Services subscribeURLs(url, listener); + urlNotifyListenerMap.put(url, listener); } } @@ -188,19 +207,34 @@ public class DubboCloudRegistry extends FailbackRegistry { // Async subscription registerServiceInstancesChangedListener(url, - new ApplicationListener() { - - private final URL url2subscribe = url; + new ServiceInstanceChangeListener() { + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } @Override - @Order public void onApplicationEvent(ServiceInstancesChangedEvent event) { + Set serviceNames = getServices(url); String serviceName = event.getServiceName(); if (serviceNames.contains(serviceName)) { - subscribeURLs(url, serviceNames, listener); + logger.debug( + "handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}", + event.getServiceName(), url.getServiceKey()); + try { + subscribeURLs(url, serviceNames, listener); + reConnectJobMap.remove(serviceName); + } + catch (Exception e) { + logger.warn(String.format( + "subscribeURLs failed, serviceName = %s, try reSubscribe again", + serviceName), e); + addReSubscribeMetadataJob(serviceName, 0); + } } } @@ -212,8 +246,19 @@ public class DubboCloudRegistry extends FailbackRegistry { }); } - private void subscribeURLs(URL url, Set serviceNames, - NotifyListener listener) { + void addReSubscribeMetadataJob(String serviceName, int count) { + if (count > maxReSubscribeMetadataTimes) { + logger.error( + "reSubscribe failed too many times, serviceName = {}, count = {}", + serviceName, count); + return; + } + ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, this, count); + reConnectJobMap.put(serviceName, job); + reConnectPool.schedule(job, reSubscribeMetadataIntervial, TimeUnit.SECONDS); + } + + void subscribeURLs(URL url, Set serviceNames, NotifyListener listener) { List subscribedURLs = new LinkedList<>(); @@ -252,6 +297,10 @@ public class DubboCloudRegistry extends FailbackRegistry { serviceName)); } } + else { + logger.debug("subscribe from serviceName = {}, size = {}", serviceName, + serviceInstances.size()); + } List exportedURLs = getExportedURLs(subscribedURL, serviceName, serviceInstances); @@ -389,7 +438,7 @@ public class DubboCloudRegistry extends FailbackRegistry { return metadata.containsKey(METADATA_SERVICE_URLS_PROPERTY_NAME); } - private Set getServices(URL url) { + Set getServices(URL url) { Set subscribedServices = repository.getSubscribedServices(); // TODO Add the filter feature return subscribedServices; @@ -419,11 +468,6 @@ public class DubboCloudRegistry extends FailbackRegistry { listener.notify(subscribedURLs); } - private List getServiceInstances(Iterable serviceNames) { - return stream(serviceNames.spliterator(), false).map(this::getServiceInstances) - .flatMap(Collection::stream).collect(Collectors.toList()); - } - private List getServiceInstances(String serviceName) { return hasText(serviceName) ? doGetServiceInstances(serviceName) : emptyList(); } @@ -471,27 +515,38 @@ public class DubboCloudRegistry extends FailbackRegistry { private void subscribeDubboMetadataServiceURLs(URL subscribedURL, NotifyListener listener) { - // Sync subscription subscribeDubboMetadataServiceURLs(subscribedURL, listener, getServiceName(subscribedURL)); // Sync subscription if (containsProviderCategory(subscribedURL)) { - registerServiceInstancesChangedListener(subscribedURL, - new ApplicationListener() { - private final URL url2subscribe = subscribedURL; + registerServiceInstancesChangedListener(subscribedURL, + new ServiceInstanceChangeListener() { + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE - 1; + } @Override - @Order(Ordered.LOWEST_PRECEDENCE - 1) public void onApplicationEvent( ServiceInstancesChangedEvent event) { String sourceServiceName = event.getServiceName(); + List serviceInstances = event + .getServiceInstances(); String serviceName = getServiceName(subscribedURL); if (Objects.equals(sourceServiceName, serviceName)) { + logger.debug( + "handle serviceInstanceChange of metadata service, serviceName = {}, subscribeUrl={}", + event.getServiceName(), + subscribedURL.getServiceKey()); + + // only update serviceInstances of the specified + // serviceName subscribeDubboMetadataServiceURLs(subscribedURL, listener, - sourceServiceName); + sourceServiceName, serviceInstances); } } @@ -509,34 +564,25 @@ public class DubboCloudRegistry extends FailbackRegistry { } private void subscribeDubboMetadataServiceURLs(URL subscribedURL, - NotifyListener listener, String serviceName) { + NotifyListener listener, String serviceName, + List serviceInstances) { String serviceInterface = subscribedURL.getServiceInterface(); String version = subscribedURL.getParameter(VERSION_KEY); String protocol = subscribedURL.getParameter(PROTOCOL_KEY); - List serviceInstances = getServiceInstances(serviceName); - List urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances, serviceInterface, version, protocol); notifyAllSubscribedURLs(subscribedURL, urls, listener); } - // private void subscribeDubboMetadataServiceURLs(URL subscribedURL, - // NotifyListener listener, Set serviceNames) { - // - // String serviceInterface = subscribedURL.getServiceInterface(); - // String version = subscribedURL.getParameter(VERSION_KEY); - // String protocol = subscribedURL.getParameter(PROTOCOL_KEY); - // - // List serviceInstances = getServiceInstances(serviceNames); - // - // List urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances, - // serviceInterface, version, protocol); - // - // notifyAllSubscribedURLs(subscribedURL, urls, listener); - // } + private void subscribeDubboMetadataServiceURLs(URL subscribedURL, + NotifyListener listener, String serviceName) { + List serviceInstances = getServiceInstances(serviceName); + subscribeDubboMetadataServiceURLs(subscribedURL, listener, serviceName, + serviceInstances); + } private boolean containsProviderCategory(URL subscribedURL) { String category = subscribedURL.getParameter(CATEGORY_KEY); @@ -557,6 +603,14 @@ public class DubboCloudRegistry extends FailbackRegistry { return ADMIN_PROTOCOL.equals(url.getProtocol()); } + public Map getUrlNotifyListenerMap() { + return urlNotifyListenerMap; + } + + public Map getReConnectJobMap() { + return reConnectJobMap; + } + protected boolean isDubboMetadataServiceURL(URL url) { return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface()); } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java new file mode 100644 index 00000000..74d917f0 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java @@ -0,0 +1,84 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.dubbo.registry; + +import java.util.Map; +import java.util.Set; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.NotifyListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * For re subscribe URL from provider. + * + * @author theonefx + */ +public class ReSubscribeMetadataJob implements Runnable { + + protected final Logger logger = LoggerFactory.getLogger(ReSubscribeMetadataJob.class); + + private final String serviceName; + + private final DubboCloudRegistry dubboCloudRegistry; + + private final int errorCounts; + + public ReSubscribeMetadataJob(String serviceName, + DubboCloudRegistry dubboCloudRegistry, int errorCounts) { + this.errorCounts = errorCounts; + this.serviceName = serviceName; + this.dubboCloudRegistry = dubboCloudRegistry; + } + + public ReSubscribeMetadataJob(String serviceName, + DubboCloudRegistry dubboCloudRegistry) { + this(serviceName, dubboCloudRegistry, 0); + } + + @Override + public void run() { + if (dubboCloudRegistry.getReConnectJobMap().get(serviceName) != this) { + return; + } + try { + logger.info("reSubscribe, serviceName = {}, count = {}", serviceName, + errorCounts); + for (Map.Entry entry : dubboCloudRegistry + .getUrlNotifyListenerMap().entrySet()) { + doRun(entry.getKey(), entry.getValue()); + } + dubboCloudRegistry.getReConnectJobMap().remove(serviceName); + } + catch (Exception e) { + logger.warn(String.format( + "reSubscribe failed, serviceName = %s, try refresh again", + serviceName), e); + dubboCloudRegistry.addReSubscribeMetadataJob(serviceName, errorCounts + 1); + } + } + + private void doRun(URL url, NotifyListener listener) { + Set serviceNames = dubboCloudRegistry.getServices(url); + + if (serviceNames.contains(serviceName)) { + dubboCloudRegistry.subscribeURLs(url, serviceNames, listener); + } + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ServiceInstanceChangeListener.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ServiceInstanceChangeListener.java new file mode 100644 index 00000000..12321483 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ServiceInstanceChangeListener.java @@ -0,0 +1,35 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.dubbo.registry; + +import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent; + +import org.springframework.context.ApplicationListener; +import org.springframework.core.Ordered; + +/** + * The interface of ServiceInstanceChange event Listener. + * + * @author theonefx + * @see ServiceInstancesChangedEvent + * @see Ordered + * @see ApplicationListener + */ +public interface ServiceInstanceChangeListener + extends ApplicationListener, Ordered { + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java index f8b7896d..d13c5179 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java @@ -100,7 +100,9 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory { default: registry = new DubboCloudRegistry(url, discoveryClient, dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy, - jsonUtils, dubboGenericServiceFactory, applicationContext); + jsonUtils, dubboGenericServiceFactory, applicationContext, + dubboCloudProperties.getMaxReSubscribeMetadataTimes(), + dubboCloudProperties.getReSubscribeMetadataIntervial()); break; }