1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Polish spring-cloud-incubator/spring-cloud-alibaba#623 : Bugfix for the missing Event handling

This commit is contained in:
mercyblitz 2019-08-01 11:15:28 +08:00
parent a666ddc8cb
commit a8b274e61b
2 changed files with 27 additions and 19 deletions

View File

@ -21,6 +21,7 @@ import org.apache.dubbo.registry.NotifyListener;
import com.alibaba.cloud.dubbo.env.DubboCloudProperties;
import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository;
import com.alibaba.cloud.dubbo.registry.AbstractSpringCloudRegistry;
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.discovery.NacosWatch;
@ -142,6 +143,10 @@ public class DubboServiceDiscoveryAutoConfiguration {
return;
}
ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent(serviceName, serviceInstances);
if (logger.isInfoEnabled()) {
logger.info("The event of the service instances[name : {} , size : {}] change is about to be dispatched",
serviceName, serviceInstances.size());
}
applicationEventPublisher.publishEvent(event);
}

View File

@ -174,24 +174,14 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
* @param listener {@link NotifyListener}
*/
private void registerServiceInstancesChangedEventListener(URL url, NotifyListener listener) {
String listenerId = url.toIdentityString();
String listenerId = generateId(url);
if (registerListeners.add(listenerId)) {
applicationContext.addApplicationListener(new ApplicationListener<ServiceInstancesChangedEvent>() {
@Override
public void onApplicationEvent(ServiceInstancesChangedEvent event) {
if (event.isProcessed()) { // If processed, return immediately
return;
}
String serviceName = event.getServiceName();
Collection<ServiceInstance> serviceInstances = event.getServiceInstances();
if (logger.isInfoEnabled()) {
logger.info("The event of the service instances[name : {} , size: {}] change has been arrived",
serviceName, serviceInstances.size());
}
subscribeDubboServiceURLs(url, listener, serviceName, s -> serviceInstances);
// Mark event to be processed
event.processed();
subscribeDubboServiceURL(url, listener, serviceName, s -> serviceInstances);
}
});
}
@ -201,12 +191,17 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
Set<String> subscribedServices = repository.getSubscribedServices();
// Sync
subscribedServices.forEach(service -> subscribeDubboServiceURLs(url, listener, service, this::getServiceInstances));
subscribedServices.forEach(service -> subscribeDubboServiceURL(url, listener, service, this::getServiceInstances));
}
protected void subscribeDubboServiceURLs(URL url, NotifyListener listener, String serviceName,
protected void subscribeDubboServiceURL(URL url, NotifyListener listener, String serviceName,
Function<String, Collection<ServiceInstance>> serviceInstancesFunction) {
if (logger.isInfoEnabled()) {
logger.info("The Dubbo Service URL[ID : {}] is being subscribed for service[name : {}]",
generateId(url), serviceName);
}
DubboMetadataService dubboMetadataService = dubboMetadataConfigServiceProxy.getProxy(serviceName);
if (dubboMetadataService == null) { // If not found, try to initialize
@ -231,6 +226,10 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
List<URL> allSubscribedURLs = new LinkedList<>();
if (CollectionUtils.isEmpty(serviceInstances)) {
if (logger.isWarnEnabled()) {
logger.warn("There is no instance from service[name : {}], and then Dubbo Service[key : {}] will not be " +
"available , please make sure the further impact", serviceName, url.getServiceKey());
}
/**
* URLs with {@link RegistryConstants#EMPTY_PROTOCOL}
*/
@ -255,17 +254,21 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
}
});
if (logger.isDebugEnabled()) {
logger.debug("The subscribed URL[{}] will notify all URLs : {}", url, subscribedURLs);
}
allSubscribedURLs.addAll(subscribedURLs);
}
}
if (logger.isDebugEnabled()) {
logger.debug("The subscribed URL[{}] will notify all URLs : {}", url, allSubscribedURLs);
}
listener.notify(allSubscribedURLs);
}
private String generateId(URL url) {
return url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY);
}
private List<URL> emptyURLs(URL url) {
return asList(from(url).setProtocol(EMPTY_PROTOCOL).build());
}