1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Polish alibaba/spring-cloud-alibaba#1363 : Sync spring-cloud-starter-dubbo module

This commit is contained in:
mercyblitz 2020-04-14 12:02:31 +08:00
parent b099b8b5b2
commit c03d0706bc
6 changed files with 120 additions and 52 deletions

View File

@ -88,7 +88,7 @@
<slf4j-api.version>1.7.25</slf4j-api.version> <slf4j-api.version>1.7.25</slf4j-api.version>
<!-- Apache Dubbo --> <!-- Apache Dubbo -->
<dubbo.version>2.7.4.1</dubbo.version> <dubbo.version>2.7.6</dubbo.version>
<curator.version>4.0.1</curator.version> <curator.version>4.0.1</curator.version>
<!-- Apache RocketMQ --> <!-- Apache RocketMQ -->

View File

@ -30,7 +30,7 @@ import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_RETRIES; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_RETRIES;
/** /**
* {@link DubboTransported @DubboTransported} annotation indicates that the traditional * {@link DubboTransported @DubboTransported} annotation indicates that the traditional

View File

@ -29,7 +29,7 @@ import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import static com.alibaba.cloud.dubbo.registry.SpringCloudRegistryFactory.PROTOCOL; import static com.alibaba.cloud.dubbo.registry.SpringCloudRegistryFactory.PROTOCOL;
import static org.apache.dubbo.config.spring.util.PropertySourcesUtils.getPrefixedProperties; import static com.alibaba.spring.util.PropertySourcesUtils.getSubProperties;
/** /**
* Missing {@link SpringCloudRegistry} Property {@link Condition}. * Missing {@link SpringCloudRegistry} Property {@link Condition}.
@ -61,7 +61,7 @@ public class MissingSpringCloudRegistryConfigPropertyCondition
"'spring-cloud' protocol was found from 'dubbo.registry.address'"); "'spring-cloud' protocol was found from 'dubbo.registry.address'");
} }
Map<String, Object> properties = getPrefixedProperties( Map<String, Object> properties = getSubProperties(
environment.getPropertySources(), "dubbo.registries."); environment.getPropertySources(), "dubbo.registries.");
boolean found = properties.entrySet().stream().anyMatch(entry -> { boolean found = properties.entrySet().stream().anyMatch(entry -> {

View File

@ -35,7 +35,7 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_PROTOCOL; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_PROTOCOL;
import static org.apache.dubbo.config.spring.util.PropertySourcesUtils.getPrefixedProperties; import static com.alibaba.spring.util.PropertySourcesUtils.getSubProperties;
/** /**
* Dubbo {@link WebApplicationType#NONE Non-Web Application} * Dubbo {@link WebApplicationType#NONE Non-Web Application}
@ -149,7 +149,7 @@ public class DubboNonWebApplicationEnvironmentPostProcessor
String restPort = null; String restPort = null;
Map<String, Object> subProperties = getPrefixedProperties( Map<String, Object> subProperties = getSubProperties(
environment.getPropertySources(), PROTOCOLS_PROPERTY_NAME_PREFIX); environment.getPropertySources(), PROTOCOLS_PROPERTY_NAME_PREFIX);
Properties properties = new Properties(); Properties properties = new Properties();

View File

@ -22,6 +22,7 @@ import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -189,7 +190,7 @@ public class DubboServiceMetadataRepository
// // // //
private static <K, V> Map<K, V> getMap(Map<String, Map<K, V>> repository, private static <K, V> Map<K, V> getMap(Map<String, Map<K, V>> repository,
String key) { String key) {
return getOrDefault(repository, key, newHashMap()); return getOrDefault(repository, key, newHashMap());
} }
@ -289,9 +290,11 @@ public class DubboServiceMetadataRepository
serviceName); serviceName);
} }
initSubscribedDubboMetadataService(serviceName); if (initSubscribedDubboMetadataService(serviceName)) {
// mark this service name having been initialized // mark this service name having been initialized
initializedServices.add(serviceName); initializedServices.add(serviceName);
}
} }
} }
} }
@ -300,12 +303,19 @@ public class DubboServiceMetadataRepository
* Remove the metadata and initialized service of Dubbo Services if no there is no * Remove the metadata and initialized service of Dubbo Services if no there is no
* service instance. * service instance.
* @param serviceName the service name * @param serviceName the service name
* @param url the meta service url
*/ */
public void removeMetadataAndInitializedService(String serviceName) { public void removeMetadataAndInitializedService(String serviceName, URL url) {
synchronized (monitor) { synchronized (monitor) {
initializedServices.remove(serviceName); initializedServices.remove(serviceName);
dubboRestServiceMetadataRepository.remove(serviceName); dubboRestServiceMetadataRepository.remove(serviceName);
subscribedDubboMetadataServiceURLs.remove(serviceName); // fix #1260 if the subscribedDubboMetadataServiceURLs removed failold meta
// information will be retained
if (DubboMetadataService.class.getName().equals(url.getServiceInterface())) {
String serviceKey = url.getServiceKey();
subscribedDubboMetadataServiceURLs.remove(serviceKey);
}
} }
} }
@ -333,7 +343,7 @@ public class DubboServiceMetadataRepository
} }
private void addDubboMetadataServiceURLsMetadata(Map<String, String> metadata, private void addDubboMetadataServiceURLsMetadata(Map<String, String> metadata,
List<URL> dubboMetadataServiceURLs) { List<URL> dubboMetadataServiceURLs) {
String dubboMetadataServiceURLsJSON = jsonUtils.toJSON(dubboMetadataServiceURLs); String dubboMetadataServiceURLsJSON = jsonUtils.toJSON(dubboMetadataServiceURLs);
metadata.put(DUBBO_METADATA_SERVICE_URLS_PROPERTY_NAME, metadata.put(DUBBO_METADATA_SERVICE_URLS_PROPERTY_NAME,
dubboMetadataServiceURLsJSON); dubboMetadataServiceURLsJSON);
@ -380,7 +390,7 @@ public class DubboServiceMetadataRepository
} }
public List<URL> findSubscribedDubboMetadataServiceURLs(String serviceName, public List<URL> findSubscribedDubboMetadataServiceURLs(String serviceName,
String group, String version, String protocol) { String group, String version, String protocol) {
String serviceKey = URL.buildKey(serviceName, group, version); String serviceKey = URL.buildKey(serviceName, group, version);
List<URL> urls = null; List<URL> urls = null;
@ -460,7 +470,7 @@ public class DubboServiceMetadataRepository
} }
public Integer getDubboProtocolPort(ServiceInstance serviceInstance, public Integer getDubboProtocolPort(ServiceInstance serviceInstance,
String protocol) { String protocol) {
String protocolProperty = getDubboProtocolPropertyName(protocol); String protocolProperty = getDubboProtocolPropertyName(protocol);
Map<String, String> metadata = serviceInstance.getMetadata(); Map<String, String> metadata = serviceInstance.getMetadata();
String protocolPort = metadata.get(protocolProperty); String protocolPort = metadata.get(protocolProperty);
@ -468,7 +478,7 @@ public class DubboServiceMetadataRepository
} }
public List<URL> getExportedURLs(String serviceInterface, String group, public List<URL> getExportedURLs(String serviceInterface, String group,
String version) { String version) {
String serviceKey = URL.buildKey(serviceInterface, group, version); String serviceKey = URL.buildKey(serviceInterface, group, version);
return allExportedURLs.getOrDefault(serviceKey, Collections.emptyList()); return allExportedURLs.getOrDefault(serviceKey, Collections.emptyList());
} }
@ -525,7 +535,7 @@ public class DubboServiceMetadataRepository
* @return {@link DubboRestServiceMetadata} if matched, or <code>null</code> * @return {@link DubboRestServiceMetadata} if matched, or <code>null</code>
*/ */
public DubboRestServiceMetadata get(String serviceName, public DubboRestServiceMetadata get(String serviceName,
RequestMetadata requestMetadata) { RequestMetadata requestMetadata) {
return match(dubboRestServiceMetadataRepository, serviceName, requestMetadata); return match(dubboRestServiceMetadataRepository, serviceName, requestMetadata);
} }
@ -542,7 +552,7 @@ public class DubboServiceMetadataRepository
} }
private <T> T match(Map<String, Map<RequestMetadataMatcher, T>> repository, private <T> T match(Map<String, Map<RequestMetadataMatcher, T>> repository,
String serviceName, RequestMetadata requestMetadata) { String serviceName, RequestMetadata requestMetadata) {
Map<RequestMetadataMatcher, T> map = repository.get(serviceName); Map<RequestMetadataMatcher, T> map = repository.get(serviceName);
@ -614,24 +624,47 @@ public class DubboServiceMetadataRepository
subscribedServices.remove(currentApplicationName); subscribedServices.remove(currentApplicationName);
} }
protected void initSubscribedDubboMetadataService(String serviceName) { protected Boolean initSubscribedDubboMetadataService(String serviceName) {
metadataServiceInstanceSelector.choose(discoveryClient.getInstances(serviceName)) // this need to judge whether the initialization is successful or not. The failed
.map(this::getDubboMetadataServiceURLs) // initialization will not change the initializedServices
.ifPresent(dubboMetadataServiceURLs -> { Optional<ServiceInstance> optionalServiceInstance = metadataServiceInstanceSelector
dubboMetadataServiceURLs.forEach(dubboMetadataServiceURL -> { .choose(discoveryClient.getInstances(serviceName));
try { if (!((Optional) optionalServiceInstance).isPresent()) {
initSubscribedDubboMetadataServiceURL( return false;
dubboMetadataServiceURL); }
initDubboMetadataServiceProxy(dubboMetadataServiceURL); ServiceInstance serviceInstance = optionalServiceInstance.get();
} if (null == serviceInstance) {
catch (Throwable e) { return false;
if (logger.isErrorEnabled()) { }
logger.error(e.getMessage(), e); List<URL> dubboMetadataServiceURLs = getDubboMetadataServiceURLs(serviceInstance);
} if (dubboMetadataServiceURLs.isEmpty()) {
} return false;
}); }
}); for (URL dubboMetadataServiceURL : dubboMetadataServiceURLs) {
try {
initSubscribedDubboMetadataServiceURL(dubboMetadataServiceURL);
DubboMetadataService dubboMetadataService = dubboMetadataConfigServiceProxy
.getProxy(serviceName);
if (dubboMetadataService == null) {
dubboMetadataService = initDubboMetadataServiceProxy(
dubboMetadataServiceURL);
}
if (dubboMetadataService == null) {
removeMetadataAndInitializedService(serviceName,
dubboMetadataServiceURL);
return false;
}
}
catch (Throwable e) {
if (logger.isErrorEnabled()) {
logger.error(e.getMessage(), e);
}
return false;
}
}
initDubboRestServiceMetadataRepository(serviceName); initDubboRestServiceMetadataRepository(serviceName);
return true;
} }
private void initSubscribedDubboMetadataServiceURL(URL dubboMetadataServiceURL) { private void initSubscribedDubboMetadataServiceURL(URL dubboMetadataServiceURL) {
@ -640,11 +673,13 @@ public class DubboServiceMetadataRepository
subscribedDubboMetadataServiceURLs.add(serviceKey, dubboMetadataServiceURL); subscribedDubboMetadataServiceURLs.add(serviceKey, dubboMetadataServiceURL);
} }
private void initDubboMetadataServiceProxy(URL dubboMetadataServiceURL) { private DubboMetadataService initDubboMetadataServiceProxy(
URL dubboMetadataServiceURL) {
String serviceName = dubboMetadataServiceURL.getParameter(APPLICATION_KEY); String serviceName = dubboMetadataServiceURL.getParameter(APPLICATION_KEY);
String version = dubboMetadataServiceURL.getParameter(VERSION_KEY); String version = dubboMetadataServiceURL.getParameter(VERSION_KEY);
// Initialize DubboMetadataService with right version // Initialize DubboMetadataService with right version
dubboMetadataConfigServiceProxy.initProxy(serviceName, version); return dubboMetadataConfigServiceProxy.initProxy(serviceName, version);
} }
@Override @Override

