美团Cat监控集成Hystrix线程池链路分析
Cat-client 提供给业务以及中间层埋点的底层SDK。
Cat-consumer 用于实时分析从客户端提供的数据。
Cat-home 作为用户给用户提供展示的控制端。
链路监控的重要性不言而喻,他可以帮助我们遇到问题时快速的定位问题,也能帮助我们找到系统的优化点。
在未集成Hystrix线程池的时候一切都是没有问题的,但是使用了Hystrix线程池之后, Cat的链路就断了。
只有一个最终调用的结果耗时,这绝对不是我们想要的。
所以基于这个起因,本文就来介绍Cat在集成Hystrix线程池实现链路监控上的一些的尝试。
public class KapiHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private HystrixConcurrencyStrategy delegate;
public RequestAttributeHystrixConcurrencyStrategy() {
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);
HystrixPlugins.reset();
//注册新的并发策略
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
} catch (Exception e) {
LoggerUtil.logger().error("构建hystrix并发策略异常:", e);
}
}
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier, HystrixMetricsPublisher metricsPublisher, HystrixPropertiesStrategy propertiesStrategy) {
}
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
public <T> WrappedCallable<T> wrapCallable(Callable<T> callable) {
return this.delegate.wrapCallable(callable);
}
}
public static <T> T newTransaction(Callable<T> function, String type, String name, Map<String, Object> data) throws Exception {
Transaction transaction = Cat.newTransaction(type, name);
if (data != null && !data.isEmpty()) {
data.forEach(transaction::addData);
}
try {
T result = function.call();
transaction.setStatus(Message.SUCCESS);
return result;
} catch (Exception e) {
Cat.logError(e);
if (e.getMessage() != null) {
Cat.logEvent(type + "_Error", name + e.getMessage());
}
transaction.setStatus(e);
throw e;
} finally {
transaction.complete();
}
}
也就是上面Cat.newTransaction(type, name)之处。
继续看Cat.newTransaction(type, name)源码。
public static Transaction newTransaction(String type, String name) {
if (isEnabled()) {
try {
return getProducer().newTransaction(type, name);
} catch (Exception var3) {
errorHandler(var3);
return NullMessage.TRANSACTION;
}
} else {
return NullMessage.TRANSACTION;
}
}
public Transaction newTransaction(String type, String name) {
if (!this.manager.hasContext()) {
this.manager.setup();
}
DefaultTransaction transaction = new DefaultTransaction(type, name, this.manager);
this.manager.start(transaction, false);
return transaction;
}
直到进入到DefaultMessageProducer的newTransaction(String type, String name)方法。
在这里可以看到,判断manager(DefaultMessageManager)的hasContext()这个方法是否为空,如果为空,就调用setup()方法。
进入hasContext()这个方法。
private ThreadLocal<DefaultMessageManager.Context> context = new ThreadLocal();
public boolean hasContext() {
DefaultMessageManager.Context context = (DefaultMessageManager.Context)this.context.get();
return context != null;
}
public void setup() {
DefaultMessageManager.Context ctx = new DefaultMessageManager.Context(this.domain, this.hostName, this.ip);
double samplingRate = this.configService.getSamplingRate();
if (samplingRate < 1.0D && this.hitSample(samplingRate)) {
ctx.tree.setHitSample(true);
}
this.context.set(ctx);
}
public class DefaultMessageManager implements MessageManager {
......
private ThreadLocal<DefaultMessageManager.Context> context = new ThreadLocal();
private static MessageManager INSTANCE = new DefaultMessageManager();
}
而且,我在DefaultMessageManager的接口处也发现了这么一句话。
/**
* Message manager to help build CAT message.
* <p>
* <p>
* Notes: This method is reserved for internal usage only. Application developer should never call this method directly.
*/
public interface MessageManager {
}