1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00
NacosWatch to update local metadata
This commit is contained in:
Mercy Ma 2020-09-11 17:05:50 +08:00 committed by GitHub
commit f10e750cbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 55 deletions

View File

@ -18,8 +18,8 @@ package com.alibaba.cloud.nacos.discovery;
import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled; import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -31,7 +31,6 @@ import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration; import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
/** /**
* @author xiaojing * @author xiaojing
@ -56,9 +55,9 @@ public class NacosDiscoveryClientConfiguration {
@ConditionalOnMissingBean @ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", @ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
matchIfMissing = true) matchIfMissing = true)
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties, public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
ObjectProvider<TaskScheduler> taskScheduler) { NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosWatch(nacosDiscoveryProperties, taskScheduler); return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
} }
} }

View File

@ -16,67 +16,54 @@
package com.alibaba.cloud.nacos.discovery; package com.alibaba.cloud.nacos.discovery;
import java.util.concurrent.ScheduledFuture; import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent; import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/** /**
* @author xiaojing * @author xiaojing
* @author yuhuangbin
*/ */
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle { public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(NacosWatch.class); private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);
private final NacosDiscoveryProperties properties; private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
private final TaskScheduler taskScheduler;
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
private final AtomicBoolean running = new AtomicBoolean(false); private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
private ApplicationEventPublisher publisher; private ApplicationEventPublisher publisher;
private ScheduledFuture<?> watchFuture; private NacosServiceManager nacosServiceManager;
public NacosWatch(NacosDiscoveryProperties properties) { private final NacosDiscoveryProperties properties;
this(properties, getTaskScheduler());
}
public NacosWatch(NacosDiscoveryProperties properties, TaskScheduler taskScheduler) { public NacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties properties) {
this.nacosServiceManager = nacosServiceManager;
this.properties = properties; this.properties = properties;
this.taskScheduler = taskScheduler;
}
/**
* The constructor with {@link NacosDiscoveryProperties} bean and the optional.
* {@link TaskScheduler} bean
* @param properties {@link NacosDiscoveryProperties} bean
* @param taskScheduler the optional {@link TaskScheduler} bean
* @since 2.2.0
*/
public NacosWatch(NacosDiscoveryProperties properties,
ObjectProvider<TaskScheduler> taskScheduler) {
this(properties, taskScheduler.getIfAvailable(NacosWatch::getTaskScheduler));
}
private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
} }
@Override @Override
@ -98,19 +85,67 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
@Override @Override
public void start() { public void start() {
if (this.running.compareAndSet(false, true)) { if (this.running.compareAndSet(false, true)) {
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay( EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
this::nacosServicesWatch, this.properties.getWatchDelay()); event -> new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event)
.getInstances();
Optional<Instance> instanceOptional = selectCurrentInstance(
instances);
instanceOptional.ifPresent(currentInstance -> {
resetIfNeeded(currentInstance);
});
publisher.publishEvent(
new HeartbeatEvent(NacosWatch.this, nacosWatchIndex.getAndIncrement()));
} }
} }
});
NamingService namingService = nacosServiceManager
.getNamingService(properties.getNacosProperties());
try {
namingService.subscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService subscribe failed, properties:{}", properties, e);
}
}
}
private String buildKey() {
return String.join(":", properties.getService(), properties.getGroup());
}
private void resetIfNeeded(Instance instance) {
if (!properties.getMetadata().equals(instance.getMetadata())) {
properties.setMetadata(instance.getMetadata());
}
}
private Optional<Instance> selectCurrentInstance(List<Instance> instances) {
return instances.stream()
.filter(instance -> properties.getIp().equals(instance.getIp())
&& properties.getPort() == instance.getPort())
.findFirst();
}
@Override @Override
public void stop() { public void stop() {
if (this.running.compareAndSet(true, false) && this.watchFuture != null) { if (this.running.compareAndSet(true, false)) {
// shutdown current user-thread, EventListener eventListener = listenerMap.get(buildKey());
// then the other daemon-threads will terminate automatic. NamingService namingService = nacosServiceManager
((ThreadPoolTaskScheduler) this.taskScheduler).shutdown(); .getNamingService(properties.getNacosProperties());
try {
this.watchFuture.cancel(true); namingService.unsubscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (NacosException e) {
log.error("namingService unsubscribe failed, properties:{}", properties,
e);
}
} }
} }
@ -124,12 +159,4 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
return 0; return 0;
} }
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
} }