搜文章
推荐 原创 视频 Java开发 iOS开发 前端开发 JavaScript开发 Android开发 PHP开发 数据库 开发工具 Python开发 Kotlin开发 Ruby开发 .NET开发 服务器运维 开放平台 架构师 大数据 云计算 人工智能 开发语言 其它开发
Lambda在线 > 懂一点架构 > 聊聊高并发:理解Java自带线程池ThreadPoolExecutor

聊聊高并发:理解Java自带线程池ThreadPoolExecutor

懂一点架构 2017-10-30

    线程池主要解决两个问题:一方面当执行大量异步任务时候线程池能够提供较好的性能,这是因为使用线程池可以使每个任务的调用开销减少(因为线程池线程是可复用的)。另一方面线程池提供了一种资源限制和管理的手段,比如当执行一系列任务时候对线程的管理,每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目。   

    基于Executor接口中将任务提交和任务执行解耦的设计,ExecutorService和其各种功能强大的实现类提供了非常简便方式来提交任务并获取任务执行结果,封装了任务执行的全部过程。Executors工具可以创建普通的线程池以及schedule调度任务的调度池。使用Executors最常用的莫过于是使用Executors.newFixedThreadPool(int)这个方法,因为它既可以限制数量,而且线程用完后不会一直被cache住。


先来看一下构造器:

public ThreadPoolExecutor(int corePoolSize,

                          int maximumPoolSize,

                          long keepAliveTime,

                          TimeUnit unit,

                          BlockingQueue<Runnable> workQueue,

                          ThreadFactory threadFactory,

                          RejectedExecutionHandler handler) {

    if (corePoolSize < 0 ||

        maximumPoolSize <= 0 ||

        maximumPoolSize < corePoolSize ||

        keepAliveTime < 0)

        throw new IllegalArgumentException();

    if (workQueue == null || threadFactory == null || handler == null)

        throw new NullPointerException();

    this.corePoolSize = corePoolSize;

    this.maximumPoolSize = maximumPoolSize;

    this.workQueue = workQueue;

    this.keepAliveTime = unit.toNanos(keepAliveTime);

    this.threadFactory = threadFactory;

    this.handler = handler;

}


corePoolSize:核心运行的poolSize,也就是当超过这个范围的时候,就需要将新的Runnable放入到等待队列workQueue中了,我们把这些Runnable就叫做要去执行的任务吧。

workQueue:等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),这个队列默认是一个无界队列,所以在生产者疯狂生产的时候,考虑如何控制的问题。

threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread方法即可;


通常你得到线程池后,会调用其中的:submit方法或execute方法去操作;其实你会发现,submit方法最终会调用execute方法来进行操作,只是他提供了一个Future来托管返回值的处理而已,当你调用需要有返回值的信息时,你用它来处理是比较好的;这个Future会包装对Callable信息,并定义一个Sync对象(),当你发生读取返回值的操作的时候,会通过Sync对象进入锁,直到有返回值的数据通知。


来看看execute最为核心的方法吧:

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    /*

     * Proceed in 3 steps:

     *

     * 1. If fewer than corePoolSize threads are running, try to

     * start a new thread with the given command as its first

     * task.  The call to addWorker atomically checks runState and

     * workerCount, and so prevents false alarms that would add

     * threads when it shouldn't, by returning false.

     *

     * 2. If a task can be successfully queued, then we still need

     * to double-check whether we should have added a thread

     * (because existing ones died since last checking) or that

     * the pool shut down since entry into this method. So we

     * recheck state and if necessary roll back the enqueuing if

     * stopped, or start a new thread if there are none.

     *

     * 3. If we cannot queue task, then we try to add a new

     * thread.  If it fails, we know we are shut down or saturated

     * and so reject the task.

     */

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))g个、

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        reject(command);

}

核心逻辑:如果当前running线程数量小于corePoolSize,则新创建一个worker,并把该runnable作为该worker的firstTask进行执行;如果线程池中的running线程数量已经达到corePoolSize,则将新的Runnable放入到等待队列workQueue中了,线程池中的每个worker不断从workQueue中take或者poll数据进行任务执行。


来看下addWorker的源码:

private boolean addWorker(Runnable firstTask, boolean core) {

    boolean workerStarted = false;

    Worker w = null;

    try {

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) {

            t.start();

            workerStarted = true;

        }

    } finally {

        if (! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}


Worker是一个内部类,本身也是也一个Runnable。在初始化的时候,会把自身作为参数构造一个Thread,然后供调用者执行start(),最终执行的依然是run()方法:


我们再来看一下runWorker方法:

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try {

        while (task != null || (task = getTask()) != null) {

            try {

                beforeExecute(wt, task);

                Throwable thrown = null;

                try {

                    task.run();

               finally {

                    afterExecute(task, thrown);

                }

            } finally {

                task = null;

                w.completedTasks++;

                w.unlock();

            }

        }

        completedAbruptly = false;

    } finally {

        processWorkerExit(w, completedAbruptly);

    }

}


其中,getTask()即是从workQueue中不断take或者poll数据进行实际任务执行。


private Runnable getTask() {

    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            return null;

        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                return null;

            continue;

        }

        try {

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            timedOut = true;

        } catch (InterruptedException retry) {

            timedOut = false;

        }

    }

}


好了,整个执行流程就是这样。总结成一幅图就是:


补充说明:

1,如果仅仅是一个Runnable任务,可以直接调用Executor的execute(Runnable command)即可;如何想要执行一个带返回结果的任务,则必须不能使用Runnable任务,而要使用Callable任务,同时需要调用ExecutorService的submit(Callable<T> task)。


2,除了newFixedThreadPool可以生成固定大小的线程池,newCachedThreadPool可以生成一个无界、可以自动回收的线程池,newSingleThreadScheduledExecutor可以生成一个单个线程的线程池。newScheduledThreadPool还可以生成支持周期任务的线程池。一般用户场景下各种不同设置要求的线程池都可以这样生成,不用自己new一个线程池出来。

各自的适用场景。

FixedThreadPool:适用于为了满足资源管理需求,而需要限制当前线程的数量的应用场景,它适用于负载比较重的服务器。

SingleThreadExecutor:适用于需要保证执行顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的场景。

CachedThreadPool:大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。


3,我们可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池,但是它们的实现原理不同,shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。



版权声明:本站内容全部来自于腾讯微信公众号,属第三方自助推荐收录。《聊聊高并发:理解Java自带线程池ThreadPoolExecutor》的版权归原作者「懂一点架构」所有,文章言论观点不代表Lambda在线的观点, Lambda在线不承担任何法律责任。如需删除可联系QQ:516101458

文章来源: 阅读原文

相关阅读

关注懂一点架构微信公众号

懂一点架构微信公众号:gh_fba925235d45

懂一点架构

手机扫描上方二维码即可关注懂一点架构微信公众号

懂一点架构最新文章

精品公众号随机推荐