From 56e07c6d2b7f7a5a9b4e242536796be9d40936ea Mon Sep 17 00:00:00 2001 From: yuhuangbin Date: Wed, 9 Sep 2020 15:21:37 +0800 Subject: [PATCH 1/5] fix issue#1701 --- .../NacosDiscoveryClientConfiguration.java | 9 +- .../cloud/nacos/discovery/NacosWatch.java | 125 +++++++++++------- 2 files changed, 78 insertions(+), 56 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfiguration.java index 61234516..af722dda 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfiguration.java @@ -18,8 +18,8 @@ package com.alibaba.cloud.nacos.discovery; import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; +import com.alibaba.cloud.nacos.NacosServiceManager; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -31,7 +31,6 @@ import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.TaskScheduler; /** * @author xiaojing @@ -56,9 +55,9 @@ public class NacosDiscoveryClientConfiguration { @ConditionalOnMissingBean @ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true) - public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties, - ObjectProvider taskScheduler) { - return new NacosWatch(nacosDiscoveryProperties, taskScheduler); + public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager, + NacosDiscoveryProperties nacosDiscoveryProperties) { + return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties); } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java index b5dfc8e3..46019113 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java @@ -16,21 +16,28 @@ package com.alibaba.cloud.nacos.discovery; -import java.util.concurrent.ScheduledFuture; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; +import com.alibaba.cloud.nacos.NacosServiceManager; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.Event; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; +import com.alibaba.nacos.api.naming.pojo.Instance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.cloud.client.discovery.event.HeartbeatEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.SmartLifecycle; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * @author xiaojing @@ -39,44 +46,20 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl private static final Logger log = LoggerFactory.getLogger(NacosWatch.class); - private final NacosDiscoveryProperties properties; - - private final TaskScheduler taskScheduler; - - private final AtomicLong nacosWatchIndex = new AtomicLong(0); + private Map listenerMap = new ConcurrentHashMap<>(16); private final AtomicBoolean running = new AtomicBoolean(false); private ApplicationEventPublisher publisher; - private ScheduledFuture watchFuture; + private NacosServiceManager nacosServiceManager; - public NacosWatch(NacosDiscoveryProperties properties) { - this(properties, getTaskScheduler()); - } + private final NacosDiscoveryProperties properties; - public NacosWatch(NacosDiscoveryProperties properties, TaskScheduler taskScheduler) { + public NacosWatch(NacosServiceManager nacosServiceManager, + NacosDiscoveryProperties properties) { + this.nacosServiceManager = nacosServiceManager; this.properties = properties; - this.taskScheduler = taskScheduler; - } - - /** - * The constructor with {@link NacosDiscoveryProperties} bean and the optional. - * {@link TaskScheduler} bean - * @param properties {@link NacosDiscoveryProperties} bean - * @param taskScheduler the optional {@link TaskScheduler} bean - * @since 2.2.0 - */ - public NacosWatch(NacosDiscoveryProperties properties, - ObjectProvider taskScheduler) { - this(properties, taskScheduler.getIfAvailable(NacosWatch::getTaskScheduler)); - } - - private static ThreadPoolTaskScheduler getTaskScheduler() { - ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); - taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler"); - taskScheduler.initialize(); - return taskScheduler; } @Override @@ -98,19 +81,67 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl @Override public void start() { if (this.running.compareAndSet(false, true)) { - this.watchFuture = this.taskScheduler.scheduleWithFixedDelay( - this::nacosServicesWatch, this.properties.getWatchDelay()); + EventListener eventListener = listenerMap.computeIfAbsent(buildKey(), + event -> new EventListener() { + @Override + public void onEvent(Event event) { + if (event instanceof NamingEvent) { + List instances = ((NamingEvent) event) + .getInstances(); + Instance currentInstance = selectCurrentInstance( + instances); + if (Objects.nonNull(currentInstance)) { + resetIfNeeded(currentInstance); + publisher.publishEvent( + new HeartbeatEvent(this, currentInstance)); + } + } + } + }); + + NamingService namingService = nacosServiceManager + .getNamingService(properties.getNacosProperties()); + try { + namingService.subscribe(properties.getService(), properties.getGroup(), + Arrays.asList(properties.getClusterName()), eventListener); + } + catch (Exception e) { + log.error("namingService subscribe failed, properties:{}", properties, e); + } } } + private String buildKey() { + return String.join(":", properties.getService(), properties.getGroup()); + } + + private void resetIfNeeded(Instance instance) { + if (!properties.getMetadata().equals(instance.getMetadata())) { + properties.setMetadata(instance.getMetadata()); + } + } + + private Instance selectCurrentInstance(List instances) { + return instances.stream() + .filter(instance -> properties.getIp().equals(instance.getIp()) + && properties.getPort() == instance.getPort()) + .findFirst().orElse(null); + } + @Override public void stop() { - if (this.running.compareAndSet(true, false) && this.watchFuture != null) { - // shutdown current user-thread, - // then the other daemon-threads will terminate automatic. - ((ThreadPoolTaskScheduler) this.taskScheduler).shutdown(); - - this.watchFuture.cancel(true); + if (this.running.compareAndSet(true, false)) { + EventListener eventListener = listenerMap.get(buildKey()); + NamingService namingService = nacosServiceManager + .getNamingService(properties.getNacosProperties()); + try { + namingService.unsubscribe(properties.getService(), properties.getGroup(), + Arrays.asList(properties.getClusterName()), eventListener); + } + catch (NacosException e) { + log.error("namingService unsubscribe failed, properties:{}", properties, + e); + } } } @@ -124,12 +155,4 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl return 0; } - public void nacosServicesWatch() { - - // nacos doesn't support watch now , publish an event every 30 seconds. - this.publisher.publishEvent( - new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement())); - - } - } From bd2cd3f39468cae824d2b1a65e6882eae9f58cdd Mon Sep 17 00:00:00 2001 From: yuhuangbin Date: Wed, 9 Sep 2020 19:18:45 +0800 Subject: [PATCH 2/5] nacos watch enhance --- .../alibaba/cloud/nacos/discovery/NacosWatch.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java index 46019113..d01089eb 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java @@ -19,7 +19,7 @@ package com.alibaba.cloud.nacos.discovery; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,6 +41,7 @@ import org.springframework.context.SmartLifecycle; /** * @author xiaojing + * @author yuhuangbin */ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle { @@ -88,13 +89,13 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl if (event instanceof NamingEvent) { List instances = ((NamingEvent) event) .getInstances(); - Instance currentInstance = selectCurrentInstance( + Optional instanceOptional = selectCurrentInstance( instances); - if (Objects.nonNull(currentInstance)) { + instanceOptional.ifPresent(currentInstance -> { resetIfNeeded(currentInstance); publisher.publishEvent( new HeartbeatEvent(this, currentInstance)); - } + }); } } }); @@ -121,11 +122,11 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl } } - private Instance selectCurrentInstance(List instances) { + private Optional selectCurrentInstance(List instances) { return instances.stream() .filter(instance -> properties.getIp().equals(instance.getIp()) && properties.getPort() == instance.getPort()) - .findFirst().orElse(null); + .findFirst(); } @Override From fec6c6554c8782e952905da8c37cb6864d361ef8 Mon Sep 17 00:00:00 2001 From: yuhuangbin Date: Thu, 10 Sep 2020 09:54:30 +0800 Subject: [PATCH 3/5] fix --- .../java/com/alibaba/cloud/nacos/discovery/NacosWatch.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java index d01089eb..3371e51d 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceManager; @@ -51,6 +52,8 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong nacosWatchIndex = new AtomicLong(0); + private ApplicationEventPublisher publisher; private NacosServiceManager nacosServiceManager; @@ -93,9 +96,9 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl instances); instanceOptional.ifPresent(currentInstance -> { resetIfNeeded(currentInstance); - publisher.publishEvent( - new HeartbeatEvent(this, currentInstance)); }); + publisher.publishEvent( + new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement())); } } }); From f60cb805c8217c10d0b036d6c425bc591f441f90 Mon Sep 17 00:00:00 2001 From: yuhuangbin Date: Fri, 11 Sep 2020 15:32:40 +0800 Subject: [PATCH 4/5] fix --- .../java/com/alibaba/cloud/nacos/discovery/NacosWatch.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java index 3371e51d..d01089eb 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceManager; @@ -52,8 +51,6 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl private final AtomicBoolean running = new AtomicBoolean(false); - private final AtomicLong nacosWatchIndex = new AtomicLong(0); - private ApplicationEventPublisher publisher; private NacosServiceManager nacosServiceManager; @@ -96,9 +93,9 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl instances); instanceOptional.ifPresent(currentInstance -> { resetIfNeeded(currentInstance); + publisher.publishEvent( + new HeartbeatEvent(this, currentInstance)); }); - publisher.publishEvent( - new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement())); } } }); From b35c05e4fc8affca02891d40de6ffe56b74477c8 Mon Sep 17 00:00:00 2001 From: yuhuangbin Date: Fri, 11 Sep 2020 16:23:02 +0800 Subject: [PATCH 5/5] fix --- .../java/com/alibaba/cloud/nacos/discovery/NacosWatch.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java index d01089eb..e572549a 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosServiceManager; @@ -51,6 +52,8 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong nacosWatchIndex = new AtomicLong(0); + private ApplicationEventPublisher publisher; private NacosServiceManager nacosServiceManager; @@ -93,9 +96,9 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl instances); instanceOptional.ifPresent(currentInstance -> { resetIfNeeded(currentInstance); - publisher.publishEvent( - new HeartbeatEvent(this, currentInstance)); }); + publisher.publishEvent( + new HeartbeatEvent(NacosWatch.this, nacosWatchIndex.getAndIncrement())); } } });