vlambda博客
学习文章列表

多线程(二) | 彻底搞懂线程池-Executors

上篇文章,我们讲解了通过Thread和 Runnable 使用线程的方法,并且演示了如何创建一个线程并启动,今天我们来聊一聊多线程中的线程池。

一、为什么要使用线程池

我们使用多线程的一个一般步骤是:先创建一个线程,然后线程执行线程任务,线程任务执行完毕后,线程会被销毁。

但是线程的创建和销毁是比较耗费资源的。在高并发场景之下,如果需要开启大量的线程,而每个线程它执行任务所需的时间很短的情况下,那么线程的频繁的创建和销毁就会成为性能的瓶颈。所以线程池应运而生。线程池的主要功能就是可以用来管理线程,让线程能够复用,避免重复创建线程,并且我们可以根据需求,合理配置线程池中的各种参数。

通俗地说就是当有任务的时候,我们就通过线程池来获取线程,任务完成后再把线程归还给线程池供其他任务使用。线程池会帮我们维护指定数量的活跃可用线程,避免了重复创建造成的资源浪费。

二、Executors用法

Java中提供了Executors类,本身是个工厂模式,相当于就是一个线程池工厂(注意是线程池工厂,不是线程工厂),里边提供了多种不同的线程池可供我们直接使用。

2.1 newSingleThreadExecutor

创建一个单线程的线程池。创建线程池的方法都是Executors中的静态方法,我们可以直接使用类名调用就能获取,得到线程池的类型为ExecutorService,可以调用里面的submit方法,传入Runnable类型的线程任务来执行。

我们通过案例给大家演示一下 :

package com.lsqingfeng.action.knowledge.multithread.pools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @className: SingleThreadExecutorDemo
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-07 14:00
 */

public class SingleThreadExecutorDemo {

    public static void main(String[] args) {
        // 获取一个单线程的线程池:
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        // 给定两个线程任务:
        Runnable r1 = () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "正在执行线程任务1");
        };

        Runnable r2 = () -> {
            System.out.println(Thread.currentThread().getName() + "正在执行线程任务2");
        };

        // 将线程任务提交给先线程池,通过线程池执行任务,就不需要我们自己创建线程了,只关注任务即可
        // 查看结果:任务2等待任务1完成后才执行,说明只有一个线程可用
        singleThreadExecutor.submit(r1);
        singleThreadExecutor.submit(r2);

    }

}

执行结果:

pool-1-thread-1正在执行线程任务1(2s后)
pool-1-thread-1正在执行线程任务2

看到它们线程的名称都是一样的。并且根据执行的时间也可以看出,两个线程任务是被同一个线程执行的。

2.2 newFixedThreadPool

创建一个指定数量的线程池。我们可以通过传入参数,来设置该线程池中有多少个活跃线程。当线程池中有可用线程池,提交的任务就会立即执行,如果当前线程中没有可用线程,则会将任务放入到一个队列中,直到有线程可用。我们还是通过代码来演示。

package com.lsqingfeng.action.knowledge.multithread.pools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @className: FixThreadExecutorDemo
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-07 14:42
 */

public class FixThreadExecutorDemo {

    public static void main(String[] args) {
        // 获取FixThread 线程池: 指定活跃线程数量2
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 创建两个任务:
        // 给定一个线程任务:
        Runnable r1 = () -> {
            try {
                // 睡两秒
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "正在执行线程任务1");
        };

        Runnable r2 = () -> {
            System.out.println(Thread.currentThread().getName() + "正在执行线程任务2");
        };

        // 将任务已提交给线程池两次:
        executorService.submit(r1);
        executorService.submit(r1);

        // 再提交任务2,看任务2是否是在2秒后执行,如果是,说明没有可用线程,只能等2秒后才有线程能用
        executorService.submit(r2);

    }

}

结果:

pool-1-thread-1正在执行线程任务1(2s后运行的)
pool-1-thread-2正在执行线程任务1 pool-1-thread-1正在执行线程任务2

我们继续验证,把活跃数量改为3, 这个时候,看看任务2是否会先执行。

pool-1-thread-3正在执行线程任务2 pool-1-thread-2正在执行线程任务1 pool-1-thread-1正在执行线程任务1

果然,此时任务2就先执行了,因为有活跃线程可用。

注意:大家在运行上述代码的时候,打印结果后程序还没有结束,这是因为线程池就是一直活跃在等待接收任务的状态,所以程序不会结束,要想结束我们需要调用 shutdown方法将线程池关闭。

2.3 newCachedThreadPool

