vlambda博客
学习文章列表

线程池:业务代码常见的问题

线程池:业务代码常见的问题

在这里插入图片描述

在程序中,我们会使用各种池优化缓存创建昂贵的对象,比如线程池、连接池、内存池。一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用,还会通过一定策略调整池中缓存的对象数量,实现动态伸缩。


由于线程的创建比较昂贵,随意、没有控制地创建大量线程会造成性能问题,因此短平快的任务一般优先考虑使用线程池来处理,而不是直接创建线程



1. 线程池的声明需要手动进行

Java 中的 Executors 类定义了一些快捷的工具方法,来帮助我们快速创建线程池。《阿里 巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor 来创建线程池。这一条规则的背后,是大量血淋淋的生产事故,最典 型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

阿里巴巴文档:

在这里插入图片描述

测试 OOM问题 :

  • 来初始化一个单线程的 FixedThreadPool,循环 1 亿次向线程池提 交任务,每个任务都会创建一个比较大的字符串然后休眠一小时
public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100000);
        System.out.println("开始执行");
        for (int i = 0; i < 100000000; i++) {
            executorService.execute(() -> {
                String payload = IntStream.rangeClosed(11000000)
                        .mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                System.out.println("等待一小时开始");
                try {
                    TimeUnit.HOURS.sleep(1);
                }catch (Exception e){
                    log.info(payload);
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1,TimeUnit.HOURS);
    }

结果:java.lang.OutOfMemoryError 错误

在这里插入图片描述

首先我们看下 newFixedThreadPool 方法的源码,发现,线程池的工作队列直接 new 了一个 LinkedBlockingQueue,

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

点进去查看 LinkedBlockingQueue构造方法 是一个  Integer.MAX_VALUE长度的队列,可以认为是无界的

 /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

虽然 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是无界但。如果任务较多并且执行较慢但话,队列可能会快速积压,撑爆内存导致OOM


测试newCachedThreadPool

如果我们把 newFixedThreadPool 改成 newCachedThreadPool方法来获取线程池。程序运行不久后,同样会看到 OOM 异常

java.lang.OutOfMemoryError: unable to create new native thread

源码:

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

这种线程池的最大线程数是 Integer.MAX_VALUE ,认为是没有上限的,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。由于我们的任务需要一小时才能完成,大量的任务进来后会创建大量的线程,我们知道线程是分配一定的内存空间做为线程栈,比如 1MB,因此无限创建线程必然会导致OOM


我们不建议使用 Executors 提供的两种快捷的线程池,原因如下:
  • 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合要求,一般都需要设置有界的工作队列和 可控的线程数。
  • 任何时候,都于根伟自定义线程池指定有意思的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量CPU 、线程执行出现异常等问题时,我们往往会抓取线程栈,此时,有意义的线程名称,就可以方便我们定位问题。

总结 线程池工作行为:

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果队列已经满了,则在总线程数不大于maximumPoolSize的前提下,则创建新的线程
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

2.确认线程池是否在复用

  • 在生产环境中,监控一直报警当前使用线程数太多,一会又将下来,但是当前用户访问量也不是很大

通过代码排查 发现项目中使用了 Executors.newCachedThreadPool(); 创建线程池使用,我们知道newCachedThreadPool 会在需要时创建必要多的线程,业务代码的一次业务操作会向线程池提交多个慢任务,这样执行一次业务操作就会开启多个线程,如果业务操作量较大,的确有可能一下子开启几千个线程

源码发现

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 它的核心线程数是0,而最大线程数 Integer的最大值,一般来说机器都没那么大内存给它不断使用,而 keepAliveTime 是60秒,也就是在 60秒之后所有的线程都是可以回收的,采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
  • 所以我们在使用线程池要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:

    1. 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队 列。
    1. 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理 由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过 多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做 缓冲。