diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/hystrix/SeataHystrixConcurrencyStrategy.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/hystrix/SeataHystrixConcurrencyStrategy.java index 59253059..94185c8a 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/hystrix/SeataHystrixConcurrencyStrategy.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/hystrix/SeataHystrixConcurrencyStrategy.java @@ -16,23 +16,93 @@ package com.alibaba.cloud.seata.feign.hystrix; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolProperties; import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle; +import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier; +import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; +import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy; +import com.netflix.hystrix.strategy.properties.HystrixProperty; import io.seata.core.context.RootContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.web.context.request.RequestAttributes; +import org.springframework.web.context.request.RequestContextHolder; /** * @author xiaojing */ public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { + private final Logger logger = LoggerFactory.getLogger(SeataHystrixConcurrencyStrategy.class); private HystrixConcurrencyStrategy delegate; public SeataHystrixConcurrencyStrategy() { - this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy(); - HystrixPlugins.reset(); - HystrixPlugins.getInstance().registerConcurrencyStrategy(this); + try { + this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy(); + if (this.delegate instanceof SeataHystrixConcurrencyStrategy) { + return; + } + HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook(); + HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); + HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher(); + HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy(); + logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy); + HystrixPlugins.reset(); + HystrixPlugins.getInstance().registerConcurrencyStrategy(this); + HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook); + HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); + HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); + HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); + } catch (Exception ex) { + logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex); + } + } + + private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier, + HystrixMetricsPublisher metricsPublisher, + HystrixPropertiesStrategy propertiesStrategy) { + if (logger.isDebugEnabled()) { + logger.debug("Current Hystrix plugins configuration is [" + "concurrencyStrategy [" + this.delegate + "]," + + "eventNotifier [" + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "]," + + "propertiesStrategy [" + propertiesStrategy + "]," + "]"); + logger.debug("Registering Seata Hystrix Concurrency Strategy."); + } + } + + @Override + public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty corePoolSize, + HystrixProperty maximumPoolSize, + HystrixProperty keepAliveTime, TimeUnit unit, + BlockingQueue workQueue) { + return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, + workQueue); + } + + @Override + public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, + HystrixThreadPoolProperties threadPoolProperties) { + return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties); + } + + @Override + public BlockingQueue getBlockingQueue(int maxQueueSize) { + return this.delegate.getBlockingQueue(maxQueueSize); + } + + @Override + public HystrixRequestVariable getRequestVariable(HystrixRequestVariableLifecycle rv) { + return this.delegate.getRequestVariable(rv); } @Override @@ -44,15 +114,14 @@ public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy Callable wrappedCallable; if (this.delegate != null) { wrappedCallable = this.delegate.wrapCallable(c); - } - else { + } else { wrappedCallable = c; } if (wrappedCallable instanceof SeataContextCallable) { return wrappedCallable; } - return new SeataContextCallable<>(wrappedCallable); + return new SeataContextCallable<>(wrappedCallable, RequestContextHolder.getRequestAttributes()); } private static class SeataContextCallable implements Callable { @@ -61,19 +130,23 @@ public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy private final String xid; - SeataContextCallable(Callable actual) { + private final RequestAttributes requestAttributes; + + SeataContextCallable(Callable actual, RequestAttributes requestAttribute) { this.actual = actual; + this.requestAttributes = requestAttribute; this.xid = RootContext.getXID(); } @Override public K call() throws Exception { try { + RequestContextHolder.setRequestAttributes(requestAttributes); RootContext.bind(xid); return actual.call(); - } - finally { + } finally { RootContext.unbind(); + RequestContextHolder.resetRequestAttributes(); } }