From 3d5eaefe1c3151e79cdc93673b85f77fd817b3ab Mon Sep 17 00:00:00 2001 From: theonefx Date: Wed, 12 May 2021 09:36:33 +0800 Subject: [PATCH 1/3] resolve @order not effect when handle ServiceInstanceChangedEvent --- .../dubbo/registry/DubboCloudRegistry.java | 75 ++++++++++--------- .../ServiceInstanceChangeListener.java | 35 +++++++++ 2 files changed, 73 insertions(+), 37 deletions(-) create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ServiceInstanceChangeListener.java diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java index 04d9c99e..81464733 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java @@ -45,12 +45,10 @@ import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.context.ApplicationListener; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; import org.springframework.util.CollectionUtils; import static java.lang.String.format; import static java.util.Collections.emptyList; -import static java.util.stream.StreamSupport.stream; import static org.apache.dubbo.common.URLBuilder.from; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY; @@ -188,18 +186,24 @@ public class DubboCloudRegistry extends FailbackRegistry { // Async subscription registerServiceInstancesChangedListener(url, - new ApplicationListener() { - - private final URL url2subscribe = url; + new ServiceInstanceChangeListener() { + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } @Override - @Order public void onApplicationEvent(ServiceInstancesChangedEvent event) { + Set serviceNames = getServices(url); String serviceName = event.getServiceName(); if (serviceNames.contains(serviceName)) { + logger.debug( + "handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}", + event.getServiceName(), url.getServiceKey()); subscribeURLs(url, serviceNames, listener); } } @@ -419,11 +423,6 @@ public class DubboCloudRegistry extends FailbackRegistry { listener.notify(subscribedURLs); } - private List getServiceInstances(Iterable serviceNames) { - return stream(serviceNames.spliterator(), false).map(this::getServiceInstances) - .flatMap(Collection::stream).collect(Collectors.toList()); - } - private List getServiceInstances(String serviceName) { return hasText(serviceName) ? doGetServiceInstances(serviceName) : emptyList(); } @@ -471,27 +470,38 @@ public class DubboCloudRegistry extends FailbackRegistry { private void subscribeDubboMetadataServiceURLs(URL subscribedURL, NotifyListener listener) { - // Sync subscription - subscribeDubboMetadataServiceURLs(subscribedURL, listener, - getServiceName(subscribedURL)); - // Sync subscription if (containsProviderCategory(subscribedURL)) { - registerServiceInstancesChangedListener(subscribedURL, - new ApplicationListener() { - private final URL url2subscribe = subscribedURL; + subscribeDubboMetadataServiceURLs(subscribedURL, listener, + getServiceName(subscribedURL)); + + registerServiceInstancesChangedListener(subscribedURL, + new ServiceInstanceChangeListener() { + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE - 1; + } @Override - @Order(Ordered.LOWEST_PRECEDENCE - 1) public void onApplicationEvent( ServiceInstancesChangedEvent event) { String sourceServiceName = event.getServiceName(); + List serviceInstances = event + .getServiceInstances(); String serviceName = getServiceName(subscribedURL); if (Objects.equals(sourceServiceName, serviceName)) { + logger.debug( + "handle serviceInstanceChange of metadata service, serviceName = {}, subscribeUrl={}", + event.getServiceName(), + subscribedURL.getServiceKey()); + + // only update serviceInstances of the specified + // serviceName subscribeDubboMetadataServiceURLs(subscribedURL, listener, - sourceServiceName); + sourceServiceName, serviceInstances); } } @@ -509,34 +519,25 @@ public class DubboCloudRegistry extends FailbackRegistry { } private void subscribeDubboMetadataServiceURLs(URL subscribedURL, - NotifyListener listener, String serviceName) { + NotifyListener listener, String serviceName, + List serviceInstances) { String serviceInterface = subscribedURL.getServiceInterface(); String version = subscribedURL.getParameter(VERSION_KEY); String protocol = subscribedURL.getParameter(PROTOCOL_KEY); - List serviceInstances = getServiceInstances(serviceName); - List urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances, serviceInterface, version, protocol); notifyAllSubscribedURLs(subscribedURL, urls, listener); } - // private void subscribeDubboMetadataServiceURLs(URL subscribedURL, - // NotifyListener listener, Set serviceNames) { - // - // String serviceInterface = subscribedURL.getServiceInterface(); - // String version = subscribedURL.getParameter(VERSION_KEY); - // String protocol = subscribedURL.getParameter(PROTOCOL_KEY); - // - // List serviceInstances = getServiceInstances(serviceNames); - // - // List urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances, - // serviceInterface, version, protocol); - // - // notifyAllSubscribedURLs(subscribedURL, urls, listener); - // } + private void subscribeDubboMetadataServiceURLs(URL subscribedURL, + NotifyListener listener, String serviceName) { + List serviceInstances = getServiceInstances(serviceName); + subscribeDubboMetadataServiceURLs(subscribedURL, listener, serviceName, + serviceInstances); + } private boolean containsProviderCategory(URL subscribedURL) { String category = subscribedURL.getParameter(CATEGORY_KEY); diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ServiceInstanceChangeListener.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ServiceInstanceChangeListener.java new file mode 100644 index 00000000..12321483 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ServiceInstanceChangeListener.java @@ -0,0 +1,35 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.dubbo.registry; + +import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent; + +import org.springframework.context.ApplicationListener; +import org.springframework.core.Ordered; + +/** + * The interface of ServiceInstanceChange event Listener. + * + * @author theonefx + * @see ServiceInstancesChangedEvent + * @see Ordered + * @see ApplicationListener + */ +public interface ServiceInstanceChangeListener + extends ApplicationListener, Ordered { + +} From f8161141224157aea0bcc08b36332aa2c2e58a80 Mon Sep 17 00:00:00 2001 From: theonefx Date: Wed, 12 May 2021 20:27:50 +0800 Subject: [PATCH 2/3] re subscribe when failed --- .../cloud/dubbo/env/DubboCloudProperties.java | 20 +++++ .../dubbo/registry/DubboCloudRegistry.java | 65 +++++++++++++-- .../registry/ReSubscribeMetadataJob.java | 82 +++++++++++++++++++ .../registry/SpringCloudRegistryFactory.java | 4 +- 4 files changed, 162 insertions(+), 9 deletions(-) create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java index 577d19e5..5135c5a8 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java @@ -51,6 +51,10 @@ public class DubboCloudProperties { private String registryType = DUBBO_CLOUD_REGISTRY_PROPERTY_VALUE; + private int maxReSubscribeMetadataTimes = 1000; + + private int reSubscribeMetadataIntervial = 5; + public String getSubscribedServices() { return subscribedServices; } @@ -91,4 +95,20 @@ public class DubboCloudProperties { this.registryType = registryType; } + public int getMaxReSubscribeMetadataTimes() { + return maxReSubscribeMetadataTimes; + } + + public void setMaxReSubscribeMetadataTimes(int maxReSubscribeMetadataTimes) { + this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes; + } + + public int getReSubscribeMetadataIntervial() { + return reSubscribeMetadataIntervial; + } + + public void setReSubscribeMetadataIntervial(int reSubscribeMetadataIntervial) { + this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial; + } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java index 81464733..b7768c30 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java @@ -23,6 +23,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -108,11 +111,23 @@ public class DubboCloudRegistry extends FailbackRegistry { private final String currentApplicationName; + private final Map urlNotifyListenerMap = new ConcurrentHashMap<>(); + + private final Map reConnectJobMap = new ConcurrentHashMap<>(); + + private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor( + 2); + + private final int maxReSubscribeMetadataTimes; + + private final int reSubscribeMetadataIntervial; + public DubboCloudRegistry(URL url, DiscoveryClient discoveryClient, DubboServiceMetadataRepository repository, DubboMetadataServiceProxy dubboMetadataConfigServiceProxy, JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory, - ConfigurableApplicationContext applicationContext) { + ConfigurableApplicationContext applicationContext, + int maxReSubscribeMetadataTimes, int reSubscribeMetadataIntervial) { super(url); this.servicesLookupInterval = url @@ -125,6 +140,11 @@ public class DubboCloudRegistry extends FailbackRegistry { this.applicationContext = applicationContext; this.dubboMetadataUtils = getBean(DubboMetadataUtils.class); this.currentApplicationName = dubboMetadataUtils.getCurrentApplicationName(); + this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes; + this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial; + + reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES); + reConnectPool.allowCoreThreadTimeOut(true); } private T getBean(Class beanClass) { @@ -175,6 +195,7 @@ public class DubboCloudRegistry extends FailbackRegistry { } else { // for general Dubbo Services subscribeURLs(url, listener); + urlNotifyListenerMap.put(url, listener); } } @@ -204,7 +225,16 @@ public class DubboCloudRegistry extends FailbackRegistry { logger.debug( "handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}", event.getServiceName(), url.getServiceKey()); - subscribeURLs(url, serviceNames, listener); + try { + subscribeURLs(url, serviceNames, listener); + reConnectJobMap.remove(serviceName); + } + catch (Exception e) { + logger.warn(String.format( + "subscribeURLs failed, serviceName = %s, try reSubscribe again", + serviceName), e); + addReSubscribeMetadataJob(serviceName, 0); + } } } @@ -216,8 +246,19 @@ public class DubboCloudRegistry extends FailbackRegistry { }); } - private void subscribeURLs(URL url, Set serviceNames, - NotifyListener listener) { + void addReSubscribeMetadataJob(String serviceName, int count) { + if (count > maxReSubscribeMetadataTimes) { + logger.error( + "reSubscribe failed too many times, serviceName = {}, count = {}", + serviceName, count); + return; + } + ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, this, count); + reConnectJobMap.put(serviceName, job); + reConnectPool.schedule(job, reSubscribeMetadataIntervial, TimeUnit.SECONDS); + } + + void subscribeURLs(URL url, Set serviceNames, NotifyListener listener) { List subscribedURLs = new LinkedList<>(); @@ -393,7 +434,7 @@ public class DubboCloudRegistry extends FailbackRegistry { return metadata.containsKey(METADATA_SERVICE_URLS_PROPERTY_NAME); } - private Set getServices(URL url) { + Set getServices(URL url) { Set subscribedServices = repository.getSubscribedServices(); // TODO Add the filter feature return subscribedServices; @@ -470,12 +511,12 @@ public class DubboCloudRegistry extends FailbackRegistry { private void subscribeDubboMetadataServiceURLs(URL subscribedURL, NotifyListener listener) { + subscribeDubboMetadataServiceURLs(subscribedURL, listener, + getServiceName(subscribedURL)); + // Sync subscription if (containsProviderCategory(subscribedURL)) { - subscribeDubboMetadataServiceURLs(subscribedURL, listener, - getServiceName(subscribedURL)); - registerServiceInstancesChangedListener(subscribedURL, new ServiceInstanceChangeListener() { @@ -558,6 +599,14 @@ public class DubboCloudRegistry extends FailbackRegistry { return ADMIN_PROTOCOL.equals(url.getProtocol()); } + public Map getUrlNotifyListenerMap() { + return urlNotifyListenerMap; + } + + public Map getReConnectJobMap() { + return reConnectJobMap; + } + protected boolean isDubboMetadataServiceURL(URL url) { return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface()); } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java new file mode 100644 index 00000000..68e0f42c --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java @@ -0,0 +1,82 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.dubbo.registry; + +import java.util.Map; +import java.util.Set; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.NotifyListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * For re subscribe URL from provider. + * + * @author theonefx + */ +public class ReSubscribeMetadataJob implements Runnable { + + protected final Logger logger = LoggerFactory.getLogger(ReSubscribeMetadataJob.class); + + private final String serviceName; + + private final DubboCloudRegistry dubboCloudRegistry; + + private final int errorCounts; + + public ReSubscribeMetadataJob(String serviceName, + DubboCloudRegistry dubboCloudRegistry, int errorCounts) { + this.errorCounts = errorCounts; + this.serviceName = serviceName; + this.dubboCloudRegistry = dubboCloudRegistry; + } + + public ReSubscribeMetadataJob(String serviceName, + DubboCloudRegistry dubboCloudRegistry) { + this(serviceName, dubboCloudRegistry, 0); + } + + @Override + public void run() { + if (dubboCloudRegistry.getReConnectJobMap().get(serviceName) != this) { + return; + } + try { + for (Map.Entry entry : dubboCloudRegistry + .getUrlNotifyListenerMap().entrySet()) { + doRun(entry.getKey(), entry.getValue()); + } + dubboCloudRegistry.getReConnectJobMap().remove(serviceName); + } + catch (Exception e) { + logger.warn(String.format( + "reSubscribe failed, serviceName = %s, try refresh again", + serviceName), e); + dubboCloudRegistry.addReSubscribeMetadataJob(serviceName, errorCounts + 1); + } + } + + private void doRun(URL url, NotifyListener listener) { + Set serviceNames = dubboCloudRegistry.getServices(url); + + if (serviceNames.contains(serviceName)) { + dubboCloudRegistry.subscribeURLs(url, serviceNames, listener); + } + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java index f8b7896d..d13c5179 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java @@ -100,7 +100,9 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory { default: registry = new DubboCloudRegistry(url, discoveryClient, dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy, - jsonUtils, dubboGenericServiceFactory, applicationContext); + jsonUtils, dubboGenericServiceFactory, applicationContext, + dubboCloudProperties.getMaxReSubscribeMetadataTimes(), + dubboCloudProperties.getReSubscribeMetadataIntervial()); break; } From cf71b7aa26764c98009fbf5cd2ef34963e111e02 Mon Sep 17 00:00:00 2001 From: theonefx Date: Wed, 12 May 2021 22:23:24 +0800 Subject: [PATCH 3/3] add more log --- .../com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java | 4 ++++ .../alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java | 2 ++ 2 files changed, 6 insertions(+) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java index b7768c30..cce75ba3 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java @@ -297,6 +297,10 @@ public class DubboCloudRegistry extends FailbackRegistry { serviceName)); } } + else { + logger.debug("subscribe from serviceName = {}, size = {}", serviceName, + serviceInstances.size()); + } List exportedURLs = getExportedURLs(subscribedURL, serviceName, serviceInstances); diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java index 68e0f42c..74d917f0 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java @@ -57,6 +57,8 @@ public class ReSubscribeMetadataJob implements Runnable { return; } try { + logger.info("reSubscribe, serviceName = {}, count = {}", serviceName, + errorCounts); for (Map.Entry entry : dubboCloudRegistry .getUrlNotifyListenerMap().entrySet()) { doRun(entry.getKey(), entry.getValue());