1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

ans starter suport eureka/consule registry and add sms module

This commit is contained in:
得少
2019-01-09 18:43:23 +08:00
parent d7fea8dfc3
commit 90b21480d7
62 changed files with 3292 additions and 100 deletions

View File

@@ -21,12 +21,16 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.alicloud.ans.migrate.MigrateOnConditionMissingClass;
import org.springframework.cloud.alicloud.ans.registry.AnsAutoServiceRegistration;
import org.springframework.cloud.alicloud.ans.registry.AnsRegistration;
import org.springframework.cloud.alicloud.ans.registry.AnsServiceRegistry;
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration;
import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
/**
@@ -34,6 +38,7 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
@EnableConfigurationProperties
@Conditional(MigrateOnConditionMissingClass.class)
@ConditionalOnClass(name = "org.springframework.boot.context.embedded.EmbeddedServletContainerInitializedEvent")
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@ConditionalOnAnsEnabled
@@ -49,8 +54,9 @@ public class AnsAutoConfiguration {
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public AnsRegistration ansRegistration() {
return new AnsRegistration();
public AnsRegistration ansRegistration(AnsProperties ansProperties,
ApplicationContext applicationContext) {
return new AnsRegistration(ansProperties, applicationContext);
}
@Bean
@@ -63,4 +69,5 @@ public class AnsAutoConfiguration {
return new AnsAutoServiceRegistration(registry, autoServiceRegistrationProperties,
registration);
}
}

View File

@@ -16,42 +16,33 @@
package org.springframework.cloud.alicloud.ans;
import java.util.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
import org.springframework.cloud.client.DefaultServiceInstance;
import com.alibaba.ans.core.NamingService;
import com.alibaba.ans.shaded.com.taobao.vipserver.client.core.Host;
import org.springframework.cloud.alicloud.ans.registry.AnsRegistration;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import com.alibaba.ans.core.NamingService;
import com.alibaba.ans.shaded.com.taobao.vipserver.client.core.Host;
import java.util.*;
/**
* @author xiaolongzuo
* @author pbting
*/
public class AnsDiscoveryClient implements DiscoveryClient {
public static final String DESCRIPTION = "Spring Cloud ANS Discovery Client";
@Autowired
private AnsProperties ansProperties;
private AnsRegistration ansRegistration;
public AnsDiscoveryClient(AnsRegistration ansRegistration) {
this.ansRegistration = ansRegistration;
}
@Override
public String description() {
return DESCRIPTION;
}
@Override
public ServiceInstance getLocalServiceInstance() {
String serviceId = ansProperties.getClientDomains();
String host = ansProperties.getClientIp();
int port = ansProperties.getClientPort();
boolean secure = ansProperties.isSecure();
Map<String, String> metadata = ansProperties.getClientMetadata();
return new DefaultServiceInstance(serviceId, host, port, secure, metadata);
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
try {
@@ -91,8 +82,9 @@ public class AnsDiscoveryClient implements DiscoveryClient {
@Override
public List<String> getServices() {
Set<String> publishers = NamingService.getPublishes();
Set<String> doms = NamingService.getDomsSubscribed();
doms.addAll(publishers);
List<String> result = new LinkedList<>();
for (String service : doms) {
result.add(service);
@@ -100,4 +92,8 @@ public class AnsDiscoveryClient implements DiscoveryClient {
return result;
}
@Override
public ServiceInstance getLocalServiceInstance() {
return ansRegistration;
}
}

View File

@@ -19,23 +19,28 @@ package org.springframework.cloud.alicloud.ans;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.alicloud.ans.migrate.MigrateOnConditionMissingClass;
import org.springframework.cloud.alicloud.ans.registry.AnsRegistration;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
/**
* @author xiaolongzuo
* @author pbting
*/
@Configuration
@Conditional(MigrateOnConditionMissingClass.class)
@ConditionalOnMissingBean(DiscoveryClient.class)
@EnableConfigurationProperties
@AutoConfigureBefore(SimpleDiscoveryClientAutoConfiguration.class)
public class AnsDiscoveryClientAutoConfiguration {
@Bean
public DiscoveryClient ansDiscoveryClient() {
return new AnsDiscoveryClient();
public DiscoveryClient ansDiscoveryClient(AnsRegistration ansRegistration) {
return new AnsDiscoveryClient(ansRegistration);
}
}

View File

@@ -16,25 +16,25 @@
package org.springframework.cloud.alicloud.ans.endpoint;
import com.alibaba.ans.core.NamingService;
import com.alibaba.ans.shaded.com.taobao.vipserver.client.core.Host;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
import com.alibaba.ans.core.NamingService;
import com.alibaba.ans.shaded.com.taobao.vipserver.client.core.Host;
/**
* @author xiaolongzuo
* @author pbting
*/
public class AnsEndpoint extends AbstractEndpoint<Map<String, Object>> {
private static final Logger LOGGER = LoggerFactory.getLogger(AnsEndpoint.class);
private static final Log log = LogFactory.getLog(AnsEndpoint.class);
private AnsProperties ansProperties;
@@ -49,7 +49,7 @@ public class AnsEndpoint extends AbstractEndpoint<Map<String, Object>> {
@Override
public Map<String, Object> invoke() {
Map<String, Object> ansEndpoint = new HashMap<>();
LOGGER.info("ANS endpoint invoke, ansProperties is {}", ansProperties);
log.info("ANS endpoint invoke, ansProperties is " + ansProperties);
ansEndpoint.put("ansProperties", ansProperties);
Map<String, Object> subscribes = new HashMap<>();
@@ -64,7 +64,7 @@ public class AnsEndpoint extends AbstractEndpoint<Map<String, Object>> {
}
}
ansEndpoint.put("subscribes", subscribes);
LOGGER.info("ANS endpoint invoke, subscribes is {}", subscribes);
log.info("ANS endpoint invoke, subscribes is " + subscribes);
return ansEndpoint;
}

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.alicloud.ans.endpoint;
import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
@@ -25,7 +26,7 @@ import org.springframework.context.annotation.Bean;
* @author xiaolongzuo
*/
@ConditionalOnWebApplication
@ConditionalOnClass(name = "org.springframework.boot.actuate.endpoint.AbstractEndpoint")
@ConditionalOnClass(AbstractEndpoint.class)
public class AnsEndpointAutoConfiguration {
@Bean

View File

@@ -0,0 +1,34 @@
package org.springframework.cloud.alicloud.ans.migrate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
* @author pbting
*/
public class MigrateEndpoint
extends AbstractEndpoint<Map<String, ConcurrentMap<String, ServerWrapper>>> {
private static final Log log = LogFactory.getLog(MigrateEndpoint.class);
public MigrateEndpoint() {
super("migrate");
}
/**
* @return ans endpoint
*/
@Override
public Map<String, ConcurrentMap<String, ServerWrapper>> invoke() {
Map<String, ConcurrentMap<String, ServerWrapper>> result = ServerListInvocationHandler
.getServerRegistry();
log.info("migrate server list :" + result);
return result;
}
}

View File

@@ -0,0 +1,20 @@
package org.springframework.cloud.alicloud.ans.migrate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
/**
* @author pbting
*/
@ConditionalOnWebApplication
@ConditionalOnClass(name = "org.springframework.boot.actuate.endpoint.annotation.Endpoint")
@Conditional(MigrateOnConditionClass.class)
public class MigrateEndpointAutoConfiguration {
@Bean
public MigrateEndpoint ansEndpoint() {
return new MigrateEndpoint();
}
}

View File

@@ -0,0 +1,48 @@
package org.springframework.cloud.alicloud.ans.migrate;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.util.ClassUtils;
/**
* @author pbting
*/
public abstract class MigrateOnCondition implements Condition, BeanClassLoaderAware {
final String[] conditionOnClass = new String[] {
"org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistration",
"org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration" };
ClassLoader classLoader;
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public abstract boolean matches(ConditionContext context,
AnnotatedTypeMetadata metadata);
boolean isPresent(String className, ClassLoader classLoader) {
if (classLoader == null) {
classLoader = ClassUtils.getDefaultClassLoader();
}
try {
forName(className, classLoader);
return true;
}
catch (Throwable var3) {
return false;
}
}
Class<?> forName(String className, ClassLoader classLoader)
throws ClassNotFoundException {
return classLoader != null ? classLoader.loadClass(className)
: Class.forName(className);
}
}

View File

@@ -0,0 +1,22 @@
package org.springframework.cloud.alicloud.ans.migrate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
/**
* @author pbting
*/
public class MigrateOnConditionClass extends MigrateOnCondition {
protected static final Log log = LogFactory.getLog(MigrateOnConditionClass.class);
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
boolean result = isPresent(conditionOnClass[0], classLoader)
|| isPresent(conditionOnClass[1], classLoader);
log.info(" matche result is " + result);
return result;
}
}

View File

@@ -0,0 +1,22 @@
package org.springframework.cloud.alicloud.ans.migrate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
/**
* @author pbting
*/
public class MigrateOnConditionMissingClass extends MigrateOnConditionClass {
protected static final Log log = LogFactory
.getLog(MigrateOnConditionMissingClass.class);
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
boolean result = !super.matches(context, metadata);
log.info(" matche result is " + result);
return result;
}
}

View File

@@ -0,0 +1,102 @@
package org.springframework.cloud.alicloud.ans.migrate;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerList;
import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactory;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author pbting
*/
final class MigrateProxyManager {
private final static Log log = LogFactory.getLog(MigrateProxyManager.class);
private final static AtomicBoolean IS_PROXY = new AtomicBoolean(true);
private final static Set<String> SERVICES_ID = new ConcurrentSkipListSet<>();
private static Object springProxyFactory(Object target, ClassLoader classLoader,
List<Advice> adviceList, Class... interfaces) {
final ProxyFactory proxyFactory = new ProxyFactory(interfaces);
proxyFactory.setTarget(target);
for (Iterator<Advice> iterator = adviceList.iterator(); iterator.hasNext();) {
proxyFactory.addAdvice(iterator.next());
}
return proxyFactory.getProxy(classLoader);
}
static Object newServerListProxy(Object bean, ClassLoader classLoader,
IClientConfig clientConfig) {
List<Advice> adviceLists = new LinkedList<>();
adviceLists.add(new ServerListInvocationHandler(clientConfig));
bean = springProxyFactory(bean, classLoader, adviceLists,
new Class[] { ServerList.class });
log.info("[service id]" + clientConfig.getClientName()
+ " new a ServerList proxy instance for spring cloud netflix to spring cloud alibaba ");
collectServiceId(clientConfig.getClientName());
return bean;
}
static Object newLoadBalancerProxy(Object bean, ClassLoader classLoader,
final IClientConfig clientConfig) {
// step 1: initializer a advice for after returning advice
final List<Advice> adviceLists = new LinkedList<>();
adviceLists.add(new MethodInterceptor() {
private final IClientConfig iclientConfig = clientConfig;
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
Object returnValue = methodInvocation.proceed();
String methodName = methodInvocation.getMethod().getName();
if (!"chooseServer".equals(methodName)) {
return returnValue;
}
String serviceId = iclientConfig.getClientName();
Server server = ServerListInvocationHandler
.checkAndGetServiceServer(serviceId, (Server) returnValue);
ServerListInvocationHandler.incrementCallService(serviceId, server);
return server;
}
});
// step 2: new proxy instance by spring factory
bean = springProxyFactory(bean, classLoader, adviceLists,
new Class[] { ILoadBalancer.class });
log.info("[service id]" + clientConfig.getClientName()
+ " new a ILoadBalancer proxy instance for spring cloud netflix to spring cloud alibaba ");
return bean;
}
static void migrateProxyClose() {
IS_PROXY.set(false);
}
static void migrateProxyUp() {
IS_PROXY.set(true);
}
static boolean isMigrateProxy() {
return IS_PROXY.get();
}
static void collectServiceId(String serviceId) {
SERVICES_ID.add(serviceId);
}
static Set<String> getServicesId() {
return Collections.unmodifiableSet(SERVICES_ID);
}
}

View File

@@ -0,0 +1,95 @@
package org.springframework.cloud.alicloud.ans.migrate;
import com.netflix.loadbalancer.ILoadBalancer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.context.named.NamedContextFactory;
import org.springframework.cloud.endpoint.event.RefreshEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author pbting
*/
@Component
public class MigrateRefreshEventListener implements ApplicationListener<RefreshEvent> {
private final static Log log = LogFactory.getLog(MigrateRefreshEventListener.class);
private final static String MIGRATE_SWITCH = "sca.migrate.ans.switch";
private volatile String lastScaMigrateAnsSwitchValue = "true";
private Environment environment;
private NamedContextFactory namedContextFactory;
public MigrateRefreshEventListener(Environment environment,
NamedContextFactory namedContextFactory) {
this.environment = environment;
this.namedContextFactory = namedContextFactory;
}
@PostConstruct
public void initTimerCheck() {
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
onApplicationEvent(null);
}
catch (Exception e) {
log.error(
"check the value of 'sca.migrate.ans.switch' in environment whether changed or not cause an Exeption",
e);
}
}
}, 1, 1, TimeUnit.SECONDS);
}
@Override
public void onApplicationEvent(RefreshEvent event) {
String value = environment.getProperty(MIGRATE_SWITCH, "true");
// check 1: check the value
if (value.equals(lastScaMigrateAnsSwitchValue)) {
return;
}
updateLastScaMigrateAnsResetValue(value);
// step 1: migrate up
if ("true".equals(value)) {
MigrateProxyManager.migrateProxyUp();
serviceIdContextInit();
return;
}
// step 2: migrate close
if ("false".equals(value)) {
MigrateProxyManager.migrateProxyClose();
serviceIdContextInit();
return;
}
}
private void serviceIdContextInit() {
namedContextFactory.destroy();
// initializer each spring context for service id
Set<String> serviceIds = MigrateProxyManager.getServicesId();
for (Iterator<String> iterator = serviceIds.iterator(); iterator.hasNext();) {
namedContextFactory.getInstance(iterator.next(), ILoadBalancer.class);
}
}
private synchronized void updateLastScaMigrateAnsResetValue(String value) {
this.lastScaMigrateAnsSwitchValue = value;
}
}

View File

@@ -0,0 +1,62 @@
package org.springframework.cloud.alicloud.ans.migrate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.ServerList;
/**
* @author pbting
*/
public class MigrateRibbonBeanPostProcessor
implements BeanPostProcessor, BeanClassLoaderAware {
protected static final Log log = LogFactory.getLog(MigrateOnCondition.class);
private ClassLoader classLoader;
private IClientConfig clientConfig;
public MigrateRibbonBeanPostProcessor(IClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
// step 1 : check the bean whether proxy or not
if (!MigrateProxyManager.isMigrateProxy()) {
log.info("Migrate proxy is Close.");
return bean;
}
// step 2 : proxy the designated bean
if (bean instanceof ServerList) {
bean = MigrateProxyManager.newServerListProxy(bean, classLoader,
clientConfig);
}
if (bean instanceof ILoadBalancer) {
bean = MigrateProxyManager.newLoadBalancerProxy(bean, classLoader,
clientConfig);
}
return bean;
}
@Override
public Object postProcessBeforeInitialization(Object o, String s)
throws BeansException {
return o;
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
}

View File

@@ -0,0 +1,68 @@
package org.springframework.cloud.alicloud.ans.migrate;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.alicloud.ans.registry.AnsRegistration;
import org.springframework.cloud.alicloud.ans.registry.AnsServiceRegistry;
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* @author pbting
*/
@Component
public class MigrateServiceRegistry {
private static final Log log = LogFactory.getLog(MigrateServiceRegistry.class);
private AtomicBoolean running = new AtomicBoolean(false);
private ServiceRegistry serviceRegistry;
private AnsRegistration ansRegistration;
public MigrateServiceRegistry(AnsProperties ansProperties,
ApplicationContext context) {
this.ansRegistration = new AnsRegistration(ansProperties, context);
this.ansRegistration.init();
this.serviceRegistry = new AnsServiceRegistry();
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationEvent(ApplicationReadyEvent event) {
String serverPort = event.getApplicationContext().getEnvironment()
.getProperty("server.port");
if (null != serverPort && serverPort.trim().length() > 0) {
try {
this.ansRegistration.setPort(Integer.parseInt(serverPort));
}
catch (NumberFormatException e) {
// nothing to do
}
}
log.info("[ Migrate ] change the port to " + serverPort);
if (this.running.get()) {
return;
}
if (this.ansRegistration.getPort() > 0) {
long s = System.currentTimeMillis();
log.info("[Migrate] start to registry server to ANS");
this.serviceRegistry.register(this.ansRegistration);
log.info("[migrate] end to registry server to ANS cost time with "
+ (System.currentTimeMillis() - s) + " ms.");
}
else {
log.warn("the server port is smaller than zero. "
+ this.ansRegistration.getPort());
}
this.running.set(true);
}
}

View File

@@ -0,0 +1,39 @@
package org.springframework.cloud.alicloud.ans.migrate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.alicloud.ans.ConditionalOnAnsEnabled;
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
import org.springframework.cloud.context.named.NamedContextFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* @author pbting
*/
@Configuration
@EnableConfigurationProperties
@Conditional(MigrateOnConditionClass.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@ConditionalOnAnsEnabled
public class MigrationAutoconfiguration {
@Bean
public MigrateServiceRegistry migrationManger(AnsProperties ansProperties,
ApplicationContext applicationContext) {
return new MigrateServiceRegistry(ansProperties, applicationContext);
}
@Bean
public MigrateRefreshEventListener migrateRefreshEventListener(
Environment environment,
@Qualifier(value = "springClientFactory") NamedContextFactory namedContextFactory) {
return new MigrateRefreshEventListener(environment, namedContextFactory);
}
}

View File

@@ -0,0 +1,168 @@
package org.springframework.cloud.alicloud.ans.migrate;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.Server;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.alicloud.ans.ribbon.AnsServer;
import org.springframework.cloud.alicloud.ans.ribbon.AnsServerList;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author pbting
*/
class ServerListInvocationHandler implements MethodInterceptor {
private final static Log log = LogFactory.getLog(ServerListInvocationHandler.class);
private final static ConcurrentMap<String, AnsServerList> SERVER_LIST_CONCURRENT_MAP = new ConcurrentHashMap<>();
private final static ConcurrentMap<String, ConcurrentMap<String, ServerWrapper>> CALL_SERVICE_COUNT = new ConcurrentHashMap<>();
private final static Set<String> INTERCEPTOR_METHOD_NAME = new ConcurrentSkipListSet();
private IClientConfig clientConfig;
private AnsServerList ansServerList;
private AtomicBoolean isFirst = new AtomicBoolean(false);
static {
INTERCEPTOR_METHOD_NAME.add("getInitialListOfServers");
INTERCEPTOR_METHOD_NAME.add("getUpdatedListOfServers");
}
ServerListInvocationHandler(IClientConfig clientConfig) {
this.clientConfig = clientConfig;
this.ansServerList = new AnsServerList(clientConfig.getClientName());
SERVER_LIST_CONCURRENT_MAP.put(ansServerList.getDom(), ansServerList);
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
String methodName = invocation.getMethod().getName();
// step 1: check the method is not interceptor
if (!INTERCEPTOR_METHOD_NAME.contains(methodName)) {
return invocation.proceed();
}
// step 2: interceptor the method
List<Server> serverList = (List<Server>) invocation.proceed();
long s = System.currentTimeMillis();
log.info("[ START ] merage server list for " + clientConfig.getClientName());
serverList = mergeAnsServerList(serverList);
log.info("[ END ] merage server list for " + clientConfig.getClientName()
+ ", cost time " + (System.currentTimeMillis() - s) + " ms .");
return serverList;
}
/**
* 后台线程 和 Eureka 两个注册中心进行 Merage 的时候List 表中始终保持是有效的 Server. 即不考虑 ANS 客户端本地容灾的情况
*/
private List<Server> mergeAnsServerList(final List<Server> source) {
if (source.size() > 0 && isFirst.compareAndSet(false, true)) {
return source;
}
// step 1: get all of server list and filter the alive
List<AnsServer> ansServerList = filterAliveAnsServer(
this.ansServerList.getInitialListOfServers());
if (ansServerList.isEmpty()) {
return source;
}
log.info("[" + this.clientConfig.getClientName() + "] Get Server List from ANS:"
+ ansServerList + "; loadbalancer server list override before:" + source);
// step 2:remove servers of that have in load balancer list
final Iterator<Server> serverIterator = source.iterator();
while (serverIterator.hasNext()) {
final Server server = serverIterator.next();
for (Iterator<AnsServer> iterator = ansServerList.iterator(); iterator
.hasNext();) {
AnsServer ansServer = iterator.next();
if (server.getHostPort()
.equals(ansServer.getHealthService().toInetAddr())) {
// fix bug: mast be set the zone, update server list,will filter
// by: ZoneAffinityPredicate
ansServer.setZone(server.getZone());
ansServer.setSchemea(server.getScheme());
ansServer.setId(server.getId());
ansServer.setReadyToServe(true);
serverIterator.remove();
log.info("Source Server is remove " + server.getHostPort()
+ ", and from ANS Server is override"
+ ansServer.toString());
}
}
}
for (Iterator<AnsServer> iterator = ansServerList.iterator(); iterator
.hasNext();) {
source.add(iterator.next());
}
log.info("[" + this.clientConfig.getClientName() + "] "
+ "; loadbalancer server list override after:" + source);
// override
return source;
}
private List<AnsServer> filterAliveAnsServer(List<AnsServer> sourceServerList) {
final List<AnsServer> resultServerList = new LinkedList<>();
for (Iterator<AnsServer> iterator = sourceServerList.iterator(); iterator
.hasNext();) {
AnsServer ansServer = iterator.next();
boolean isAlive = ansServer.isAlive(3);
if (isAlive) {
resultServerList.add(ansServer);
}
log.warn(ansServer.toString() + " isAlive :" + isAlive);
}
return resultServerList;
}
static Map<String, ConcurrentMap<String, ServerWrapper>> getServerRegistry() {
return Collections.unmodifiableMap(CALL_SERVICE_COUNT);
}
static Server checkAndGetServiceServer(String serviceId, Server server) {
if (server == null) {
log.warn(String.format("[%s] refers the server is null", server));
}
server = (server == null ? SERVER_LIST_CONCURRENT_MAP.get(serviceId)
.getInitialListOfServers().get(0) : server);
return server;
}
/**
*
* @param serviceId
* @param server
* @return
*/
static long incrementCallService(String serviceId, Server server) {
ConcurrentMap<String, ServerWrapper> concurrentHashMap = CALL_SERVICE_COUNT
.putIfAbsent(serviceId, new ConcurrentHashMap<String, ServerWrapper>());
if (concurrentHashMap == null) {
concurrentHashMap = CALL_SERVICE_COUNT.get(serviceId);
}
String ipPort = server.getHostPort();
ServerWrapper serverWraper = concurrentHashMap.putIfAbsent(ipPort,
new ServerWrapper(server, new AtomicLong()));
if (serverWraper == null) {
serverWraper = concurrentHashMap.get(ipPort);
}
serverWraper.setServer(server);
return serverWraper.getCallCount().incrementAndGet();
}
}

View File

@@ -0,0 +1,38 @@
package org.springframework.cloud.alicloud.ans.migrate;
import com.netflix.loadbalancer.Server;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author pbting
*/
public class ServerWrapper {
private Server server;
private AtomicLong callCount;
public ServerWrapper() {
}
public ServerWrapper(Server server, AtomicLong callCount) {
this.server = server;
this.callCount = callCount;
}
public Server getServer() {
return server;
}
public void setServer(Server server) {
this.server = server;
}
public AtomicLong getCallCount() {
return callCount;
}
public void setCallCount(AtomicLong callCount) {
this.callCount = callCount;
}
}

View File

@@ -16,24 +16,27 @@
package org.springframework.cloud.alicloud.ans.registry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.context.embedded.EmbeddedServletContainerInitializedEvent;
import org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration;
import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationProperties;
import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author xiaolongzuo
* @author pbting
*/
@Component
public class AnsAutoServiceRegistration
extends AbstractAutoServiceRegistration<AnsRegistration> {
private static final Logger LOGGER = LoggerFactory
.getLogger(AnsAutoServiceRegistration.class);
@Autowired
private static final Log log = LogFactory.getLog(AnsAutoServiceRegistration.class);
private AnsRegistration registration;
public AnsAutoServiceRegistration(ServiceRegistry<AnsRegistration> serviceRegistry,
@@ -59,13 +62,23 @@ public class AnsAutoServiceRegistration
@Override
protected AnsRegistration getManagementRegistration() {
return null;
return registration;
}
@Override
public void start() {
// nothing to do
}
@EventListener(EmbeddedServletContainerInitializedEvent.class)
public void doStart() {
super.start();
}
@Override
protected void register() {
if (!this.registration.getAnsProperties().isRegisterEnabled()) {
LOGGER.debug("Registration disabled.");
log.debug("Registration disabled.");
return;
}
if (this.registration.getPort() < 0) {
@@ -83,16 +96,6 @@ public class AnsAutoServiceRegistration
}
@Override
protected int getConfiguredPort() {
return this.getPort().get();
}
@Override
protected void setConfiguredPort(int port) {
this.getPort().set(port);
}
@Override
protected Object getConfiguration() {
return this.registration.getAnsProperties();
@@ -110,4 +113,13 @@ public class AnsAutoServiceRegistration
return StringUtils.isEmpty(appName) ? super.getAppName() : appName;
}
@Override
protected int getConfiguredPort() {
return registration.getPort();
}
@Override
protected void setConfiguredPort(int port) {
this.registration.setPort(port);
}
}

View File

@@ -16,12 +16,6 @@
package org.springframework.cloud.alicloud.ans.registry;
import java.net.URI;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.alicloud.context.ans.AnsProperties;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
@@ -31,6 +25,10 @@ import org.springframework.context.ApplicationContext;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.net.URI;
import java.util.Map;
/**
* @author xiaolongzuo
*/
@@ -40,12 +38,14 @@ public class AnsRegistration implements Registration, ServiceInstance {
private static final String MANAGEMENT_CONTEXT_PATH = "management.context-path";
private static final String MANAGEMENT_ADDRESS = "management.address";
@Autowired
private AnsProperties ansProperties;
@Autowired
private ApplicationContext context;
public AnsRegistration(AnsProperties ansProperties, ApplicationContext context) {
this.ansProperties = ansProperties;
this.context = context;
}
@PostConstruct
public void init() {
@@ -54,8 +54,9 @@ public class AnsRegistration implements Registration, ServiceInstance {
if (null != managementPort) {
Map<String, String> metadata = ansProperties.getClientMetadata();
metadata.put(MANAGEMENT_PORT, managementPort.toString());
String contextPath = env.getProperty("management.context-path");
String address = env.getProperty("management.address");
String contextPath = env
.getProperty("management.server.servlet.context-path");
String address = env.getProperty("management.server.address");
if (!StringUtils.isEmpty(contextPath)) {
metadata.put(MANAGEMENT_CONTEXT_PATH, contextPath);
}
@@ -63,6 +64,11 @@ public class AnsRegistration implements Registration, ServiceInstance {
metadata.put(MANAGEMENT_ADDRESS, address);
}
}
String serverPort = env.getProperty("server.port");
if (null != serverPort) {
this.setPort(Integer.valueOf(serverPort));
}
}
@Override

View File

@@ -16,24 +16,23 @@
package org.springframework.cloud.alicloud.ans.registry;
import com.alibaba.ans.core.NamingService;
import com.alibaba.ans.shaded.com.taobao.vipserver.client.ipms.NodeReactor;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
import com.alibaba.ans.core.NamingService;
import com.alibaba.ans.shaded.com.taobao.vipserver.client.ipms.NodeReactor;
/**
* @author xiaolongzuo
*/
public class AnsServiceRegistry implements ServiceRegistry<AnsRegistration> {
private static Logger logger = LoggerFactory.getLogger(AnsServiceRegistry.class);
private static Log log = LogFactory.getLog(AnsServiceRegistry.class);
private static final String SEPARATOR = ",";
@@ -41,11 +40,11 @@ public class AnsServiceRegistry implements ServiceRegistry<AnsRegistration> {
public void register(AnsRegistration registration) {
if (!registration.isRegisterEnabled()) {
logger.info("Registration is disabled...");
log.info("Registration is disabled...");
return;
}
if (StringUtils.isEmpty(registration.getServiceId())) {
logger.info("No service to register for client...");
log.info("No service to register for client...");
return;
}
@@ -63,13 +62,14 @@ public class AnsServiceRegistry implements ServiceRegistry<AnsRegistration> {
NamingService.regDom(dom, registration.getHost(), registration.getPort(),
registration.getRegisterWeight(dom), registration.getCluster(),
tags);
logger.info("INFO_ANS_REGISTER, {} {}:{} register finished", dom,
registration.getAnsProperties().getClientIp(),
registration.getAnsProperties().getClientPort());
log.info("INFO_ANS_REGISTER, " + dom + " "
+ registration.getAnsProperties().getClientIp() + ":"
+ registration.getAnsProperties().getClientPort()
+ " register finished");
}
catch (Exception e) {
logger.error("ERR_ANS_REGISTER, {} register failed...{},", dom,
registration.toString(), e);
log.error("ERR_ANS_REGISTER, " + dom + " register failed..."
+ registration.toString() + ",", e);
}
}
}
@@ -77,10 +77,10 @@ public class AnsServiceRegistry implements ServiceRegistry<AnsRegistration> {
@Override
public void deregister(AnsRegistration registration) {
logger.info("De-registering from ANSServer now...");
log.info("De-registering from ANSServer now...");
if (StringUtils.isEmpty(registration.getServiceId())) {
logger.info("No dom to de-register for client...");
log.info("No dom to de-register for client...");
return;
}
@@ -89,11 +89,11 @@ public class AnsServiceRegistry implements ServiceRegistry<AnsRegistration> {
registration.getPort(), registration.getCluster());
}
catch (Exception e) {
logger.error("ERR_ANS_DEREGISTER, de-register failed...{},",
registration.toString(), e);
log.error("ERR_ANS_DEREGISTER, de-register failed..."
+ registration.toString() + ",", e);
}
logger.info("De-registration finished.");
log.info("De-registration finished.");
}
@Override

View File

@@ -17,7 +17,9 @@
package org.springframework.cloud.alicloud.ans.ribbon;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.alicloud.ans.migrate.MigrateOnConditionMissingClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import com.netflix.client.config.IClientConfig;
@@ -25,13 +27,15 @@ import com.netflix.loadbalancer.ServerList;
/**
* @author xiaolongzuo
* @author pbting
*/
@Configuration
@Conditional(MigrateOnConditionMissingClass.class)
public class AnsRibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config) {
public ServerList<?> ansRibbonServerList(IClientConfig config) {
AnsServerList serverList = new AnsServerList(config.getClientName());
return serverList;
}

View File

@@ -16,14 +16,19 @@
package org.springframework.cloud.alicloud.ans.ribbon;
import java.util.Collections;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.alibaba.ans.shaded.com.taobao.vipserver.client.core.Host;
import com.netflix.loadbalancer.Server;
/**
* @author xiaolongzuo
* @author pbting
*/
public class AnsServer extends Server {
@@ -34,7 +39,8 @@ public class AnsServer extends Server {
public AnsServer(final Host host, final String dom) {
super(host.getIp(), host.getPort());
this.host = host;
this.metadata = Collections.emptyMap();
this.metadata = new HashMap();
this.metadata.put("source", "ANS");
metaInfo = new MetaInfo() {
@Override
public String getAppName() {
@@ -48,16 +54,44 @@ public class AnsServer extends Server {
@Override
public String getServiceIdForDiscovery() {
return null;
return dom;
}
@Override
public String getInstanceId() {
return null;
return AnsServer.this.host.getIp() + ":" + dom + ":"
+ AnsServer.this.host.getPort();
}
};
}
@Override
public boolean isAlive() {
return true;
}
/**
*
* @param timeOut Unit: Seconds
* @return
*/
public boolean isAlive(long timeOut) {
try {
String hostName = this.host.getHostname();
hostName = hostName != null && hostName.trim().length() > 0 ? hostName
: this.host.getIp();
Socket socket = new Socket();
socket.connect(new InetSocketAddress(hostName, this.host.getPort()),
(int) TimeUnit.SECONDS.toMillis(timeOut));
socket.close();
return true;
}
catch (IOException e) {
return false;
}
}
@Override
public MetaInfo getMetaInfo() {
return metaInfo;
@@ -71,4 +105,9 @@ public class AnsServer extends Server {
return metadata;
}
@Override
public String toString() {
return "AnsServer{" + "metaInfo=" + metaInfo + ", host=" + host + ", metadata="
+ metadata + '}';
}
}

View File

@@ -16,16 +16,17 @@
package org.springframework.cloud.alicloud.ans.ribbon;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.ans.core.NamingService;
import com.alibaba.ans.shaded.com.taobao.vipserver.client.core.Host;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractServerList;
import java.util.ArrayList;
import java.util.List;
/**
* @author xiaolongzuo
* @author pbting
*/
public class AnsServerList extends AbstractServerList<AnsServer> {
@@ -60,10 +61,12 @@ public class AnsServerList extends AbstractServerList<AnsServer> {
List<AnsServer> result = new ArrayList<AnsServer>(hosts.size());
for (Host host : hosts) {
if (host.isValid()) {
result.add(hostToServer(host));
AnsServer ansServer = hostToServer(host);
if (ansServer.isAlive(3)) {
result.add(ansServer);
}
}
}
return result;
}

View File

@@ -0,0 +1,20 @@
package org.springframework.cloud.alicloud.ans.ribbon;
import org.springframework.cloud.alicloud.ans.migrate.MigrateRibbonBeanPostProcessor;
import org.springframework.cloud.alicloud.ans.migrate.MigrateOnConditionClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import com.netflix.client.config.IClientConfig;
@Configuration
@Conditional(MigrateOnConditionClass.class)
public class MigrateRibbonCofiguration {
@Bean
public MigrateRibbonBeanPostProcessor migrateBeanPostProcessor(IClientConfig clientConfig) {
return new MigrateRibbonBeanPostProcessor(clientConfig);
}
}

View File

@@ -34,6 +34,7 @@ import org.springframework.context.annotation.Configuration;
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnRibbonAns
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = AnsRibbonClientConfiguration.class)
@RibbonClients(defaultConfiguration = { AnsRibbonClientConfiguration.class,
MigrateRibbonCofiguration.class })
public class RibbonAnsAutoConfiguration {
}

View File

@@ -1,6 +1,8 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.alicloud.ans.endpoint.AnsEndpointAutoConfiguration,\
org.springframework.cloud.alicloud.ans.ribbon.RibbonAnsAutoConfiguration,\
org.springframework.cloud.alicloud.ans.AnsAutoConfiguration
org.springframework.cloud.alicloud.ans.AnsAutoConfiguration,\
org.springframework.cloud.alicloud.ans.migrate.MigrateEndpointAutoConfiguration,\
org.springframework.cloud.alicloud.ans.migrate.MigrationAutoconfiguration
org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
org.springframework.cloud.alicloud.ans.AnsDiscoveryClientAutoConfiguration
org.springframework.cloud.alicloud.ans.AnsDiscoveryClientAutoConfiguration