diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/hystrix/SeataHystrixConcurrencyStrategy.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/hystrix/SeataHystrixConcurrencyStrategy.java index 59253059..338d1c92 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/hystrix/SeataHystrixConcurrencyStrategy.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/hystrix/SeataHystrixConcurrencyStrategy.java @@ -16,23 +16,104 @@ package com.alibaba.cloud.seata.feign.hystrix; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolProperties; import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle; +import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier; +import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; +import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy; +import com.netflix.hystrix.strategy.properties.HystrixProperty; import io.seata.core.context.RootContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.web.context.request.RequestAttributes; +import org.springframework.web.context.request.RequestContextHolder; /** * @author xiaojing */ public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { + private final Logger logger = LoggerFactory + .getLogger(SeataHystrixConcurrencyStrategy.class); private HystrixConcurrencyStrategy delegate; public SeataHystrixConcurrencyStrategy() { - this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy(); - HystrixPlugins.reset(); - HystrixPlugins.getInstance().registerConcurrencyStrategy(this); + try { + this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy(); + if (this.delegate instanceof SeataHystrixConcurrencyStrategy) { + return; + } + HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins + .getInstance().getCommandExecutionHook(); + HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance() + .getEventNotifier(); + HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance() + .getMetricsPublisher(); + HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance() + .getPropertiesStrategy(); + logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, + propertiesStrategy); + HystrixPlugins.reset(); + HystrixPlugins.getInstance().registerConcurrencyStrategy(this); + HystrixPlugins.getInstance() + .registerCommandExecutionHook(commandExecutionHook); + HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); + HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); + HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); + } + catch (Exception ex) { + logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex); + } + } + + private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier, + HystrixMetricsPublisher metricsPublisher, + HystrixPropertiesStrategy propertiesStrategy) { + if (logger.isDebugEnabled()) { + logger.debug("Current Hystrix plugins configuration is [" + + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier [" + + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "]," + + "propertiesStrategy [" + propertiesStrategy + "]," + "]"); + logger.debug("Registering Seata Hystrix Concurrency Strategy."); + } + } + + @Override + public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, + HystrixProperty corePoolSize, + HystrixProperty maximumPoolSize, + HystrixProperty keepAliveTime, TimeUnit unit, + BlockingQueue workQueue) { + return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, + keepAliveTime, unit, workQueue); + } + + @Override + public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, + HystrixThreadPoolProperties threadPoolProperties) { + return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties); + } + + @Override + public BlockingQueue getBlockingQueue(int maxQueueSize) { + return this.delegate.getBlockingQueue(maxQueueSize); + } + + @Override + public HystrixRequestVariable getRequestVariable( + HystrixRequestVariableLifecycle rv) { + return this.delegate.getRequestVariable(rv); } @Override @@ -52,7 +133,8 @@ public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy return wrappedCallable; } - return new SeataContextCallable<>(wrappedCallable); + return new SeataContextCallable<>(wrappedCallable, + RequestContextHolder.getRequestAttributes()); } private static class SeataContextCallable implements Callable { @@ -61,19 +143,24 @@ public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy private final String xid; - SeataContextCallable(Callable actual) { + private final RequestAttributes requestAttributes; + + SeataContextCallable(Callable actual, RequestAttributes requestAttribute) { this.actual = actual; + this.requestAttributes = requestAttribute; this.xid = RootContext.getXID(); } @Override public K call() throws Exception { try { + RequestContextHolder.setRequestAttributes(requestAttributes); RootContext.bind(xid); return actual.call(); } finally { RootContext.unbind(); + RequestContextHolder.resetRequestAttributes(); } }