创建一个可缓存的线程池。这个线程池的特点是不对线程的数量做限制,只要有线程任务没有线程来处理,就会创建一个线程,同时该线程池有一个回收的功能,就是如果某个线程超过60秒还没有任务,就会被自动回收掉。

package com.lsqingfeng.action.knowledge.multithread.pools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @className: CachedThreadPoolExecutorDemo
 * @description: 可缓存的线程连接池。无限大(完全取决于操作系统最大允许多少)
 *                超过60秒自动回收
 * @author: sh.Liu
 * @date: 2022-04-07 16:04
 */

public class CachedThreadPoolExecutorDemo {

    public static void main(String[] args) {
        // 创建一个可缓存的线程连接池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

        // 创建线程任务
        Runnable r1 = ()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "正在执行任务");
        };

        // 提交任务
        cachedThreadPool.submit(r1);
        cachedThreadPool.submit(r1);
        cachedThreadPool.submit(r1);

    }
}

执行结果:

三个线程都是不同的代表每次都会创建新的。因为没获取到可用的,就会创建新的。

注意这个红灯,60秒钟后会熄灭。因为执行完任务后,会自动回收60秒内没被使用的线程。

2.4 newScheduledThreadPool

创建一个可用于执行周期性任务的线程池。我们前面使用多线程执行的任务都是一次性的。但是有的时候我们希望可以周期性地执行任务,比如,每5分钟执行一次。这个时候,我们就可以使用周期性的线程池。

这里要注意, 返回的线程池类型和前面的有区别:ScheduledExecutorService 代表周期性的线程池类型。

package com.lsqingfeng.action.knowledge.multithread.pools;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @className: ScheduledThreadPoolDemo
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-07 16:40
 */

public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        // 1. 获取周期性线程池, 传入核心线程的大小
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);

        // 2. 创建两个线程任务
        Runnable r1 = ()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "正在执行任务1");
        };

        Runnable r2 = ()->{

            System.out.println(Thread.currentThread().getName() + "正在执行任务2");
        };


        // 3. 线程池执行任务:注意调用方式:
        // 延迟三秒执行
        scheduledExecutorService.schedule(r1, 3, TimeUnit.SECONDS);
        scheduledExecutorService.schedule(r1, 3, TimeUnit.SECONDS);

        scheduledExecutorService.schedule(r2, 2, TimeUnit.SECONDS);

        // 线程2秒后开始执行,每三秒执行一次。
        scheduledExecutorService.scheduleAtFixedRate(r2,23, TimeUnit.SECONDS);


    }

}

可以观察到,总共只有两个线程在工作,说明我们的设置是有效的。

2.5 newWorkStealingPool

这是从JDK8开始新增的方法。代表创建了一个抢占式的线程池,底层是使用的ForkJoinPool来实现的。

该线程池维护足以支持给定并行度级别的线程,并可以使用多个队列来减少争用。并行度级别对应于活动参与或可用于参与任务处理的最大线程数。实际的线程数可能会动态增长和收缩。newWorkStealingPool不能保证提交任务的执行顺序。

ForkJoinPool是JDK7中引入的一种新的线程池,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

ForkJoinPool的另外一个特性是它能够实现工作窃取(Work Stealing),在该线程池的每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行。

newWorkStealingPool 会创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。

newWorkStealingPool的特点:

  1. 可以传入线程的数量,不传入,则默认使用当前计算机中可用的cpu数量
  2. 能够合理的使用CPU进行对任务操作(并行操作)
  3. 适合使用在很耗时的任务中

底层用的ForkJoinPool 来实现的。ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”分发到不同的cpu核心上执行,执行完后再把结果收集到一起返回。

代码实现:

package com.lsqingfeng.action.knowledge.multithread.pools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @className: WorkStealingPoolExecutorDemo
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-07 17:17
 */

public class WorkStealingPoolExecutorDemo {

    public static void main(String[] args) {
        // 创建线程池:不传参默认使用cpu个数创建
        ExecutorService executorService = Executors.newWorkStealingPool();

        // 创建任务:
        Runnable r1 = ()->{
            System.out.println(Thread.currentThread().getName() + "正在执行任务");
        };

        // 提交任务:
        executorService.submit(r1);
    }
}

好了,关于Executors中的常用线程池我们就先介绍这么多。但其实在《阿里巴巴开发手册》中是不允许我们直接使用Executors中的工具类的,那么我们应该如何使用呢,下篇文章我们继续研究一个更重要的类- ThreadPoolExecutor。如果文章对你有帮助,就给点个关注点个赞吧。