From 18c4fccebf6d09c04cd79805365877e541aad731 Mon Sep 17 00:00:00 2001 From: mercyblitz Date: Wed, 17 Apr 2019 13:33:55 +0800 Subject: [PATCH] Polish spring-cloud-incubator/spring-cloud-alibaba#559 : Optimize Scheduler tasks --- .../registry/AbstractSpringCloudRegistry.java | 97 ++++++------------- .../dubbo/registry/SpringCloudRegistry.java | 18 ---- 2 files changed, 29 insertions(+), 86 deletions(-) diff --git a/spring-cloud-alibaba-dubbo/src/main/java/org/springframework/cloud/alibaba/dubbo/registry/AbstractSpringCloudRegistry.java b/spring-cloud-alibaba-dubbo/src/main/java/org/springframework/cloud/alibaba/dubbo/registry/AbstractSpringCloudRegistry.java index 06855d02..b60c8247 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/org/springframework/cloud/alibaba/dubbo/registry/AbstractSpringCloudRegistry.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/org/springframework/cloud/alibaba/dubbo/registry/AbstractSpringCloudRegistry.java @@ -31,8 +31,7 @@ import org.springframework.cloud.alibaba.dubbo.util.JSONUtils; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; -import java.util.Collection; -import java.util.LinkedHashSet; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -40,7 +39,6 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -50,7 +48,6 @@ import static org.apache.dubbo.common.Constants.PROTOCOL_KEY; import static org.apache.dubbo.common.Constants.PROVIDER_SIDE; import static org.apache.dubbo.common.Constants.SIDE_KEY; import static org.apache.dubbo.common.Constants.VERSION_KEY; -import static org.springframework.util.ObjectUtils.isEmpty; import static org.springframework.util.StringUtils.hasText; /** @@ -67,6 +64,8 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { protected static final String DUBBO_METADATA_SERVICE_CLASS_NAME = DubboMetadataService.class.getName(); + private static final Set schedulerTasks = new HashSet<>(); + protected final Logger logger = LoggerFactory.getLogger(getClass()); /** @@ -82,6 +81,7 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { private final JSONUtils jsonUtils; + protected final ScheduledExecutorService servicesLookupScheduler; public AbstractSpringCloudRegistry(URL url, @@ -159,9 +159,14 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { doSubscribeDubboServiceURLs(url, listener); - schedule(() -> { - doSubscribeDubboServiceURLs(url, listener); - }); + submitSchedulerTaskIfAbsent(url, listener); + } + + private void submitSchedulerTaskIfAbsent(URL url, NotifyListener listener) { + String taskId = url.toIdentityString(); + if (schedulerTasks.add(taskId)) { + schedule(() -> doSubscribeDubboServiceURLs(url, listener)); + } } protected void doSubscribeDubboServiceURLs(URL url, NotifyListener listener) { @@ -194,8 +199,7 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { }); if (logger.isDebugEnabled()) { - logger.debug("The subscribed URLs[ service name : {} , protocol : {}] will be notified : {}" - , serviceName, protocol, subscribedURLs); + logger.debug("The subscribed URL[{}] will notify all URLs : {}", url, subscribedURLs); } allSubscribedURLs.addAll(subscribedURLs); @@ -205,6 +209,22 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { }); } + private List getServiceInstances(String serviceName) { + return hasText(serviceName) ? doGetServiceInstances(serviceName) : emptyList(); + } + + private List doGetServiceInstances(String serviceName) { + List serviceInstances = emptyList(); + try { + serviceInstances = discoveryClient.getInstances(serviceName); + } catch (Exception e) { + if (logger.isErrorEnabled()) { + logger.error(e.getMessage(), e); + } + } + return serviceInstances; + } + private List getExportedURLs(DubboMetadataService dubboMetadataService, URL url) { String serviceInterface = url.getServiceInterface(); String group = url.getParameter(GROUP_KEY); @@ -220,7 +240,6 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { ).collect(Collectors.toList()); } - private void subscribeDubboMetadataServiceURLs(URL url, NotifyListener listener) { String serviceInterface = url.getServiceInterface(); String group = url.getParameter(GROUP_KEY); @@ -248,16 +267,6 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { } } - private Set filterServiceNames(Collection serviceNames) { - return new LinkedHashSet<>(filter(serviceNames, this::supports)); - } - - protected abstract boolean supports(String serviceName); - - protected final Set getAllServiceNames() { - return new LinkedHashSet<>(discoveryClient.getServices()); - } - protected boolean isAdminURL(URL url) { return Constants.ADMIN_PROTOCOL.equals(url.getProtocol()); } @@ -266,56 +275,8 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface()); } - /** - * Get the service names for Dubbo OPS - * - * @param url {@link URL} - * @return non-null - */ - protected Set getServiceNamesForOps(URL url) { - Set serviceNames = getAllServiceNames(); - return filterServiceNames(serviceNames); - } - - private void doSubscribe(final URL url, final NotifyListener listener, final Collection serviceNames) { - - subscribe(url, listener, serviceNames); - - schedule(() -> { - subscribe(url, listener, serviceNames); - }); - } - protected ScheduledFuture schedule(Runnable runnable) { return this.servicesLookupScheduler.scheduleAtFixedRate(runnable, servicesLookupInterval, servicesLookupInterval, TimeUnit.SECONDS); } - - protected List getServiceInstances(String serviceName) { - return hasText(serviceName) ? discoveryClient.getInstances(serviceName) : emptyList(); - } - - private void subscribe(final URL url, final NotifyListener listener, final Collection serviceNames) { - for (String serviceName : serviceNames) { - List serviceInstances = getServiceInstances(serviceName); - if (!isEmpty(serviceInstances)) { - notifySubscriber(url, listener, serviceInstances); - } - } - } - - /** - * Notify the Healthy {@link ServiceInstance service instance} to subscriber. - * - * @param url {@link URL} - * @param listener {@link NotifyListener} - * @param serviceInstances all {@link ServiceInstance instances} - */ - protected abstract void notifySubscriber(URL url, NotifyListener listener, List serviceInstances); - - protected Collection filter(Collection collection, Predicate filter) { - return collection.stream() - .filter(filter) - .collect(Collectors.toList()); - } } diff --git a/spring-cloud-alibaba-dubbo/src/main/java/org/springframework/cloud/alibaba/dubbo/registry/SpringCloudRegistry.java b/spring-cloud-alibaba-dubbo/src/main/java/org/springframework/cloud/alibaba/dubbo/registry/SpringCloudRegistry.java index d3e04f47..4928d40e 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/org/springframework/cloud/alibaba/dubbo/registry/SpringCloudRegistry.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/org/springframework/cloud/alibaba/dubbo/registry/SpringCloudRegistry.java @@ -17,18 +17,14 @@ package org.springframework.cloud.alibaba.dubbo.registry; import org.apache.dubbo.common.URL; -import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.RegistryFactory; import org.springframework.cloud.alibaba.dubbo.metadata.repository.DubboServiceMetadataRepository; import org.springframework.cloud.alibaba.dubbo.service.DubboMetadataServiceProxy; import org.springframework.cloud.alibaba.dubbo.util.JSONUtils; -import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; -import java.util.List; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.Collectors; /** * Dubbo {@link RegistryFactory} uses Spring Cloud Service Registration abstraction, whose protocol is "spring-cloud" @@ -57,18 +53,4 @@ public class SpringCloudRegistry extends AbstractSpringCloudRegistry { protected void doUnregister0(URL url) { dubboServiceMetadataRepository.unexportURL(url); } - - @Override - protected boolean supports(String serviceName) { - return dubboServiceMetadataRepository.isSubscribedService(serviceName); - } - - @Override - protected void notifySubscriber(URL url, NotifyListener listener, List serviceInstances) { - List urls = serviceInstances.stream() - .map(dubboServiceMetadataRepository::getDubboMetadataServiceURLs) - .flatMap(List::stream) - .collect(Collectors.toList()); - notify(url, listener, urls); - } }