vlambda博客
学习文章列表

HystrixCommand使用及创建线程池问题

目录
      一、背景
       二、问题
      三、HystrixCommand使用、线程池分析
      总结

一、背景
目前Java服务主流的构建方式是使用SpringBoot,在具体的业务场景中,很多也都会使用SpingCloud搭建服务框架,在使用SpringCloud后也必然会用到Hystrix做熔断降级,说起Hystrix就离不开核心注解:@HystrixCommand,@HystrixCommand注解可以配置的除了常用的groupKey、commandKey、fallbackMethod等,还有一个很关键的就是threadPoolKey,就是使用Hystrix线程隔离策略时的线程池Key。
@HystrixCommand注解源码如下:
/** * This annotation used to specify some methods which should be processes as hystrix commands. */@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inherited@Documentedpublic @interface HystrixCommand {
/** * The command group key is used for grouping together commands such as for reporting, * alerting, dashboards or team/library ownership. * <p/> * default => the runtime class name of annotated method * * @return group key */ String groupKey() default "";
/** * Hystrix command key. * <p/> * default => the name of annotated method. for example: * <code> * ... * @HystrixCommand * public User getUserById(...) * ... * the command name will be: 'getUserById' * </code> * * @return command key */ String commandKey() default "";
/** * The thread-pool key is used to represent a * HystrixThreadPool for monitoring, metrics publishing, caching and other such uses. * * @return thread pool key */ String threadPoolKey() default ""; ......省略
从@HystrixCommand的源码注释来看:
1、groupKey的默认值是使用@HystrixCommand标注的方法所在的类名2、commandKey的默认值是@HystrixCommand标注的方法名,即每个方法会被当做一个HystrixCommand
3、threadPoolKey没有默认值,其实是和 groupK ey保持一致
二、问题
如果我们使用Hystrix的过程中,直接在类中的方法上只添加@HystrixCommand注解,则每一个类都会独立创建一个线程池。
如下只 添加@HystrixCommand注解

线程状态总览:

HystrixCommand使用及创建线程池问题

线程池使用情况如下图所示:

HystrixCommand使用及创建线程池问题

由此可见,创建了很多的线程,而且,大部分线程处于等待、限时等待状态,使用率并不高。线程池也创建了很多,默认线程池名称为hystrix-类名-*,而且线程池中的线程都处于空闲状态,过多的线程白白消耗CPU的调度资源,会带来一定的开销。

三、HystrixCommand使用、线程池分析
正确的使用方法常用的一般有两种:
①、第一种就是只使用@HystrixCommand注解
所有的属性都在HystrixCommand注解中进行指定,这样每个方法都有一个自己独立的线程池配置和Command配置,如下:

HystrixCommand使用及创建线程池问题

在自己的HystrixCommand注解中可以指定groupKey/threadPoolKey使用哪个线程池,也可以指定commandKey使用哪个超时指令,还有fallbackMethod执行失败回调降级方法,当然还有commandProperties/threadPoolProperties这两个可以定制线程池参数和Command参数。

②、第二种可以java类注解和方法注解结合使用

如下:

在类上添加DefaultProperties注解

HystrixCommand使用及创建线程池问题

在方法上添加HystrixCommand注解

HystrixCommand使用及创建线程池问题

这样在方法上添加的HystrixCommand注解可以不添加任何属性,然后在类上@DefaultProperties上指定自己想要用的属性就行了,比如指定线程池,指定CommandKey等,而如果既在类上添加了@DefaultProperties以及其属性,又在方法上添加了@HystrixCommand及其属性,则以HystrixCommand为准。

以上是@HystrixCommand注解的使用说明,接下来简单介绍Hystrix线程池的创建。

在使用HystrixCommand注解后,如果我们想看Hystrix的源码,其中一个重要的切入点就是HystrixCommand的AOP切入逻辑,这块逻辑在HystrixCommandAspect这个类中,

被@HystrixCommand标注的方法都会被AOP拦截,在aop拦截逻辑中有获取groupKey、commandKey以及默认threadPoolKey和创建线程池等等逻辑。具体过程不在这里介绍了,直接看线程池的创建,线程池的创建最终会执行到AbstractCommand#initThreadPool()方法。

其源码逻辑大致如下:

private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { if (fromConstructor == null) { // get the default implementation of HystrixThreadPool return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); } else { return fromConstructor; }}
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work String key = threadPoolKey.name();
    // this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; }
    // if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key);}
先根据threadPoolKey尝试从threadPools这个ConcurrentHashMap<String, HystrixThreadPool>中获取线程池,即从线程池缓存中获取,有就直接返回之前的缓存,如果没有,synchromized对HystrixThreadPool类上锁后,再次判断还是没有threadPoolKey的缓存,就 new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); //threadPoolProperties HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); //并发策略 this.queueSize = properties.maxQueueSize().get(); //线程池队列大小
//创建HystrixThreadPoolMetrics,其中concurrencyStrategy.getThreadPool()会创建线程池 this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties); this.threadPool = this.metrics.getThreadPool(); this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);}
// concurrencyStrategy.getThreadPool()时会创建ThreadPoolExecutorpublic ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get(); //是否允许maximumSize生效 final int dynamicCoreSize = threadPoolProperties.coreSize().get(); //动态coreSize final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); //大于coreSize的线程,未使用的保活时间 final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); //线程队列最大值 final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
if (allowMaximumSizeToDivergeFromCoreSize) { final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();         if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);        }     else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); }}

至此,在这里就真正创建了JUC包下面的ThreadPoolExecutor线程池

其中我们需要特别注意一下这个抽象类HystrixConcurrencyStrategy

这个抽象类中有以下这些方法:

HystrixCommand使用及创建线程池问题

到这里我们应该感到高兴,其实这里是我们的一个扩展点:

我们在这里可以进行创建线程池的自定义操作、创建工作队列的自定义操作,以及线程执行前的一些自定义操作,也就是说可以根据我们的具体业务需求进行定制。

比如:

①、tid的透传

②、Hystrix线程池的监控接入

总之,是给我们留了可扩展的入手点。


总结

          ①、groupKey的默认值是使用@HystrixCommand标注的方法所在的类名
          ②、commandKey的默认值是@HystrixCommand标注的方法名,即每个方法会被当做一个HystrixCommand
          ③、threadPoolKey没有默认值,和groupKey保持一致
          ④、可以通过在类上加@DefaultProperties( threadPoolKey="xxx" )设置默认的threadPoolKey然后在方法上直接加@HystrixCommand
          ⑤、可以只通过@HystrixCommand( threadPoolKey="xxx" ) 指定当前HystrixCommand实例的threadPoolKey等其他属性
          ⑥、HystrixConcurrencyStrategy给我们提供了很多扩展点,可以根据业务需要进行定制扩展

以上是个人的亲身经历及总结经验,个人之见,难免考虑不全,如果大家有更好的建议欢迎大家私信留言。

如果觉得对你有一点点帮助,希望能够动动小手,你的点赞是对我最大的鼓励支持