View File

@ -48,9 +48,11 @@ import static java.util.Collections.emptyList;
import static org.apache.dubbo.common.URLBuilder.from; import static org.apache.dubbo.common.URLBuilder.from;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE; import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL; import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL; import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
import static org.springframework.util.StringUtils.hasText; import static org.springframework.util.StringUtils.hasText;
@ -96,10 +98,10 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
private final ConfigurableApplicationContext applicationContext; private final ConfigurableApplicationContext applicationContext;
public AbstractSpringCloudRegistry(URL url, DiscoveryClient discoveryClient, public AbstractSpringCloudRegistry(URL url, DiscoveryClient discoveryClient,
DubboServiceMetadataRepository dubboServiceMetadataRepository, DubboServiceMetadataRepository dubboServiceMetadataRepository,
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy, DubboMetadataServiceProxy dubboMetadataConfigServiceProxy,
JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory, JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory,
ConfigurableApplicationContext applicationContext) { ConfigurableApplicationContext applicationContext) {
super(url); super(url);
this.servicesLookupInterval = url this.servicesLookupInterval = url
.getParameter(SERVICES_LOOKUP_INTERVAL_PARAM_NAME, 60L); .getParameter(SERVICES_LOOKUP_INTERVAL_PARAM_NAME, 60L);
@ -161,6 +163,13 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
} }
else if (isDubboMetadataServiceURL(url)) { // for DubboMetadataService else if (isDubboMetadataServiceURL(url)) { // for DubboMetadataService
subscribeDubboMetadataServiceURLs(url, listener); subscribeDubboMetadataServiceURLs(url, listener);
if (from(url).getParameter(CATEGORY_KEY) != null
&& from(url).getParameter(CATEGORY_KEY).contains(PROVIDER)) {
// Fix #1259 and #753 Listene meta service change events to remove useless
// clients
registerServiceInstancesChangedEventListener(url, listener);
}
} }
else { // for general Dubbo Services else { // for general Dubbo Services
subscribeDubboServiceURLs(url, listener); subscribeDubboServiceURLs(url, listener);
@ -181,7 +190,7 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
* @param listener {@link NotifyListener} * @param listener {@link NotifyListener}
*/ */
private void registerServiceInstancesChangedEventListener(URL url, private void registerServiceInstancesChangedEventListener(URL url,
NotifyListener listener) { NotifyListener listener) {
String listenerId = generateId(url); String listenerId = generateId(url);
if (registerListeners.add(listenerId)) { if (registerListeners.add(listenerId)) {
applicationContext.addApplicationListener( applicationContext.addApplicationListener(
@ -208,8 +217,8 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
} }
protected void subscribeDubboServiceURL(URL url, NotifyListener listener, protected void subscribeDubboServiceURL(URL url, NotifyListener listener,
String serviceName, String serviceName,
Function<String, Collection<ServiceInstance>> serviceInstancesFunction) { Function<String, Collection<ServiceInstance>> serviceInstancesFunction) {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info( logger.info(
@ -228,10 +237,13 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
// Re-obtain the latest list of available metadata address here, ip or port may // Re-obtain the latest list of available metadata address here, ip or port may
// change. // change.
// by https://github.com/wangzihaogithub // by https://github.com/wangzihaogithub
dubboMetadataConfigServiceProxy.removeProxy(serviceName); // When the last service provider is closed, fix 1259while close the
repository.removeMetadataAndInitializedService(serviceName); // channelwhen up a new provider then repository.initializeMetadata(serviceName)
dubboGenericServiceFactory.destroy(serviceName); // will throw Exception.
repository.initializeMetadata(serviceName); // dubboMetadataConfigServiceProxy.removeProxy(serviceName);
// repository.removeMetadataAndInitializedService(serviceName);
// dubboGenericServiceFactory.destroy(serviceName);
// repository.initializeMetadata(serviceName);
if (CollectionUtils.isEmpty(serviceInstances)) { if (CollectionUtils.isEmpty(serviceInstances)) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn( logger.warn(
@ -239,6 +251,18 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
+ "available , please make sure the further impact", + "available , please make sure the further impact",
serviceName, url.getServiceKey()); serviceName, url.getServiceKey());
} }
if (isDubboMetadataServiceURL(url)) {
// if meta service change, and serviceInstances is zero, will clean up
// information about this client
dubboMetadataConfigServiceProxy.removeProxy(serviceName);
repository.removeMetadataAndInitializedService(serviceName, url);
dubboGenericServiceFactory.destroy(serviceName);
String listenerId = generateId(url);
// The metaservice will restart the new listener. It needs to be optimized
// to see whether the original listener can be reused.
this.registerListeners.remove(listenerId);
}
/** /**
* URLs with {@link RegistryConstants#EMPTY_PROTOCOL} * URLs with {@link RegistryConstants#EMPTY_PROTOCOL}
*/ */
@ -250,6 +274,11 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
listener.notify(allSubscribedURLs); listener.notify(allSubscribedURLs);
return; return;
} }
if (isDubboMetadataServiceURL(url)) {
// Prevent duplicate generation of DubboMetadataService
return;
}
repository.initializeMetadata(serviceName);
DubboMetadataService dubboMetadataService = dubboMetadataConfigServiceProxy DubboMetadataService dubboMetadataService = dubboMetadataConfigServiceProxy
.getProxy(serviceName); .getProxy(serviceName);
@ -301,7 +330,11 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
} }
private List<URL> emptyURLs(URL url) { private List<URL> emptyURLs(URL url) {
return asList(from(url).setProtocol(EMPTY_PROTOCOL).build()); // issue : When the last service provider is closed, the client still periodically
// connects to the last provider.n
// fix https://github.com/alibaba/spring-cloud-alibaba/issues/1259
return asList(from(url).setProtocol(EMPTY_PROTOCOL).removeParameter(CATEGORY_KEY)
.build());
} }
private List<ServiceInstance> getServiceInstances(String serviceName) { private List<ServiceInstance> getServiceInstances(String serviceName) {
@ -322,7 +355,7 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
} }
private List<URL> getExportedURLs(DubboMetadataService dubboMetadataService, private List<URL> getExportedURLs(DubboMetadataService dubboMetadataService,
URL url) { URL url) {
String serviceInterface = url.getServiceInterface(); String serviceInterface = url.getServiceInterface();
String group = url.getParameter(GROUP_KEY); String group = url.getParameter(GROUP_KEY);
String version = url.getParameter(VERSION_KEY); String version = url.getParameter(VERSION_KEY);