vlambda博客
学习文章列表

关于线程池,面试的时候你时候还打怵,这里我有话要说,保证让你对线程池的各个参数一边就懂,浅显易懂的哦。

关于线程池,大家相信一定有所耳闻即使在日常工作中没有实际的应用,但是在面试过程中一定有被问到过。别说你没有面试过...... 首先我们先简单的了解下线程池的大概含义:

线程池其实就是一种多线程处理形式,处理过程中将任务添加到队列中,然后再创建线程自动启动这些任务,这里要注意线程池中的所有线程都是后台线程,其实主要就是为了更好的处理任务,以及更高效的利用CPU。大致就是这些了;这里就不做更多的阐述了,可以自行百度百科一下哈。主要是这里也不是重点。你懂的🤪

重点来了。面试的时候面试官一定这样问过你:

面试官🤓:请问你有了解过线程池吗?你可以简单聊一下嘛。

派大星🙋🏻‍♂️:这个当然有了解过,线程池嘛,线程池可以看做是线程的集合。在没有任务时线程处于空闲状态,当请求到来:线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务执行(而不是销毁)。这样就实现了线程的重用。这就是线程池的主要作用,以及线程池的存在的意义。

假设我们不使用线程池的时候,每当新来一个任务都需要开辟任务,请看如下的代码:

看的出来,每开创一个socket连接请求都会,新建一个线程Thread,按道理来说这时没有问题的哦,但是还是有一定的弊端的,比如:

每次创建线程,都会消耗时间有一定的系统资源的浪费,并且线程是有生命周期的,每次创建和销毁也是需要时间的。并且线程数也是占用系统CPU资源的,大量的空闲线程也会行用过多内存,给垃圾回收器带来压力,从而会造成OutOfMemoryError 异常

面试官🤓:好的,你说的我大致了解了,那么其实你并有正面回答我的问题的。

派大星🙋🏻‍♂️:我了解,其实我上述说的就是为了引出线程池,因为我上面有提到,线程池会有各种的弊端从而导致OutOfMemoryError 异常等,所以才推出了线程池。关于线程池的基本概念上面也大概讲述了,接下来我大致说一下JDK的线程池,看下图:

关于线程池,面试的时候你时候还打怵,这里我有话要说,保证让你对线程池的各个参数一边就懂,浅显易懂的哦。

JDK给我们提供的线程池API正如上图所示:

  • Executor:是基础
  • ExecutorService:提供了线程池生命周期的管理方法
  • AbstrzctExecutorServic:提供了默认实现
  • ThreadPoolExecutor:便是最常用的线程池
  • 图中的其余的两个便是定时执行的或者是延期执行的线程池,后续会给大家下一个简单的例子供大家了解。

面试官🤓:嗯,你对线程池的的基本信息掌握的还可以,那你能给我说一下,关于线程池的创建方式嘛?

派大星🙋🏻‍♂️:好的,其实实际中的线程创建方式是有很多的,但是我的理解主要分为两大类

  1. 利用Executors去创建,以及通过其衍生的多种方法。
  2. 便是利用ThreadPoolExecutor去创建。

关于以上的两种创建方式我们可以先简单的聊一下第一种通过Executors以及其衍生方式创建的线程池。如图所示:

以上大概分为4种分别是:

  1. newFixedThreadPool

创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待.适用于任务量已知,相对耗时的任务

  1. newCachedThreadPool

核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着全部是救急线程(60s后可以回收)救急线程可以无限创建.

  1. newSingleThreadExecutor

希望多个任务排队执行,线程数固定为1,任务数多于1时,会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放。

  1. newScheduledThreadPool

主要是用来执行定时任务

关于Executors及其衍生方式创建的线程池大致就介绍这么多,如果有需要了解更多关于线程池的笔记可以加我私聊欧。

面试官🤓:嗯,还可以,那你来给我聊一聊关于ThreadPoolExecutor创建线程池的方式吧。以及它构造方法中的几个参数吧。

派大星🙋🏻‍♂️:好的。。

好了上面的第一种创建方式的线程池已经介绍完了,但是对于面试官来说肯定是远远不够的。接下来我给大家说一下第二种方式:

关于ThreadPoolExecutor创建线程池的方式通过ThreadPoolExecutor的方式创建线程是阿里巴巴规范强烈推荐的。接下来我们聊一下吧。

关于其构造方法中的的几个参数分别是 参数说明:

  • corePoolSize:核心线程数目(最多保留的线程数)
  • maximumPoolSize:最大线程数目 (核心线程数+救急线程数)
  • keepAliveTime:生存时间 - 针对救急线程
  • unit:时间单位,针对的是救急线程
  • workQueue:阻塞队列
  • threaFactory:线程工厂 - 可以为线程创建时起个好名字
  • handler:拒绝策略 达到最大线程数后还有新的任务便会执行拒绝策略
  • jdk有核心线程和救急线程

