美团Cat监控集成Hystrix线程池链路分析
Cat-client 提供给业务以及中间层埋点的底层SDK。
Cat-consumer 用于实时分析从客户端提供的数据。
Cat-home 作为给用户提供展示的控制端。
链路监控的重要性不言而喻,它可以帮助我们在遇到问题时快速地定位问题,也能帮助我们找到系统的优化点。
在未集成Hystrix线程池的时候一切都是没有问题的,但是使用了Hystrix线程池之后, Cat的链路就断了。
只有一个最终调用的结果耗时,这绝对不是我们想要的。
所以基于这个起因,本文就来介绍Cat在集成Hystrix线程池实现链路监控上的一些尝试。
public class KapiHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {private HystrixConcurrencyStrategy delegate;public RequestAttributeHystrixConcurrencyStrategy() {try {this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {return;}HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);HystrixPlugins.reset();//注册新的并发策略HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);HystrixPlugins.getInstance().registerConcurrencyStrategy(this);} catch (Exception e) {LoggerUtil.logger().error("构建hystrix并发策略异常:", e);}}private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier, HystrixMetricsPublisher metricsPublisher, HystrixPropertiesStrategy propertiesStrategy) {}public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {return this.delegate.getThreadPool(threadPoolKey, 
threadPoolProperties);}public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {return this.delegate.getBlockingQueue(maxQueueSize);}public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {return this.delegate.getRequestVariable(rv);}public <T> WrappedCallable<T> wrapCallable(Callable<T> callable) {return this.delegate.wrapCallable(callable);}}
/**
 * Runs {@code function} inside a CAT transaction of the given type and name.
 *
 * <p>Entries of {@code data}, when present, are attached to the transaction before the call.
 * On success the transaction status is set to {@code Message.SUCCESS}; on failure the
 * exception is logged to CAT, an extra "{@code type}_Error" event is logged when the exception
 * has a message, the exception becomes the transaction status, and it is rethrown.
 * The transaction is completed in every case.
 *
 * @param function the work to execute
 * @param type     transaction type
 * @param name     transaction name
 * @param data     optional key/value pairs to attach; may be {@code null} or empty
 * @return the value returned by {@code function}
 * @throws Exception whatever {@code function} throws
 */
public static <T> T newTransaction(Callable<T> function, String type, String name, Map<String, Object> data) throws Exception {
    Transaction txn = Cat.newTransaction(type, name);

    boolean hasData = data != null && !data.isEmpty();
    if (hasData) {
        data.forEach((key, value) -> txn.addData(key, value));
    }

    try {
        T outcome = function.call();
        txn.setStatus(Message.SUCCESS);
        return outcome;
    } catch (Exception failure) {
        Cat.logError(failure);
        String detail = failure.getMessage();
        if (detail != null) {
            Cat.logEvent(type + "_Error", name + detail);
        }
        txn.setStatus(failure);
        throw failure;
    } finally {
        // Always close the transaction, whether the call succeeded or failed.
        txn.complete();
    }
}
也就是上面Cat.newTransaction(type, name)之处。
继续看Cat.newTransaction(type, name)源码。
/**
 * Static entry point: creates a transaction through the message producer.
 * Returns {@code NullMessage.TRANSACTION} when CAT is not enabled, or when the producer
 * throws (in which case the exception is passed to {@code errorHandler}).
 */
public static Transaction newTransaction(String type, String name) {
    if (!isEnabled()) {
        return NullMessage.TRANSACTION;
    }
    try {
        return getProducer().newTransaction(type, name);
    } catch (Exception failure) {
        errorHandler(failure);
        return NullMessage.TRANSACTION;
    }
}

/**
 * Producer-side implementation: sets up the manager's context if it has none,
 * then creates a {@code DefaultTransaction} and starts it on the manager.
 */
public Transaction newTransaction(String type, String name) {
    boolean contextMissing = !this.manager.hasContext();
    if (contextMissing) {
        this.manager.setup();
    }

    DefaultTransaction created = new DefaultTransaction(type, name, this.manager);
    this.manager.start(created, false);
    return created;
}
直到进入到DefaultMessageProducer的newTransaction(String type, String name)方法。
在这里可以看到,先通过manager(DefaultMessageManager)的hasContext()方法判断当前线程是否已有上下文(Context),如果没有,就调用setup()方法创建。
进入hasContext()这个方法。
/** Holds the {@code Context} associated with the current thread, if any. */
private ThreadLocal<DefaultMessageManager.Context> context = new ThreadLocal<>();

/**
 * Reports whether a context has been stored for the calling thread.
 *
 * @return {@code true} when the thread-local holder returns a non-null context
 */
public boolean hasContext() {
    // The field is already typed, so no cast is needed when reading it.
    return this.context.get() != null;
}
/**
 * Builds a new {@code Context} from this manager's domain, host name and IP, and stores it
 * in {@code this.context}. When the configured sampling rate is below 1.0 and
 * {@code hitSample} returns true for it, the context's message tree is flagged via
 * {@code setHitSample(true)} before the context is stored.
 */
public void setup() {
    DefaultMessageManager.Context freshContext =
            new DefaultMessageManager.Context(this.domain, this.hostName, this.ip);

    double rate = this.configService.getSamplingRate();
    boolean flagTree = rate < 1.0D && this.hitSample(rate);
    if (flagTree) {
        freshContext.tree.setHitSample(true);
    }

    this.context.set(freshContext);
}
// Excerpt of DefaultMessageManager (the "......" appears to mark code omitted by the article).
// It shows two fields: `context`, an instance-level ThreadLocal holding a Context, and
// `INSTANCE`, a private static MessageManager initialised with a new DefaultMessageManager.
public class DefaultMessageManager implements MessageManager {......private ThreadLocal<DefaultMessageManager.Context> context = new ThreadLocal();private static MessageManager INSTANCE = new DefaultMessageManager();}
而且,我在DefaultMessageManager所实现的接口MessageManager处也发现了这么一句话。
/**
 * Message manager to help build CAT message.
 * <p>
 * <p>
 * Notes: This method is reserved for internal usage only. Application developer should never call this method directly.
 */
public interface MessageManager {}
