vlambda博客
学习文章列表

面试官:在项目中,你是如何使用线程池的?

大家好,我是田哥

前两天,有位星友(知识星球里的朋友简称)私信我,问在项目中如何使用线程池,关于线程池的原理和八股文相关的都可以背,但是要是问到你们项目中是怎么用的,心里总是有点慌。

话不多说,我们直接步入正题。

创建线程池的方式

我在这篇文章中聊过线程池相关的:

《阿里巴巴JAVA开发手册》有这样一条强制规定:线程池不允许使用Executors去创建,而应该通过ThreadPoolExecutor方式,这样处理方式更加明确线程池运行规则,规避资源耗尽风险。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(11,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

上面这两种方式创建线程池使用的阻塞队列是LinkedBlockingQueue

/**
 * A constant holding the maximum value an {@code int} can
 * have, 2<sup>31</sup>-1.
 */

//2的31次方,然后在减1 2147483647
@Native public static final int   MAX_VALUE = 0x7fffffff;
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

不设大小理论上队列容量无上限,所以可能会堆积大量请求从而导致OOM

以上这种方式在,咱们就不聊了。

项目中如何用

在项目中,我们通常有两种方式创建线程池:

  • 第一种:静态方式
  • 第二种:使用 Spring Boot创建线程池

比如说我们项目中需要处理用户登录日志,但是此时不想因为记录登录日志耽搁了登录。

如果我们使用同步的方式,可能会因为一些不太需要实时结果的,并且又耗时的业务可能会导致整个业务变慢:


耗时:200ms=100ms+100ms

如果使用线程池做了异步化后,直接创建个任务丢到线程池里,这样就减少了后面那100ms的等待时间。


在实际项目中,也有很多项目使用消息队列来做异步化,这个看项目情况来,比如:开发成本、后期运维成本等。

静态方式

静态方式就是当做一种工具类使用,代码实现如下:

package com.tianwc.myblog.pool;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//这里的相关只是一个演示,大家在定参数时,还是要以具体情况来
public class ThreadPoolUtil {
    //获取CPU核数
    static int cpuNums = Runtime.getRuntime().availableProcessors();
    /** 线程池核心池的大小*/
    private static int corePoolSize = 10;
    /** 线程池的最大线程数*/
    private static int maximumPoolSize = cpuNums * 5;
    //阻塞队列容量
    private static int queueCapacity = 100;
    //活跃时间,
    private static int keepAliveTimeSecond = 300;

    public static ExecutorService httpApiThreadPool = null;

    static{
        System.out.println("创建线程数:"+corePoolSize+",最大线程数:"+maximumPoolSize);
        //建立10个核心线程,线程请求个数超过20,则进入队列等待
        httpApiThreadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTimeSecond,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity),new ThreadFactoryBuilder().setNameFormat("PROS-%d").build());
    }
}

关于线程池参数设置,可以参考:面试小抄中并发编程部分有详细说明。

在业务代码中的使用:

ThreadPoolUtil.httpApiThreadPool.submit(new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("======== 登录日志记录=====start=======");
                    try {
                        // TODO: 2022/4/14 业务处理
                        Thread.sleep(1000L);
                        System.out.println("userId=" + userId);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("========登录日志记录------end=======");
                }
            }));

很简单吧!

但是这种方式存在很多问题,很多项目也是这么在用的。

比如想动态修改线程池参数,这种方式就不好处理了

我们再来看看Spring Boot创建方式;

配置文件

我们可以把线程池相关参数配置在配置文件中application.yaml(application.properties)

threadpool:
  corePoolSize: 8
  maxPoolSize: 16
  queueCapacity: 5
  keepAliveSeconds: 300

然后创建线程池:

package com.tianwc.myblog.pool;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync //开启异步请求
public class ThreadPoolConfig {

    @Resource
    private Environment env;

    //创建线程池
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("--------------全局线程池-----------------");
        pool.setCorePoolSize(Integer.parseInt(env.getProperty("threadpool.corePoolSize")));
        pool.setMaxPoolSize(Integer.parseInt(env.getProperty("threadpool.maxPoolSize")));
        pool.setKeepAliveSeconds(Integer.parseInt(env.getProperty("threadpool.queueCapacity")));
        pool.setQueueCapacity(Integer.parseInt(env.getProperty("threadpool.keepAliveSeconds")));
        // 直接在execute方法的调用线程中运行
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        pool.initialize();
        return pool;
    }
}

我们在项目中使用:

package com.tianwc.myblog.pool;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class AsynchronousTask {

    @Async("taskExecutor")
    public void recordLoginLog(Long userId){
        System.out.println("======== 登录日志记录=====start=======");
        try {
            // TODO: 2022/4/14 业务处理
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("========登录日志记录------end=======");
    }
}

然后在登录的代码中使用:

@Resource
private AsynchronousTask asynchronousTask;

public Boolean login(){
    //登录处理
    Long userId=10001;
    boolean isSucc=true;
    if(isSucc){
        asynchronousTask.recordLoginLog(userId);
    }
    System.out.println("=====登录成功====");
}

输出日志:

=====登录成功====
======== 登录日志记录=====start=======
userId=10001
========登录日志记录------end=======

好了,以上就是我们项目中通常使用的方式,另外,注意,在项目中通常是将注解@EnableAsync 放到项目启动类上。

后记

关于线程池的实际使用,建议给大家看看美团的线程池技术方案,感兴趣的自己搜搜。

文中很多线程池相关的知识没有介绍,因为之前有一篇文章已经介绍过了,这里就不赘述了。

推荐阅读:

最后,还是提一嘴:需要面试辅导的,记得找我。

详情请点击原文链接