diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/DefaultMetadataParamsFilter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/DefaultMetadataParamsFilter.java
new file mode 100644
index 00000000..53d090f1
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/DefaultMetadataParamsFilter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.metadata;
+
+import org.apache.dubbo.common.extension.Activate;
+
+import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
+import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
+import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY;
+import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
+import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
+import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
+import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
+
+/**
+ * Copy from org.apache.dubbo.metadata.DefaultMetadataParamsFilter.
+ *
+ * @author theonefx
+ */
+@Activate
+public class DefaultMetadataParamsFilter implements MetadataParamsFilter {
+
+ @Override
+ public String[] serviceParamsIncluded() {
+ return new String[] { CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY,
+ CONNECTIONS_KEY, DEPRECATED_KEY, GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY,
+ PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY, WEIGHT_KEY,
+ DUBBO_VERSION_KEY, RELEASE_KEY };
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/MetadataParamsFilter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/MetadataParamsFilter.java
new file mode 100644
index 00000000..f7b4dc7d
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/MetadataParamsFilter.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.metadata;
+
+import org.apache.dubbo.common.extension.SPI;
+
+/**
+ * Copy from org.apache.dubbo.metadata.MetadataParamsFilter.
+ *
+ * @author theonefx
+ */
+@SPI
+public interface MetadataParamsFilter {
+
+ /**
+ * params that need to be sent to metadata center.
+ * @return arrays of keys
+ */
+ String[] serviceParamsIncluded();
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/RevisionResolver.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/RevisionResolver.java
new file mode 100644
index 00000000..53f9f1bf
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/RevisionResolver.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.metadata;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Copy from org.apache.dubbo.metadata.RevisionResolver.
+ *
+ * @author theonefx
+ */
+public final class RevisionResolver {
+
+ private static final Logger logger = LoggerFactory.getLogger(RevisionResolver.class);
+
+ private static final String EMPTY_REVISION = "0";
+
+ private static final char[] hexDigits = { '0', '1', '2', '3', '4', '5', '6', '7', '8',
+ '9', 'A', 'B', 'C', 'D', 'E', 'F' };
+
+ private static MessageDigest mdInst;
+
+ static {
+ try {
+ mdInst = MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e) {
+ logger.error("Failed to calculate metadata revision", e);
+ }
+ }
+
+ private RevisionResolver() {
+
+ }
+
+ public static String getEmptyRevision() {
+ return EMPTY_REVISION;
+ }
+
+ public static String calRevision(String metadata) {
+ mdInst.update(metadata.getBytes(UTF_8));
+ byte[] md5 = mdInst.digest();
+
+ int j = md5.length;
+ char[] str = new char[j * 2];
+ int k = 0;
+ for (byte byte0 : md5) {
+ str[k++] = hexDigits[byte0 >>> 4 & 0xf];
+ str[k++] = hexDigits[byte0 & 0xf];
+ }
+ return new String(str);
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/ServiceInfo.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/ServiceInfo.java
new file mode 100644
index 00000000..a734a8e7
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/ServiceInfo.java
@@ -0,0 +1,351 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.metadata;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.compiler.support.ClassUtils;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.utils.ArrayUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+/**
+ * Copy from org.apache.dubbo.metadata.MetadataInfo.ServiceInfo.
+ *
+ * @author theonefx
+ */
+public class ServiceInfo implements Serializable {
+
+ private static final long serialVersionUID = -258557978718735302L;
+
+ private static ExtensionLoader loader = ExtensionLoader
+ .getExtensionLoader(MetadataParamsFilter.class);
+
+ private String name;
+
+ private String group;
+
+ private String version;
+
+ private String protocol;
+
+ private String path; // most of the time, path is the same with the interface name.
+
+ private Map params;
+
+ // params configured on consumer side,
+ private transient Map consumerParams;
+
+ // cached method params
+ private transient Map> methodParams;
+
+ private transient Map> consumerMethodParams;
+
+ // cached numbers
+ private transient Map numbers;
+
+ private transient Map> methodNumbers;
+
+ // service + group + version
+ private transient String serviceKey;
+
+ // service + group + version + protocol
+ private transient String matchKey;
+
+ private transient URL url;
+
+ public ServiceInfo() {
+ }
+
+ public ServiceInfo(URL url) {
+ this(url.getServiceInterface(), url.getParameter(GROUP_KEY),
+ url.getParameter(VERSION_KEY), url.getProtocol(), url.getPath(), null);
+
+ this.url = url;
+ Map params = new HashMap<>();
+ List filters = loader.getActivateExtension(url,
+ "params-filter");
+ for (MetadataParamsFilter filter : filters) {
+ String[] paramsIncluded = filter.serviceParamsIncluded();
+ if (ArrayUtils.isNotEmpty(paramsIncluded)) {
+ for (String p : paramsIncluded) {
+ String value = url.getParameter(p);
+ if (StringUtils.isNotEmpty(value) && params.get(p) == null) {
+ params.put(p, value);
+ }
+ String[] methods = url.getParameter(METHODS_KEY, (String[]) null);
+ if (methods != null) {
+ for (String method : methods) {
+ String mValue = getMethodParameterStrict(url, method, p);
+ if (StringUtils.isNotEmpty(mValue)) {
+ params.put(method + DOT_SEPARATOR + p, mValue);
+ }
+ }
+ }
+ }
+ }
+ }
+ this.params = params;
+ }
+
+ public String getMethodParameterStrict(URL url, String method, String key) {
+ Map keyMap = url.getMethodParameters().get(method);
+ String value = null;
+ if (keyMap != null) {
+ value = keyMap.get(key);
+ }
+ return value;
+ }
+
+ public ServiceInfo(String name, String group, String version, String protocol,
+ String path, Map params) {
+ this.name = name;
+ this.group = group;
+ this.version = version;
+ this.protocol = protocol;
+ this.path = path;
+ this.params = params == null ? new HashMap<>() : params;
+
+ this.serviceKey = URL.buildKey(name, group, version);
+ this.matchKey = buildMatchKey();
+ }
+
+ public String getMatchKey() {
+ if (matchKey != null) {
+ return matchKey;
+ }
+ buildMatchKey();
+ return matchKey;
+ }
+
+ private String buildMatchKey() {
+ matchKey = getServiceKey();
+ if (StringUtils.isNotEmpty(protocol)) {
+ matchKey = getServiceKey() + GROUP_CHAR_SEPARATOR + protocol;
+ }
+ return matchKey;
+ }
+
+ public String getServiceKey() {
+ if (serviceKey != null) {
+ return serviceKey;
+ }
+ this.serviceKey = URL.buildKey(name, group, version);
+ return serviceKey;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Map getParams() {
+ if (params == null) {
+ return Collections.emptyMap();
+ }
+ return params;
+ }
+
+ public void setParams(Map params) {
+ this.params = params;
+ }
+
+ public Map getAllParams() {
+ if (consumerParams != null) {
+ Map allParams = new HashMap<>(
+ (int) ((params.size() + consumerParams.size()) / 0.75f + 1));
+ allParams.putAll(params);
+ allParams.putAll(consumerParams);
+ return allParams;
+ }
+ return params;
+ }
+
+ public String getParameter(String key) {
+ if (consumerParams != null) {
+ String value = consumerParams.get(key);
+ if (value != null) {
+ return value;
+ }
+ }
+ return params.get(key);
+ }
+
+ public String getMethodParameter(String method, String key, String defaultValue) {
+ if (methodParams == null) {
+ methodParams = URL.toMethodParameters(params);
+ consumerMethodParams = URL.toMethodParameters(consumerParams);
+ }
+
+ String value = getMethodParameter(method, key, consumerMethodParams);
+ if (value != null) {
+ return value;
+ }
+ value = getMethodParameter(method, key, methodParams);
+ return value == null ? defaultValue : value;
+ }
+
+ private String getMethodParameter(String method, String key,
+ Map> map) {
+ Map keyMap = map.get(method);
+ String value = null;
+ if (keyMap != null) {
+ value = keyMap.get(key);
+ }
+ if (StringUtils.isEmpty(value)) {
+ value = getParameter(key);
+ }
+ return value;
+ }
+
+ public boolean hasMethodParameter(String method, String key) {
+ String value = this.getMethodParameter(method, key, (String) null);
+ return StringUtils.isNotEmpty(value);
+ }
+
+ public boolean hasMethodParameter(String method) {
+ if (methodParams == null) {
+ methodParams = URL.toMethodParameters(params);
+ consumerMethodParams = URL.toMethodParameters(consumerParams);
+ }
+
+ return consumerMethodParams.containsKey(method)
+ || methodParams.containsKey(method);
+ }
+
+ public String toDescString() {
+ return this.getMatchKey() + getMethodSignaturesString() + getParams();
+ }
+
+ private String getMethodSignaturesString() {
+ SortedSet methodStrings = new TreeSet();
+
+ Method[] methods = ClassUtils.forName(name).getMethods();
+ for (Method method : methods) {
+ methodStrings.add(method.toString());
+ }
+ return methodStrings.toString();
+ }
+
+ public void addParameter(String key, String value) {
+ if (consumerParams != null) {
+ this.consumerParams.put(key, value);
+ }
+ }
+
+ public void addParameterIfAbsent(String key, String value) {
+ if (consumerParams != null) {
+ this.consumerParams.putIfAbsent(key, value);
+ }
+ }
+
+ public void addConsumerParams(Map params) {
+ // copy once for one service subscription
+ if (consumerParams == null) {
+ consumerParams = new HashMap<>(params);
+ }
+ }
+
+ public Map getNumbers() {
+ // concurrent initialization is tolerant
+ if (numbers == null) {
+ numbers = new ConcurrentHashMap<>();
+ }
+ return numbers;
+ }
+
+ public Map> getMethodNumbers() {
+ if (methodNumbers == null) { // concurrent initialization is tolerant
+ methodNumbers = new ConcurrentHashMap<>();
+ }
+ return methodNumbers;
+ }
+
+ public URL getUrl() {
+ return url;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof ServiceInfo)) {
+ return false;
+ }
+
+ ServiceInfo serviceInfo = (ServiceInfo) obj;
+ return this.getMatchKey().equals(serviceInfo.getMatchKey())
+ && this.getParams().equals(serviceInfo.getParams());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getMatchKey(), getParams());
+ }
+
+ @Override
+ public String toString() {
+ return "service{" + "name='" + name + "'," + "group='" + group + "',"
+ + "version='" + version + "'," + "protocol='" + protocol + "',"
+ + "params=" + params + "," + "consumerParams=" + consumerParams + "}";
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java
index 85c50738..7e85e624 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java
@@ -16,6 +16,7 @@
package com.alibaba.cloud.dubbo.metadata.repository;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -32,6 +33,8 @@ import com.alibaba.cloud.dubbo.env.DubboCloudProperties;
import com.alibaba.cloud.dubbo.http.matcher.RequestMetadataMatcher;
import com.alibaba.cloud.dubbo.metadata.DubboRestServiceMetadata;
import com.alibaba.cloud.dubbo.metadata.RequestMetadata;
+import com.alibaba.cloud.dubbo.metadata.RevisionResolver;
+import com.alibaba.cloud.dubbo.metadata.ServiceInfo;
import com.alibaba.cloud.dubbo.metadata.ServiceRestMetadata;
import com.alibaba.cloud.dubbo.registry.event.SubscribedServicesChangedEvent;
import com.alibaba.cloud.dubbo.service.DubboMetadataService;
@@ -43,6 +46,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +100,12 @@ public class DubboServiceMetadataRepository
@Deprecated
public static final String DUBBO_METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_URLS_PROPERTY_NAME;
+ /**
+ * The key of dubbo metadata revision. copyed from
+ * ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME.
+ */
+ public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision";
+
/**
* The {@link String#format(String, Object...) pattern} of dubbo protocols port.
*/
@@ -117,14 +127,11 @@ public class DubboServiceMetadataRepository
*/
private final MultiValueMap allExportedURLs = new LinkedMultiValueMap<>();
- // =================================== Registration
- // =================================== //
+ // ======================== Registration ======================== //
- // ====================================================================================
- // //
+ // ============================================================== //
- // =================================== Subscription
- // =================================== //
+ // ======================== Subscription ======================== //
/**
* A Map to store REST metadata temporary, its' key is the special service name for a
* Dubbo service, the value is a JSON content of JAX-RS or Spring MVC REST metadata
@@ -134,11 +141,9 @@ public class DubboServiceMetadataRepository
private ApplicationEventPublisher applicationEventPublisher;
- // ====================================================================================
- // //
+ // =============================================================== //
- // =================================== REST Metadata
- // ================================== //
+ // ======================== REST Metadata ======================== //
private volatile Set subscribedServices = emptySet();
/**
@@ -147,11 +152,9 @@ public class DubboServiceMetadataRepository
*/
private Map> dubboRestServiceMetadataRepository = newHashMap();
- // ====================================================================================
- // //
+ // =============================================================== //
- // =================================== Dependencies
- // =================================== //
+ // ======================== Dependencies ========================= //
@Autowired
private DubboCloudProperties dubboCloudProperties;
@@ -180,8 +183,7 @@ public class DubboServiceMetadataRepository
@Autowired
private DubboMetadataServiceExporter dubboMetadataServiceExporter;
- // ====================================================================================
- // //
+ // =============================================================== //
private static Map getMap(Map> repository,
String key) {
@@ -189,12 +191,7 @@ public class DubboServiceMetadataRepository
}
private static V getOrDefault(Map source, K key, V defaultValue) {
- V value = source.get(key);
- if (value == null) {
- value = defaultValue;
- source.put(key, value);
- }
- return value;
+ return source.computeIfAbsent(key, k -> defaultValue);
}
private static Map newHashMap() {
@@ -248,13 +245,13 @@ public class DubboServiceMetadataRepository
@Override
public void afterSingletonsInstantiated() {
- initializeMetadata();
+ // initializeMetadata();
}
/**
* Initialize the metadata.
*/
- private void initializeMetadata() {
+ public void initializeMetadata() {
doGetSubscribedServices().forEach(this::initializeMetadata);
if (logger.isInfoEnabled()) {
logger.info("The metadata of Dubbo services has been initialized");
@@ -301,12 +298,36 @@ public class DubboServiceMetadataRepository
addDubboMetadataServiceURLsMetadata(metadata, dubboMetadataServiceURLs);
addDubboProtocolsPortMetadata(metadata);
+ addRevision(metadata);
return Collections.unmodifiableMap(metadata);
}
+ private void addRevision(Map metadata) {
+ metadata.put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, calAndGetRevision());
+ }
+
+ public String calAndGetRevision() {
+ if (CollectionUtils.isEmptyMap(allExportedURLs)) {
+ return RevisionResolver.getEmptyRevision();
+ }
+ else {
+ List descs = new ArrayList<>(allExportedURLs.size());
+ for (Map.Entry> entry : allExportedURLs.entrySet()) {
+ entry.getValue().stream().map(ServiceInfo::new)
+ .map(ServiceInfo::toDescString).forEach(descs::add);
+ }
+
+ descs.sort(String::compareTo);
+
+ return RevisionResolver.calRevision(descs.toString());
+ }
+ }
+
private void removeDubboMetadataServiceURLs(List dubboMetadataServiceURLs) {
- dubboMetadataServiceURLs.forEach(this::unexportURL);
+ dubboMetadataServiceURLs.stream().map(URL::getServiceKey).distinct()
+ .forEach(allExportedURLs::remove);
+ // dubboMetadataServiceURLs.forEach(this::unexportURL);
}
private void addDubboMetadataServiceURLsMetadata(Map metadata,
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/AbstractServiceSubscribeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/AbstractServiceSubscribeHandler.java
new file mode 100644
index 00000000..aa642c3c
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/AbstractServiceSubscribeHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.registry;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.NotifyListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.dubbo.common.URLBuilder.from;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
+
+/**
+ * @author theonefx
+ */
+public abstract class AbstractServiceSubscribeHandler {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ protected final URL url;
+
+ protected final NotifyListener listener;
+
+ protected final DubboCloudRegistry registry;
+
+ public AbstractServiceSubscribeHandler(URL url, NotifyListener listener,
+ DubboCloudRegistry registry) {
+ this.url = url;
+ this.listener = listener;
+ this.registry = registry;
+ }
+
+ protected void notifyAllSubscribedURLs(URL url, List subscribedURLs,
+ NotifyListener listener) {
+
+ if (isEmpty(subscribedURLs)) {
+ // Add the EMPTY_PROTOCOL URL
+ listener.notify(Collections.singletonList(emptyURL(url)));
+ // if (isDubboMetadataServiceURL(url)) {
+ // if meta service change, and serviceInstances is zero, will clean up
+ // information about this client
+ // String serviceName = url.getParameter(GROUP_KEY);
+ // repository.removeMetadataAndInitializedService(serviceName, url);
+ // }
+ }
+ else {
+ // Notify all
+ listener.notify(subscribedURLs);
+ }
+ }
+
+ private URL emptyURL(URL url) {
+ // issue : When the last service provider is closed, the client still periodically
+ // connects to the last provider.n
+ // fix https://github.com/alibaba/spring-cloud-alibaba/issues/1259
+ return from(url).setProtocol(EMPTY_PROTOCOL).removeParameter(CATEGORY_KEY)
+ .build();
+ }
+
+ private final AtomicBoolean inited = new AtomicBoolean(false);
+
+ public void init() {
+ if (inited.compareAndSet(false, true)) {
+ doInit();
+ }
+ }
+
+ protected abstract void doInit();
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java
index cce75ba3..1fe83183 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java
@@ -16,54 +16,47 @@
package com.alibaba.cloud.dubbo.registry;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import com.alibaba.cloud.commons.lang.StringUtils;
+import com.alibaba.cloud.dubbo.metadata.RevisionResolver;
import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository;
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
-import com.alibaba.cloud.dubbo.service.DubboGenericServiceFactory;
import com.alibaba.cloud.dubbo.service.DubboMetadataService;
import com.alibaba.cloud.dubbo.service.DubboMetadataServiceProxy;
import com.alibaba.cloud.dubbo.util.DubboMetadataUtils;
import com.alibaba.cloud.dubbo.util.JSONUtils;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.core.Ordered;
-import org.springframework.util.CollectionUtils;
+import org.springframework.context.event.ContextRefreshedEvent;
-import static java.lang.String.format;
+import static com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
import static java.util.Collections.emptyList;
-import static org.apache.dubbo.common.URLBuilder.from;
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
-import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URLS_PROPERTY_NAME;
import static org.springframework.util.StringUtils.hasText;
@@ -72,22 +65,14 @@ import static org.springframework.util.StringUtils.hasText;
* Dubbo Cloud {@link FailbackRegistry} is based on Spring Cloud {@link DiscoveryClient}.
*
* @author Mercy
+ * @author theonefx
*/
-public class DubboCloudRegistry extends FailbackRegistry {
-
- /**
- * The parameter name of {@link #servicesLookupInterval}.
- */
- public static final String SERVICES_LOOKUP_INTERVAL_PARAM_NAME = "dubbo.services.lookup.interval";
+public class DubboCloudRegistry extends FailbackRegistry
+ implements ApplicationListener {
protected static final String DUBBO_METADATA_SERVICE_CLASS_NAME = DubboMetadataService.class
.getName();
- /**
- * Caches the IDs of {@link ApplicationListener}.
- */
- private static final Set REGISTER_LISTENERS = new HashSet<>();
-
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final DiscoveryClient discoveryClient;
@@ -98,56 +83,84 @@ public class DubboCloudRegistry extends FailbackRegistry {
private final JSONUtils jsonUtils;
- private final DubboGenericServiceFactory dubboGenericServiceFactory;
-
private final DubboMetadataUtils dubboMetadataUtils;
- /**
- * The interval in second of lookup service names(only for Dubbo-OPS).
- */
- private final long servicesLookupInterval;
-
private final ConfigurableApplicationContext applicationContext;
- private final String currentApplicationName;
+ private final ReSubscribeManager reSubscribeManager;
- private final Map urlNotifyListenerMap = new ConcurrentHashMap<>();
+ private final AtomicBoolean inited = new AtomicBoolean(false);
- private final Map reConnectJobMap = new ConcurrentHashMap<>();
+ /**
+ * {subscribedURL : ServiceSubscribeHandler}.
+ */
+ private final Map urlSubscribeHandlerMap = new ConcurrentHashMap<>();
- private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor(
- 2);
+ /**
+ * {appName: MetadataServiceSubscribeHandler}.
+ */
+ private final Map metadataSubscribeHandlerMap = new ConcurrentHashMap<>();
- private final int maxReSubscribeMetadataTimes;
-
- private final int reSubscribeMetadataIntervial;
+ /**
+ * {appName : {revision: [instances]}}.
+ */
+ private final Map>> serviceRevisionInstanceMap = new ConcurrentHashMap<>();
public DubboCloudRegistry(URL url, DiscoveryClient discoveryClient,
DubboServiceMetadataRepository repository,
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy,
- JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory,
- ConfigurableApplicationContext applicationContext,
- int maxReSubscribeMetadataTimes, int reSubscribeMetadataIntervial) {
+ JSONUtils jsonUtils, ConfigurableApplicationContext applicationContext) {
super(url);
- this.servicesLookupInterval = url
- .getParameter(SERVICES_LOOKUP_INTERVAL_PARAM_NAME, 60L);
this.discoveryClient = discoveryClient;
this.repository = repository;
this.dubboMetadataConfigServiceProxy = dubboMetadataConfigServiceProxy;
this.jsonUtils = jsonUtils;
- this.dubboGenericServiceFactory = dubboGenericServiceFactory;
this.applicationContext = applicationContext;
this.dubboMetadataUtils = getBean(DubboMetadataUtils.class);
- this.currentApplicationName = dubboMetadataUtils.getCurrentApplicationName();
- this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes;
- this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial;
+ this.reSubscribeManager = new ReSubscribeManager(this);
- reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES);
- reConnectPool.allowCoreThreadTimeOut(true);
+ applicationContext.addApplicationListener(
+ (ApplicationListener) event -> preInit());
}
- private T getBean(Class beanClass) {
+ private void preInit() {
+ if (inited.compareAndSet(false, true)) {
+ Set subscribeApps = getServices(null);
+
+ for (String appName : subscribeApps) {
+ List instances = discoveryClient.getInstances(appName);
+
+ Map> map = serviceRevisionInstanceMap
+ .computeIfAbsent(appName, k -> new HashMap<>());
+
+ for (ServiceInstance instance : instances) {
+ String revision = getRevision(instance);
+ List list = map.computeIfAbsent(revision,
+ k -> new ArrayList<>());
+ list.add(instance);
+ }
+
+ if (map.size() == 0) {
+ logger.debug("APP {} preInited, instance siez is zero!!", appName);
+ }
+ else {
+ map.forEach((revision, list) -> logger.debug(
+ "APP {} revision {} preInited, instance size = {}", appName,
+ revision, list.size()));
+ }
+ }
+
+ metadataSubscribeHandlerMap.forEach((url, handler) -> handler.init());
+ urlSubscribeHandlerMap.forEach((url, handler) -> handler.init());
+ repository.initializeMetadata();
+ applicationContext.addApplicationListener(this);
+
+ logger.info("DubboCloudRegistry preInit Done.");
+ }
+ }
+
+ protected T getBean(Class beanClass) {
return this.applicationContext.getBean(beanClass);
}
@@ -183,249 +196,248 @@ public class DubboCloudRegistry extends FailbackRegistry {
@Override
public final void doSubscribe(URL url, NotifyListener listener) {
-
- if (isAdminURL(url)) {
- // TODO in future
- if (logger.isWarnEnabled()) {
- logger.warn("This feature about admin will be supported in the future.");
+ synchronized (this) {
+ if (isAdminURL(url)) {
+ // TODO in future
+ if (logger.isWarnEnabled()) {
+ logger.warn(
+ "This feature about admin will be supported in the future.");
+ }
+ }
+ else if (isDubboMetadataServiceURL(url) && containsProviderCategory(url)) {
+ // for DubboMetadataService
+ String appName = getServiceName(url);
+ MetadataServiceSubscribeHandler handler = new MetadataServiceSubscribeHandler(
+ appName, url, listener, this, dubboMetadataUtils);
+ if (inited.get()) {
+ handler.init();
+ }
+ metadataSubscribeHandlerMap.put(appName, handler);
+ }
+ else if (isConsumerServiceURL(url)) {
+ // for general Dubbo Services
+ GenearalServiceSubscribeHandler handler = new GenearalServiceSubscribeHandler(
+ url, listener, this, repository, jsonUtils,
+ dubboMetadataConfigServiceProxy);
+ if (inited.get()) {
+ handler.init();
+ }
+ urlSubscribeHandlerMap.put(url, handler);
}
}
- else if (isDubboMetadataServiceURL(url)) { // for DubboMetadataService
- subscribeDubboMetadataServiceURLs(url, listener);
- }
- else { // for general Dubbo Services
- subscribeURLs(url, listener);
- urlNotifyListenerMap.put(url, listener);
- }
- }
-
- private void subscribeURLs(URL url, NotifyListener listener) {
-
- // Sync subscription
- subscribeURLs(url, getServices(url), listener);
-
- // Async subscription
- registerServiceInstancesChangedListener(url,
-
- new ServiceInstanceChangeListener() {
-
- @Override
- public int getOrder() {
- return Ordered.LOWEST_PRECEDENCE;
- }
-
- @Override
- public void onApplicationEvent(ServiceInstancesChangedEvent event) {
-
- Set serviceNames = getServices(url);
-
- String serviceName = event.getServiceName();
-
- if (serviceNames.contains(serviceName)) {
- logger.debug(
- "handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}",
- event.getServiceName(), url.getServiceKey());
- try {
- subscribeURLs(url, serviceNames, listener);
- reConnectJobMap.remove(serviceName);
- }
- catch (Exception e) {
- logger.warn(String.format(
- "subscribeURLs failed, serviceName = %s, try reSubscribe again",
- serviceName), e);
- addReSubscribeMetadataJob(serviceName, 0);
- }
- }
- }
-
- @Override
- public String toString() {
- return "ServiceInstancesChangedEventListener:"
- + url.getServiceKey();
- }
- });
- }
-
- void addReSubscribeMetadataJob(String serviceName, int count) {
- if (count > maxReSubscribeMetadataTimes) {
- logger.error(
- "reSubscribe failed too many times, serviceName = {}, count = {}",
- serviceName, count);
- return;
- }
- ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, this, count);
- reConnectJobMap.put(serviceName, job);
- reConnectPool.schedule(job, reSubscribeMetadataIntervial, TimeUnit.SECONDS);
- }
-
- void subscribeURLs(URL url, Set serviceNames, NotifyListener listener) {
-
- List subscribedURLs = new LinkedList<>();
-
- serviceNames.forEach(serviceName -> {
-
- subscribeURLs(url, subscribedURLs, serviceName,
- () -> getServiceInstances(serviceName));
-
- });
-
- // Notify all
- notifyAllSubscribedURLs(url, subscribedURLs, listener);
- }
-
- private void registerServiceInstancesChangedListener(URL url,
- ApplicationListener listener) {
- String listenerId = generateId(url);
- if (REGISTER_LISTENERS.add(listenerId)) {
- applicationContext.addApplicationListener(listener);
- }
- }
-
- private void subscribeURLs(URL subscribedURL, List subscribedURLs,
- String serviceName,
- Supplier> serviceInstancesSupplier) {
- List serviceInstances = serviceInstancesSupplier.get();
- subscribeURLs(subscribedURL, subscribedURLs, serviceName, serviceInstances);
- }
-
- private void subscribeURLs(URL subscribedURL, List subscribedURLs,
- String serviceName, List serviceInstances) {
-
- if (CollectionUtils.isEmpty(serviceInstances)) {
- if (logger.isWarnEnabled()) {
- logger.warn(format("There is no instance in service[name : %s]",
- serviceName));
- }
- }
- else {
- logger.debug("subscribe from serviceName = {}, size = {}", serviceName,
- serviceInstances.size());
- }
-
- List exportedURLs = getExportedURLs(subscribedURL, serviceName,
- serviceInstances);
-
- /**
- * Add the exported URLs from {@link MetadataService}
- */
- subscribedURLs.addAll(exportedURLs);
- }
-
- private List getExportedURLs(URL subscribedURL, String serviceName,
- List serviceInstances) {
-
- List validServiceInstances = filter(serviceInstances);
-
- // If there is no valid ServiceInstance, return empty result
- if (isEmpty(validServiceInstances)) {
- if (logger.isWarnEnabled()) {
- logger.warn(
- "There is no instance from service[name : {}], and then Dubbo Service[key : {}] will not be "
- + "available , please make sure the further impact",
- serviceName, subscribedURL.getServiceKey());
- }
- return emptyList();
- }
-
- List subscribedURLs = cloneExportedURLs(subscribedURL, serviceInstances);
-
- // clear local service instances, help GC
- validServiceInstances.clear();
-
- return subscribedURLs;
}
/**
- * Clone the subscribed URLs based on the template URLs.
- * @param subscribedURL the URL to be subscribed
- * @param serviceInstances the list of {@link ServiceInstance service instances}
- * @return non-null
+ * Process ServiceInstanceChangedEvent, refresh dubbo reference and metadata info.
*/
- private List cloneExportedURLs(URL subscribedURL,
- List serviceInstances) {
+ @Override
+ public void onApplicationEvent(ServiceInstancesChangedEvent event) {
- List clonedExportedURLs = new LinkedList<>();
+ String appName = event.getServiceName();
- serviceInstances.forEach(serviceInstance -> {
+ List instances = filter(event.getServiceInstances() != null
+ ? event.getServiceInstances() : Collections.emptyList());
- String host = serviceInstance.getHost();
+ Set subscribedServiceNames = getServices(null);
- getTemplateExportedURLs(subscribedURL, serviceInstances).stream()
- .map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
- .map(templateURL -> templateURL.removeParameter(PID_KEY))
- .map(templateURL -> {
- String protocol = templateURL.getProtocol();
- Integer port = repository.getDubboProtocolPort(serviceInstance,
- protocol);
+ if (!subscribedServiceNames.contains(appName)) {
+ return;
+ }
- // reserve tag
- String tag = null;
- List urls = jsonUtils.toURLs(serviceInstance.getMetadata()
- .get("dubbo.metadata-service.urls"));
- if (urls != null && urls.size() > 0) {
- Map parameters = urls.get(0).getParameters();
- tag = parameters.get("dubbo.tag");
- }
-
- if (Objects.equals(templateURL.getHost(), host)
- && Objects.equals(templateURL.getPort(), port)) { // use
- // templateURL
- // if
- // equals
- return templateURL;
- }
-
- if (port == null) {
- if (logger.isWarnEnabled()) {
- logger.warn(
- "The protocol[{}] port of Dubbo service instance[host : {}] "
- + "can't be resolved",
- protocol, host);
- }
- return null;
- }
- else {
- URLBuilder clonedURLBuilder = from(templateURL) // remove the
- // parameters from
- // the template
- // URL
- .setHost(host) // reset the host
- .setPort(port) // reset the port
- .addParameter("dubbo.tag", tag); // reset the tag
-
- return clonedURLBuilder.build();
- }
-
- }).filter(Objects::nonNull).forEach(clonedExportedURLs::add);
- });
- return clonedExportedURLs;
- }
-
- private List getTemplateExportedURLs(URL subscribedURL,
- List serviceInstances) {
-
- DubboMetadataService dubboMetadataService = getProxy(serviceInstances);
-
- List templateExportedURLs = emptyList();
-
- if (dubboMetadataService != null) {
- templateExportedURLs = getExportedURLs(dubboMetadataService, subscribedURL);
+ if (instances.size() == 0) {
+ logger.warn("APP {} instance changed, size changed zero!!!", appName);
}
else {
- if (logger.isWarnEnabled()) {
- logger.warn(
- "The metadata of Dubbo service[key : {}] still can't be found, it could effect the further "
- + "Dubbo service invocation",
- subscribedURL.getServiceKey());
+ logger.info("APP {} instance changed, size changed to {}", appName,
+ instances.size());
+ }
+ // group by revision
+ Map> newGroup = instances.stream()
+ .collect(Collectors.groupingBy(this::getRevision));
+
+ synchronized (this) {
+
+ Map> oldGroup = serviceRevisionInstanceMap
+ .computeIfAbsent(appName, k -> new HashMap<>());
+
+ if (serviceInstanceNotChanged(oldGroup, newGroup)) {
+ logger.debug("APP {} instance changed, but nothing different", appName);
+ return;
}
- }
+ try {
- return templateExportedURLs;
+ // ensure that the service metadata is correct
+ refreshServiceMetadataInfo(appName, instances);
+
+ // then , refresh general service associated with current application
+ refreshGeneralServiceInfo(appName, oldGroup, newGroup);
+
+ // mark process successful
+ reSubscribeManager.onRefreshSuccess(event);
+ }
+ catch (Exception e) {
+ logger.error(String.format(
+ "APP %s instance changed, handler faild, try resubscribe",
+ appName), e);
+ reSubscribeManager.onRefreshFail(event);
+ }
+ }
}
- private DubboMetadataService getProxy(List serviceInstances) {
- return dubboMetadataConfigServiceProxy.getProxy(serviceInstances);
+ private void refreshGeneralServiceInfo(String appName,
+ Map> oldGroup,
+ Map> newGroup) {
+
+ Set urls2refresh = new HashSet<>();
+
+ // compare with local
+ for (String revision : oldGroup.keySet()) {
+
+ if (!newGroup.containsKey(revision)) {
+ // all instances of this list with revision has losted
+ urlSubscribeHandlerMap.forEach((url, handler) -> {
+ if (handler.relatedWith(appName, revision)) {
+ handler.removeAppNameWithRevision(appName, revision);
+ urls2refresh.add(url);
+ }
+ });
+ logger.debug("Subscription app {} revision {} has all losted", appName,
+ revision);
+ }
+ }
+
+ for (Map.Entry> entry : newGroup.entrySet()) {
+ String revision = entry.getKey();
+ List instanceList = entry.getValue();
+
+ if (!oldGroup.containsKey(revision)) {
+ // this instance list of revision not exists
+ // should acquire urls
+ urlSubscribeHandlerMap.forEach(
+ (url, handler) -> handler.init(appName, revision, instanceList));
+ }
+
+ urlSubscribeHandlerMap.forEach((url, handler) -> {
+ if (handler.relatedWith(appName, revision)) {
+ urls2refresh.add(url);
+ }
+ });
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Subscription app {} revision {} changed, instance list {}",
+ appName, revision,
+ instanceList.stream().map(
+ instance -> instance.getHost() + ":" + instance.getPort())
+ .collect(Collectors.toList()));
+ }
+ }
+
+ serviceRevisionInstanceMap.put(appName, newGroup);
+
+ if (urls2refresh.size() == 0) {
+ logger.debug("Subscription app {}, no urls will be refreshed", appName);
+ }
+ else {
+ logger.debug("Subscription app {}, the following url will be refresh:{}",
+ appName, urls2refresh.stream().map(URL::getServiceKey)
+ .collect(Collectors.toList()));
+
+ for (URL url : urls2refresh) {
+ GenearalServiceSubscribeHandler handler = urlSubscribeHandlerMap.get(url);
+ if (handler == null) {
+ logger.warn("Subscription app {}, can't find handler for service {}",
+ appName, url.getServiceKey());
+ continue;
+ }
+ handler.refresh();
+ }
+ }
+ }
+
+ private void refreshServiceMetadataInfo(String serviceName,
+ List serviceInstances) {
+ MetadataServiceSubscribeHandler handler = metadataSubscribeHandlerMap
+ .get(serviceName);
+
+ if (handler == null) {
+ logger.warn("Subscription app {}, can't find metadata handler", serviceName);
+ return;
+ }
+ handler.refresh(serviceInstances);
+ }
+
+ private boolean serviceInstanceNotChanged(Map> oldGroup,
+ Map> newGroup) {
+ if (newGroup.size() != oldGroup.size()) {
+ return false;
+ }
+
+ for (Map.Entry> entry : newGroup.entrySet()) {
+ String appName = entry.getKey();
+ List newInstances = entry.getValue();
+
+ if (!oldGroup.containsKey(appName)) {
+ return false;
+ }
+
+ List oldInstances = oldGroup.get(appName);
+ if (newInstances.size() != oldInstances.size()) {
+ return false;
+ }
+
+ boolean matched = newInstances.stream().allMatch(newInstance -> {
+
+ for (ServiceInstance oldInstance : oldInstances) {
+ if (instanceSame(newInstance, oldInstance)) {
+ return true;
+ }
+ }
+
+ return false;
+ });
+ if (!matched) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private boolean instanceSame(ServiceInstance newInstance,
+ ServiceInstance oldInstance) {
+ if (!StringUtils.equals(newInstance.getInstanceId(),
+ oldInstance.getInstanceId())) {
+ return false;
+ }
+ if (!StringUtils.equals(newInstance.getHost(), oldInstance.getHost())) {
+ return false;
+ }
+ if (!StringUtils.equals(newInstance.getServiceId(), oldInstance.getServiceId())) {
+ return false;
+ }
+ if (!StringUtils.equals(newInstance.getScheme(), oldInstance.getScheme())) {
+ return false;
+ }
+ if (oldInstance.getPort() != newInstance.getPort()) {
+ return false;
+ }
+
+ if (!oldInstance.getMetadata().equals(newInstance.getMetadata())) {
+ return false;
+ }
+
+ return true;
+ }
+
+ String getRevision(ServiceInstance instance) {
+ Map metadata = instance.getMetadata();
+ String revision = metadata.get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME);
+
+ if (revision == null) {
+ revision = RevisionResolver.getEmptyRevision();
+ }
+ return revision;
}
private List filter(Collection serviceInstances) {
@@ -440,35 +452,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
Set getServices(URL url) {
Set subscribedServices = repository.getSubscribedServices();
+ if (subscribedServices.contains("*")) {
+ subscribedServices = new HashSet<>(discoveryClient.getServices());
+ }
// TODO Add the filter feature
return subscribedServices;
}
- private void notifyAllSubscribedURLs(URL url, List subscribedURLs,
- NotifyListener listener) {
-
- if (isEmpty(subscribedURLs)) {
- // Add the EMPTY_PROTOCOL URL
- subscribedURLs.add(emptyURL(url));
-
- // if (isDubboMetadataServiceURL(url)) {
- // if meta service change, and serviceInstances is zero, will clean up
- // information about this client
- // String serviceName = url.getParameter(GROUP_KEY);
- // repository.removeMetadataAndInitializedService(serviceName, url);
- // }
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("The subscribed URL[{}] will notify all URLs : {}", url,
- subscribedURLs);
- }
-
- // Notify all
- listener.notify(subscribedURLs);
- }
-
- private List getServiceInstances(String serviceName) {
+ List getServiceInstances(String serviceName) {
return hasText(serviceName) ? doGetServiceInstances(serviceName) : emptyList();
}
@@ -485,108 +476,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
return serviceInstances;
}
- private String generateId(URL url) {
- return url.toString();
- }
-
- private URL emptyURL(URL url) {
- // issue : When the last service provider is closed, the client still periodically
- // connects to the last provider.n
- // fix https://github.com/alibaba/spring-cloud-alibaba/issues/1259
- return from(url).setProtocol(EMPTY_PROTOCOL).removeParameter(CATEGORY_KEY)
- .build();
- }
-
- private List getExportedURLs(DubboMetadataService dubboMetadataService,
- URL subscribedURL) {
- String serviceInterface = subscribedURL.getServiceInterface();
- String group = subscribedURL.getParameter(GROUP_KEY);
- String version = subscribedURL.getParameter(VERSION_KEY);
- // The subscribed protocol may be null
- String subscribedProtocol = subscribedURL.getParameter(PROTOCOL_KEY);
- String exportedURLsJSON = dubboMetadataService.getExportedURLs(serviceInterface,
- group, version);
- return jsonUtils.toURLs(exportedURLsJSON).stream()
- .filter(exportedURL -> subscribedProtocol == null
- || subscribedProtocol.equalsIgnoreCase(exportedURL.getProtocol()))
- .collect(Collectors.toList());
- }
-
- private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
- NotifyListener listener) {
-
- subscribeDubboMetadataServiceURLs(subscribedURL, listener,
- getServiceName(subscribedURL));
-
- // Sync subscription
- if (containsProviderCategory(subscribedURL)) {
-
- registerServiceInstancesChangedListener(subscribedURL,
- new ServiceInstanceChangeListener() {
-
- @Override
- public int getOrder() {
- return Ordered.LOWEST_PRECEDENCE - 1;
- }
-
- @Override
- public void onApplicationEvent(
- ServiceInstancesChangedEvent event) {
- String sourceServiceName = event.getServiceName();
- List serviceInstances = event
- .getServiceInstances();
- String serviceName = getServiceName(subscribedURL);
-
- if (Objects.equals(sourceServiceName, serviceName)) {
- logger.debug(
- "handle serviceInstanceChange of metadata service, serviceName = {}, subscribeUrl={}",
- event.getServiceName(),
- subscribedURL.getServiceKey());
-
- // only update serviceInstances of the specified
- // serviceName
- subscribeDubboMetadataServiceURLs(subscribedURL, listener,
- sourceServiceName, serviceInstances);
- }
- }
-
- @Override
- public String toString() {
- return "ServiceInstancesChangedEventListener:"
- + subscribedURL.getServiceKey();
- }
- });
- }
- }
-
+ // the group of DubboMetadataService is current application name
private String getServiceName(URL subscribedURL) {
return subscribedURL.getParameter(GROUP_KEY);
}
- private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
- NotifyListener listener, String serviceName,
- List serviceInstances) {
-
- String serviceInterface = subscribedURL.getServiceInterface();
- String version = subscribedURL.getParameter(VERSION_KEY);
- String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
-
- List urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
- serviceInterface, version, protocol);
-
- notifyAllSubscribedURLs(subscribedURL, urls, listener);
- }
-
- private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
- NotifyListener listener, String serviceName) {
- List serviceInstances = getServiceInstances(serviceName);
- subscribeDubboMetadataServiceURLs(subscribedURL, listener, serviceName,
- serviceInstances);
- }
-
private boolean containsProviderCategory(URL subscribedURL) {
String category = subscribedURL.getParameter(CATEGORY_KEY);
- return category == null ? false : category.contains(PROVIDER);
+ return category != null && category.contains(PROVIDER);
}
@Override
@@ -603,16 +500,36 @@ public class DubboCloudRegistry extends FailbackRegistry {
return ADMIN_PROTOCOL.equals(url.getProtocol());
}
- public Map getUrlNotifyListenerMap() {
- return urlNotifyListenerMap;
- }
-
- public Map getReConnectJobMap() {
- return reConnectJobMap;
- }
-
protected boolean isDubboMetadataServiceURL(URL url) {
return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface());
}
+ protected boolean isConsumerServiceURL(URL url) {
+ return CONSUMER.equals(url.getProtocol());
+ }
+
+ public List getServiceInstances(Map> providers) {
+ List instances = new ArrayList<>();
+
+ providers.forEach((appName, revisions) -> {
+ Map> revisionMap = serviceRevisionInstanceMap
+ .get(appName);
+ if (revisionMap == null) {
+ return;
+ }
+ for (String revision : revisions) {
+ List list = revisionMap.get(revision);
+ if (list != null) {
+ instances.addAll(list);
+ }
+ }
+ });
+
+ return instances;
+ }
+
+ public Map>> getServiceRevisionInstanceMap() {
+ return serviceRevisionInstanceMap;
+ }
+
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/GenearalServiceSubscribeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/GenearalServiceSubscribeHandler.java
new file mode 100644
index 00000000..a2f3ca4f
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/GenearalServiceSubscribeHandler.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.registry;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository;
+import com.alibaba.cloud.dubbo.service.DubboMetadataService;
+import com.alibaba.cloud.dubbo.service.DubboMetadataServiceProxy;
+import com.alibaba.cloud.dubbo.util.JSONUtils;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.registry.NotifyListener;
+
+import org.springframework.cloud.client.ServiceInstance;
+
+import static java.util.Collections.emptyList;
+import static org.apache.dubbo.common.URLBuilder.from;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+/**
+ * @author theonefx
+ */
+public class GenearalServiceSubscribeHandler extends AbstractServiceSubscribeHandler {
+
+ private final Map> providers = new HashMap<>();
+
+ private final Map urlTemplateMap = new HashMap<>();
+
+ private final JSONUtils jsonUtils;
+
+ private final DubboServiceMetadataRepository repository;
+
+ private final DubboMetadataServiceProxy dubboMetadataConfigServiceProxy;
+
+ public GenearalServiceSubscribeHandler(URL url, NotifyListener listener,
+ DubboCloudRegistry registry, DubboServiceMetadataRepository repository,
+ JSONUtils jsonUtils,
+ DubboMetadataServiceProxy dubboMetadataConfigServiceProxy) {
+ super(url, listener, registry);
+ this.repository = repository;
+ this.jsonUtils = jsonUtils;
+ this.dubboMetadataConfigServiceProxy = dubboMetadataConfigServiceProxy;
+ }
+
+ public boolean relatedWith(String appName, String revision) {
+ Set list = providers.get(appName);
+ if (list != null && list.size() > 0) {
+ if (list.contains(revision)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void removeAppNameWithRevision(String appName, String revision) {
+ Set list = providers.get(appName);
+ if (list != null) {
+ list.remove(revision);
+ if (list.size() == 0) {
+ providers.remove(appName);
+ }
+ }
+ }
+
+ public void addAppNameWithRevision(String appName, String revision) {
+ Set set = providers.computeIfAbsent(appName, k -> new HashSet<>());
+ set.add(revision);
+ }
+
+ public synchronized void doInit() {
+ logger.debug("Subscription interface {}, GenearalServiceSubscribeHandler init",
+ url.getServiceKey());
+ Map>> map = registry
+ .getServiceRevisionInstanceMap();
+ for (Map.Entry>> entry : map
+ .entrySet()) {
+ String appName = entry.getKey();
+ Map> revisionMap = entry.getValue();
+
+ for (Map.Entry> revisionEntity : revisionMap
+ .entrySet()) {
+ String revision = revisionEntity.getKey();
+ List instances = revisionEntity.getValue();
+ init(appName, revision, instances);
+ }
+ }
+ refresh();
+ }
+
+ public void init(String appName, String revision,
+ List instanceList) {
+ List urls = getTemplateExportedURLs(url, instanceList);
+ if (urls != null && urls.size() > 0) {
+ addAppNameWithRevision(appName, revision);
+ setUrlTemplate(appName, revision, urls);
+ }
+ }
+
+ public synchronized void refresh() {
+ List urls = getProviderURLs();
+ notifyAllSubscribedURLs(url, urls, listener);
+ }
+
+ private List getProviderURLs() {
+ List instances = registry.getServiceInstances(providers);
+
+ logger.debug("Subscription interfece {}, providers {}, total {}",
+ url.getServiceKey(), providers, instances.size());
+
+ if (instances.size() == 0) {
+ return Collections.emptyList();
+ }
+
+ return cloneExportedURLs(instances);
+ }
+
+ void setUrlTemplate(String appName, String revision, List urls) {
+ if (urls == null || urls.size() == 0) {
+ return;
+ }
+ String key = getAppRevisionKey(appName, revision);
+ if (urlTemplateMap.containsKey(key)) {
+ return;
+ }
+ urlTemplateMap.put(key, urls.get(0));
+ }
+
+ private String getAppRevisionKey(String appName, String revision) {
+ return appName + "@" + revision;
+ }
+
+ /**
+ * Clone the subscribed URLs based on the template URLs.
+ * @param serviceInstances the list of
+ * {@link org.springframework.cloud.client.ServiceInstance service instances}
+ * @return
+ */
+ List cloneExportedURLs(List serviceInstances) {
+
+ List urlsCloneTo = new ArrayList<>();
+ serviceInstances.forEach(serviceInstance -> {
+
+ String host = serviceInstance.getHost();
+ String appName = serviceInstance.getServiceId();
+ String revision = registry.getRevision(serviceInstance);
+
+ URL template = urlTemplateMap.get(getAppRevisionKey(appName, revision));
+
+ Stream.of(template)
+ .map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
+ .map(templateURL -> templateURL.removeParameter(PID_KEY))
+ .map(templateURL -> {
+ String protocol = templateURL.getProtocol();
+ Integer port = repository.getDubboProtocolPort(serviceInstance,
+ protocol);
+
+ // reserve tag
+ String tag = null;
+ List urls = jsonUtils.toURLs(serviceInstance.getMetadata()
+ .get("dubbo.metadata-service.urls"));
+ if (urls != null && urls.size() > 0) {
+ Map parameters = urls.get(0).getParameters();
+ tag = parameters.get("dubbo.tag");
+ }
+
+ if (Objects.equals(templateURL.getHost(), host)
+ && Objects.equals(templateURL.getPort(), port)) { // use
+ // templateURL
+ // if
+ // equals
+ return templateURL;
+ }
+
+ if (port == null) {
+ if (logger.isWarnEnabled()) {
+ logger.warn(
+ "The protocol[{}] port of Dubbo service instance[host : {}] "
+ + "can't be resolved",
+ protocol, host);
+ }
+ return null;
+ }
+ else {
+ URLBuilder clonedURLBuilder = from(templateURL) // remove the
+ // parameters from
+ // the template
+ // URL
+ .setHost(host) // reset the host
+ .setPort(port) // reset the port
+ .addParameter("dubbo.tag", tag); // reset the tag
+
+ return clonedURLBuilder.build();
+ }
+
+ }).filter(Objects::nonNull).forEach(urlsCloneTo::add);
+ });
+ return urlsCloneTo;
+ }
+
+ private List getTemplateExportedURLs(URL subscribedURL,
+ List serviceInstances) {
+
+ DubboMetadataService dubboMetadataService = getProxy(serviceInstances);
+
+ List templateExportedURLs = emptyList();
+
+ if (dubboMetadataService != null) {
+ templateExportedURLs = getExportedURLs(dubboMetadataService, subscribedURL);
+ }
+ else {
+ if (logger.isWarnEnabled()) {
+ logger.warn(
+ "The metadata of Dubbo service[key : {}] still can't be found, it could effect the further "
+ + "Dubbo service invocation",
+ subscribedURL.getServiceKey());
+ }
+
+ }
+
+ return templateExportedURLs;
+ }
+
+ private DubboMetadataService getProxy(List serviceInstances) {
+ return dubboMetadataConfigServiceProxy.getProxy(serviceInstances);
+ }
+
+ private List getExportedURLs(DubboMetadataService dubboMetadataService,
+ URL subscribedURL) {
+ String serviceInterface = subscribedURL.getServiceInterface();
+ String group = subscribedURL.getParameter(GROUP_KEY);
+ String version = subscribedURL.getParameter(VERSION_KEY);
+ // The subscribed protocol may be null
+ String subscribedProtocol = subscribedURL.getParameter(PROTOCOL_KEY);
+ String exportedURLsJSON = dubboMetadataService.getExportedURLs(serviceInterface,
+ group, version);
+ return jsonUtils.toURLs(exportedURLsJSON).stream()
+ .filter(exportedURL -> subscribedProtocol == null
+ || subscribedProtocol.equalsIgnoreCase(exportedURL.getProtocol()))
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/MetadataServiceSubscribeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/MetadataServiceSubscribeHandler.java
new file mode 100644
index 00000000..9a9f9932
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/MetadataServiceSubscribeHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.registry;
+
+import java.util.List;
+
+import com.alibaba.cloud.dubbo.util.DubboMetadataUtils;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.NotifyListener;
+
+import org.springframework.cloud.client.ServiceInstance;
+
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+/**
+ * @author theonefx
+ */
+public class MetadataServiceSubscribeHandler extends AbstractServiceSubscribeHandler {
+
+ private final String appName;
+
+ private final DubboMetadataUtils dubboMetadataUtils;
+
+ public MetadataServiceSubscribeHandler(String appName, URL url,
+ NotifyListener listener, DubboCloudRegistry registry,
+ DubboMetadataUtils dubboMetadataUtils) {
+ super(url, listener, registry);
+ this.appName = appName;
+ this.dubboMetadataUtils = dubboMetadataUtils;
+ }
+
+ public void doInit() {
+ logger.debug("Subscription app {} MetadataService handler init", appName);
+ List serviceInstances = registry.getServiceInstances(appName);
+ subscribeDubboMetadataServiceURLs(url, listener, serviceInstances);
+ }
+
+ public void refresh(List serviceInstances) {
+ logger.debug("Subscription app {}, instance changed, new size = {}", appName,
+ serviceInstances.size());
+ subscribeDubboMetadataServiceURLs(url, listener, serviceInstances);
+ }
+
+ private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
+ NotifyListener listener, List serviceInstances) {
+
+ logger.debug("Subscription app {}, service instance changed to size {}", appName,
+ serviceInstances.size());
+
+ String serviceInterface = subscribedURL.getServiceInterface();
+ String version = subscribedURL.getParameter(VERSION_KEY);
+ String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
+
+ List urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
+ serviceInterface, version, protocol);
+
+ notifyAllSubscribedURLs(subscribedURL, urls, listener);
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeManager.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeManager.java
new file mode 100644
index 00000000..5de52dbf
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeManager.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.registry;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.cloud.dubbo.env.DubboCloudProperties;
+import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.cloud.client.ServiceInstance;
+
+/**
+ * @author theonefx
+ */
+public class ReSubscribeManager {
+
+ private final Logger logger = LoggerFactory.getLogger(ReSubscribeManager.class);
+
+ private final Map reConnectJobMap = new ConcurrentHashMap<>();
+
+ private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor(
+ 5);
+
+ private final DubboCloudRegistry registry;
+
+ private final DubboCloudProperties properties;
+
+ public ReSubscribeManager(DubboCloudRegistry registry) {
+ this.registry = registry;
+ this.properties = registry.getBean(DubboCloudProperties.class);
+
+ reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES);
+ reConnectPool.allowCoreThreadTimeOut(true);
+ }
+
+ public void onRefreshSuccess(ServiceInstancesChangedEvent event) {
+ reConnectJobMap.remove(event.getServiceName());
+ }
+
+ public void onRefreshFail(ServiceInstancesChangedEvent event) {
+ String serviceName = event.getServiceName();
+
+ int count = 1;
+
+ if (event instanceof FakeServiceInstancesChangedEvent) {
+ count = ((FakeServiceInstancesChangedEvent) event).getCount() + 1;
+ }
+
+ if (count >= properties.getMaxReSubscribeMetadataTimes()) {
+ logger.error(
+ "reSubscribe failed too many times, serviceName = {}, count = {}",
+ serviceName, count);
+ return;
+ }
+
+ ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, count);
+ reConnectPool.schedule(job, properties.getReSubscribeMetadataIntervial(),
+ TimeUnit.SECONDS);
+ }
+
+ private final class ReSubscribeMetadataJob implements Runnable {
+
+ private final String serviceName;
+
+ private final int errorCounts;
+
+ private ReSubscribeMetadataJob(String serviceName, int errorCounts) {
+ this.errorCounts = errorCounts;
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public void run() {
+ if (!reConnectJobMap.containsKey(serviceName)
+ || reConnectJobMap.get(serviceName) != this) {
+ return;
+ }
+ List list = registry.getServiceInstances(serviceName);
+ FakeServiceInstancesChangedEvent event = new FakeServiceInstancesChangedEvent(
+ serviceName, list, errorCounts);
+ registry.onApplicationEvent(event);
+ }
+
+ }
+
+ private static final class FakeServiceInstancesChangedEvent
+ extends ServiceInstancesChangedEvent {
+
+ private static final long serialVersionUID = -2832478604601472915L;
+
+ private final int count;
+
+ private FakeServiceInstancesChangedEvent(String serviceName,
+ List serviceInstances, int count) {
+ super(serviceName, serviceInstances);
+ this.count = count;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java
deleted file mode 100644
index 74d917f0..00000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.dubbo.registry;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.registry.NotifyListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * For re subscribe URL from provider.
- *
- * @author theonefx
- */
-public class ReSubscribeMetadataJob implements Runnable {
-
- protected final Logger logger = LoggerFactory.getLogger(ReSubscribeMetadataJob.class);
-
- private final String serviceName;
-
- private final DubboCloudRegistry dubboCloudRegistry;
-
- private final int errorCounts;
-
- public ReSubscribeMetadataJob(String serviceName,
- DubboCloudRegistry dubboCloudRegistry, int errorCounts) {
- this.errorCounts = errorCounts;
- this.serviceName = serviceName;
- this.dubboCloudRegistry = dubboCloudRegistry;
- }
-
- public ReSubscribeMetadataJob(String serviceName,
- DubboCloudRegistry dubboCloudRegistry) {
- this(serviceName, dubboCloudRegistry, 0);
- }
-
- @Override
- public void run() {
- if (dubboCloudRegistry.getReConnectJobMap().get(serviceName) != this) {
- return;
- }
- try {
- logger.info("reSubscribe, serviceName = {}, count = {}", serviceName,
- errorCounts);
- for (Map.Entry entry : dubboCloudRegistry
- .getUrlNotifyListenerMap().entrySet()) {
- doRun(entry.getKey(), entry.getValue());
- }
- dubboCloudRegistry.getReConnectJobMap().remove(serviceName);
- }
- catch (Exception e) {
- logger.warn(String.format(
- "reSubscribe failed, serviceName = %s, try refresh again",
- serviceName), e);
- dubboCloudRegistry.addReSubscribeMetadataJob(serviceName, errorCounts + 1);
- }
- }
-
- private void doRun(URL url, NotifyListener listener) {
- Set serviceNames = dubboCloudRegistry.getServices(url);
-
- if (serviceNames.contains(serviceName)) {
- dubboCloudRegistry.subscribeURLs(url, serviceNames, listener);
- }
- }
-
-}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java
index d13c5179..2b3b676a 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java
@@ -89,7 +89,7 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
DubboCloudProperties dubboCloudProperties = applicationContext
.getBean(DubboCloudProperties.class);
- Registry registry = null;
+ Registry registry;
switch (dubboCloudProperties.getRegistryType()) {
case SPRING_CLOUD_REGISTRY_PROPERTY_VALUE:
@@ -100,9 +100,7 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
default:
registry = new DubboCloudRegistry(url, discoveryClient,
dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
- jsonUtils, dubboGenericServiceFactory, applicationContext,
- dubboCloudProperties.getMaxReSubscribeMetadataTimes(),
- dubboCloudProperties.getReSubscribeMetadataIntervial());
+ jsonUtils, applicationContext);
break;
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/resources/META-INF/dubbo/com.alibaba.cloud.dubbo.metadata.MetadataParamsFilter b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/resources/META-INF/dubbo/com.alibaba.cloud.dubbo.metadata.MetadataParamsFilter
new file mode 100644
index 00000000..ce4f0db1
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/resources/META-INF/dubbo/com.alibaba.cloud.dubbo.metadata.MetadataParamsFilter
@@ -0,0 +1 @@
+default=com.alibaba.cloud.dubbo.metadata.DefaultMetadataParamsFilter
\ No newline at end of file