救急线程数 = 最大线程数-核心线程数,一旦核心线程数用完,并且阻塞队列也满了,此时又来了任务,jdk会先判断有没有救急线程,如果有救急线程,它会执行任务。执行完就会释放该救急线程,它不会去执行阻塞队列里面的任务

  • 核心线程和救急线程的区别:一旦核心线程被创建它是不会被回收的一直保留,但是救急线程只有在阻塞队列满了并且核心线程也没有空闲才会被创建,否则就会被回收,可以简单理解为核心线程没有生存时间,救急线程有生存时间

  • 救急线程的前提:是配合有界队列,当任务超过了队列的大小时,会创建maximumPoolSize-corePoolSize数目的线程来救急

  • 拒绝策略:1.AbortPolicy:让调用者抛出RejectedExecutionException异常,默认策略。2.CallerRunsPolicy:让调用者运行任务 3.DiscardPolicy:放弃本次任务 4.DiscardOldesPolicy:放弃队列中最早的任务,本任务取而代之。

派大星🙋🏻‍♂️:以上就是关于ThreadPoolExecutor的简单介绍。这些线程池的相关知识点在你面试的时候对你多少有所帮助。

好了为了更好的理解线程池我们可以手动创建一个线程池来模拟一下。

如果不需要的话可以直接跳到最后欧,有惊喜。


@Slf4j(topic = "c.TestPool")
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,(queue,task)->{
            // 死等
//            queue.put(task);
            // 2、带超时时间的等待
//            queue.offer(task,500,TimeUnit.MILLISECONDS);
            // 3、放弃任务执行
//            log.debug("放弃 {}",task);
            // 4、抛出异常
//            throw new RuntimeException("执行失败.."+task);
            // 5、让调用者自己执行任务....
            task.run();
        });
        for (int i = 0; i < 4; i++) {
            int j = i;
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}",j);
            });
        }
    }
}

/**
 * 拒绝策略
 * @param <T>
 */

@FunctionalInterface
interface RejectPolicy<T>{
    void reject(BlockingQueue<T> queue,T task);
}

@Slf4j(topic = "c.ThreadPool")
class ThreadPool{
    /**
     * 线程队列
     */

    private BlockingQueue<Runnable> taskQueue;
    /**
     * 线程集合
     */

    private HashSet<Worker> workers = new HashSet<>();
    /**
     * 核心线程数
     */

    private int coreSize;

    /**
     * 获取任务的超时时间
     */


    private long timeout;

    /**
     * 拒绝策略
     */

    private RejectPolicy<Runnable> rejectPolicy;

    /**
     * 执行任务
     */

    public void execute(Runnable task){
        // 当任务数没有超过核心线程数,直接交给worker对象 执行
        // 如果任务数超过 coreSize时,加入任务队列暂存
        synchronized (workers){
            if (workers.size()<coreSize){
                Worker worker = new Worker(task);
                log.debug("新增 worker {} ,{}",worker,task);
                workers.add(worker);
                worker.start();
            }else {
//                log.debug(" 加入任务队列 {}",task);
//                taskQueue.put(task);
                // 1、死等
                // 2、带超时时间的等待
                // 3、放弃任务执行
                // 4、抛出异常
                // 5、让调用者自己执行任务....
                // 将以上的决策全交给线程的使用者,不要写死,具有良好的扩展性,可以使用设计模式中的策略模式
                taskQueue.tryPut(rejectPolicy,task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 时间单位
     */

    private TimeUnit timeUnit;

    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当task不为空,执行任务
            // 2) 当task执行完毕,再接着从任务队列获取任务执行
//            while (task!=null || (task = taskQueue.take() )!=null){
            while (task!=null || (task = taskQueue.poll(timeout,timeUnit) )!=null){ // 带有超时时间
                try {
                    log.debug(" 正在执行...{}",task);
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            synchronized (workers){
                log.debug("worker 被移除 {}",this);
                workers.remove(this);
            }
        }
    }
}

@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T>{
    /**
     * 1.任务队列
     */

    private Deque<T> queue = new ArrayDeque<>();
    /**
     * 2.锁
     */

    private ReentrantLock lock = new ReentrantLock();

    /**
     * 3.生产者条件变量
     */

    private Condition fullWaitSet = lock.newCondition();
    /**
     * 4.消费者条件变量
     */

    private Condition emptyWaitSet = lock.newCondition();
    /**
     * 5.容量
     */

    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * 带超时的等待获取
     */

    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()){
                try {
                    // 返回的是剩余的时间
                    if (nanos<=0){
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 阻塞获取
     */

    public T take(){
        lock.lock();
        try{
            while (queue.isEmpty()){
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }

    }
    /**
     * 阻塞添加
     */

    public void put(T task){
        lock.lock();
        try {
            while (queue.size() == capacity){
                try {
                    log.debug(" 等待加入任务队列... {}",task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug(" 加入任务队列 {}",task);
            queue.addLast(task);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 带有超时时间的阻塞添加
     * @param task
     * @param timeout
     * @param timeUnit
     * @return
     */

    public boolean offer(T task,long timeout,TimeUnit timeUnit){
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity){
                try {
                    log.debug(" 等待加入任务队列... {}",task);
                    if (nanos <= 0){
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug(" 加入任务队列 {}",task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 获取大小
     */

    public int size(){
        lock.lock();
        try{
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否已满
            if (queue.size() == capacity){
                rejectPolicy.reject(this,task);
            }else {// 有空闲
                log.debug("加入任务队列 {}",task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

这是本人整理的线程池的相关思维导图:有需要的可以私聊我JUC并发