vlambda博客
学习文章列表

并发编程——并发容器和线程池(三)

爪哇缪斯推荐搜索

    今天是并发编程系列的最后一篇文章,针对并发容器和线程池这两部分内容进行介绍。其中SynchronousQueue源码解析部分,还没写完。后续弄完会再补发相关文章。下面是本篇文章的大纲,如下所示:

并发编程——并发容器和线程池(三)


一、并发容器

1.1> 概述

  • JDK提供的这些容器大部分在java.util.concurrent包中。我们挑选出一些比较有代表性的并发容器类,来感受一下JDK自带的并发集合带来的“快感”。

并发编程——并发容器和线程池(三)

1.2> ConcurrentHashMap

  • 详情请见:

1.3> ConcurrentLinkedQueue

  • ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。

  • ConcurrentLinkedQueue算是在高并发环境中性能最好的队列。底层由单向链表组成,每个节点结构如下所示:

    并发编程——并发容器和线程池(三)

  • 构造函数中,创建了一个空节点作为链表中的第一个Node节点

    并发编程——并发容器和线程池(三)

1.3.1> add

  • 向容器中添加元素,源码如下所示:

并发编程——并发容器和线程池(三)

  • 关于t != (t = tail)的判断,首先,“!=”并不是原子操作,它是可以被中断的。也就是说,在执行“!=”时,会先取得t的值,再执行t=tail,并取得新的t值。然后比较这两个值是否相等。下面例子演示了这种情况:

并发编程——并发容器和线程池(三)

  • 添加节点如下所示:

并发编程——并发容器和线程池(三)

【解释】

    • 我们从上面的图可以看到,对tail的更新是会产生滞后的,也就是每次更新都会跳跃两个元素。这么做的目的,就是为了减少cas操作的次数。例如,我们完全可以在上述代码中通过 if(p.casNext(null, newNode) && casTail(t, newNode)) 这种方式,保证最新节点拼接到链表末尾,并且tail指针永远指向末尾,但是,由于CAS一般都用在无限自旋的场景中,那么对于效率的损耗就比较大了。而通过两次操作才更新一次tail,可以有效减少性能消耗。


1.3.2> remove

  • 删除元素操作,源码如下所示:

并发编程——并发容器和线程池(三)

  • 删除节点对链表的操作,如下图所示:

并发编程——并发容器和线程池(三)


1.3.3> 哨兵

  • 我们来演示一下哨兵节点产生的原因:

并发编程——并发容器和线程池(三)


1.4> CopyOnWriteArrayList

  • 在大多数的应用场景中,读操作的比例远远大于写操作。那么,当执行读操作的时候,对数据是没有修改的,所以,无须对数据进行加锁操作。而针对于写操作的场景中,则需要加锁来保证数据的正确性。

  • 而CopyOnWriteArrayList就可以满足上面所说的场景,即:读操作是不加锁的。而写操作也不会阻塞读的操作,它采用了CopyOnWrite方式来解决写操作的问题,即:写入操作时,进行一次自我复制产生一个副本,写操作就在副本中执行,写完之后,再将副本替换原来的数据。这样,就可以在写数据的同时不影响读数据操作。

  • 读操作源码如下所示:

    并发编程——并发容器和线程池(三)


    并发编程——并发容器和线程池(三)

【解释】

    • 读操作比较简单,就是从数组中获取对应下标为index的元素,而由于读操作并不需要加锁,所以,get方法就是一个普通的不加锁的方法。

  • 写操作源码如下所示:

并发编程——并发容器和线程池(三)


并发编程——并发容器和线程池(三)

【解释】

    • 执行写操作时,首先进行lock加锁,然后复制原数组创建一个长度加1的新数组,即:副本数组。执行新增操作时,都是针对副本数组进行操作的。当操作新增操作完毕后,将副本数组替换旧的数组。由于array是volatile的,所以当替换后,在多线程之间是可见的。

    • 这样做的特点,就是,当执行写操作的时候,针对的是副本数组;而读操作,一直是针对着原数组;所以,写操作是不会阻塞读操作的。


1.5> BlockingQueue

  • 阻塞队列常用方法

并发编程——并发容器和线程池(三)

  • 由于使用offer方法时,如果队列已经满了,那么则无法插入成功,会立即返回false;同样的,当我们调用poll方法的时候,如果队列中是空的,则也会立即返回false;而比起立即返回的情况,我们更关注于put和take这种插入或移除失败,在当前阻塞的情况是如何实现的。下面我们就医ArrayBlockingQueue和LinkedBlockingQueue为例。

1.5.1> ArrayBlockingQueue

  • 构造函数,默认采用非公平锁

并发编程——并发容器和线程池(三)


并发编程——并发容器和线程池(三)

【解释】

    • 由操作函数入参capacity来指定底层存储元素数组长度的大小。

    • 并且初始化了需要加锁使用的ReentrantLock实例,默认采用的是非公平锁。

    • notEmpty用于执行take时进行await()等待操作,put时进行signal()唤醒操作。

    • notFull用于执行take时进行signal()唤醒操作,put时进行await()等待操作。

  • put方法

    并发编程——并发容器和线程池(三)


    并发编程——并发容器和线程池(三)

    【解释】

      • 在执行put方法逻辑之前,首先尝试获得可中断锁——即:lock.lockInterruptibly(),当执行interrupt操作时,该锁可以被中断。

      • 如果数组中元素的个数(count)等于数组的长度了,也就说明队列满了,那么就在该线程上执行等待操作——notFull.await();

      • 如果队列没有满,则调用enqueue(e)方法执行入列操作。

      • 入列操作首先会将待插入值x放入数组下标为putIndex的位置上,然后再将putIndex加1,来指向下一次插入的下标位置。此处需要注意的是,如果加1后的putIndex等于了数组的长度,那么说明已经越界了(因为putIndex是从0开始的),那么此时将putIndex置为0,即:待插入的指针指向了数组的头部。做循环式插入。

      • 最后,执行count++来计算元素总个数,并且调用notEmpty.signal()方法来解除阻塞(即:当队列为空的时候,执行take方法会被notEmpty.await()阻塞)

    • take方法

    并发编程——并发容器和线程池(三)


    并发编程——并发容器和线程池(三)

    【解释】

      • take方法跟上面我们看的put方法类似,区别是出队的指针是takeIndex。如果队列中为空,那么当调用take方法执行出队操作时,就会执行notEmpty.await()方法执行等待操作,并释放锁资源。当调用put方法向队列中放入元素之后 ,会调用notEmpty.signal方法对等待的线程执行唤醒操作。那么线程继续执行出队操作,执行完毕后,会调用notFull.signal方法来唤醒在notFull上面await的线程。


    1.5.2> LinkedBlockingQueue

    • 构造函数,默认长度为2^31,大概21亿多

    并发编程——并发容器和线程池(三)


    并发编程——并发容器和线程池(三)


    并发编程——并发容器和线程池(三)

    【解释】

      • 在构造函数中,创建一个空的节点,作为整个链表的头节点。

    • put源码和注释如下所示:

    并发编程——并发容器和线程池(三)

    • take源码和注释如下所示:

    并发编程——并发容器和线程池(三)


    并发编程——并发容器和线程池(三)

    1.5.3> SynchronousQueue

    • 详情请见:【源码解析】SynchronousQueue.pdf(待更新)


    二、线程池

    2.1> ThreadPoolExecutor

    • 详情请见:

    2.2> ForkJoinPool

    2.2.1> 概述

    • ForkJoinPool可以给我们提供分而治之的功能,当我们有大量任务需要处理的时候,我们可以将其分为N个批次,然后每个批次开启子线程去并发执行,当子线程都执行完毕后,再对结果进行汇总计算。这种思想就类似于Hadoop的MapReduce方式。其中,fork表示开启一个执行分支,即:创建子线程去执行某些任务。而join我们在前面也介绍过,它具有等待的含义,也就是使用fork()后系统多了一个执行分支或执行线程,所以需要等待这个分支执行完毕,才能进行最后结果汇总的计算。如下图所示:

    并发编程——并发容器和线程池(三)

    • 如果我们随意的去fork线程,那么就会导致系统开启了很多子线程而造成系统开销过大,从而影响系统的性能。所以,JDK为我们提供了ForkJoinPool线程池用来解决这个问题。它采用对于fork()方法并不着急开启线程,而是提交给ForkJoinPool线程池去进行处理,从而节省系统开支。由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下,一个物理线程实际上是需要处理多个逻辑任务的。因此,每个线程必然需要拥有一个任务队列。因此,在实际执行过程中,可能過到这么一种情况:线程A已经把自己的任务都执行完成了, 而线程B还有一堆任务等着处理,此时,线程A就会“帮助”线程 B,从线程B的任务队列中拿一个任务过来处理,尽可能地达到平街。从而显示了这种互相帮助的精神。但是,其中一个值得注意的地方是,当线程试图帮助别人时,总是从任务队列的底部开始拿数据,而线程试因执行自己的任务时,则是从相反的顶部开始拿。因此这种行为也十分有利于避免数据竞争。如下图所示:

    并发编程——并发容器和线程池(三)

    2.2.2> RecursiveTask执行有返回值任务

    • 通过RecursiveTask的子类,实现带返回值的计算

    并发编程——并发容器和线程池(三)

    并发编程——并发容器和线程池(三)

    2.2.1> RecursiveAction执行无返回值任务

    • 通过RecursiveAction的子类,实现不带返回值的计算

    并发编程——并发容器和线程池(三)


    三、Future

    • JDK内置的Future模式

    并发编程——并发容器和线程池(三)

    • 可以通过调用线程池的submit方法,返回Future,然后调用get方法来获得子线程计算的结果值,如下所示:

    并发编程——并发容器和线程池(三)


    3.1> FutureTask

    • FutureTask是RunnableFuture的一个具体实现,它内部有一个内部类Sync,它赋值内部逻辑的实现。而Sync会最终调用Callable接口,完成实际数据的组装工作。

    • Callable接口有一个call()方法,通过方法内部的计算,可以将结果返回出来。这个Callable接口也是这个Future框架和应用程序直接的重要桥梁。我们可以将需要实现的逻辑在call()方法中实现。通常,我们会使用Callable实例构造一个FutureTask实例,并将它提交给线程池。

    • 下面是具体实现的例子:

    并发编程——并发容器和线程池(三)

    【解释】

      • 我们把需要实现的逻辑在Callable接口的call方法中实现。

      • 当构造FutureTask时,将Callable实例传给它,告诉FutureTask去做什么事情可以有返回值。

      • 然后,我们将FutureTask提交(submit)给线程池。显然,作为一个简单的任务提交,这里必然是立即返回的,因此程序不会阻塞。

      • 接下来,我们不用关系数据是如何计算和产生的,我们放手去做其他事情(例如:上面例子中Sleep了2秒钟),然后,当我们需要计算的结果时,调用FutureTask的get()方法获得计算结果。


    3.2> CompletableFuture

    • 在Java 8中,新增了CompletableFuture类作为Future的增强类。它实现了CompletionStage接口,该接口有38个方法,是为了函数式编程中的流式调用准备的。

    3.2.1> 执行通知

    • 如果需要向CompletableFuture请求一个数据,但是需要数据准备好才能发起这个请求,那么此时,我们就可以利用手动设置CompletableFuture的完成状态。下面例子中,我们获取并打印被通知的值:

    并发编程——并发容器和线程池(三)

    3.2.2> 执行异步任务

    • 可以通过supplyAsyncrunAsync来执行异步任务,具体方法如下所示:

    并发编程——并发容器和线程池(三)


    并发编程——并发容器和线程池(三)

    【解释】

      • supplyAsync()方法用于那些需要有返回值的场景;

      • runAsync()方法用于没有返回值的场景;

      • 在这两个方法中,都分别有一个方法可以接收一个Executor参数。这就使我们可以让Supplier<U>或者Runnable在指定的线程池中工作。如果不确定,则在默认的系统公共的ForkJoinPool.common线程池中执行。

      • 在Java 8中,新增了ForkJoinPool.commonPool()方法。它可以获得一个公共的ForkJoin线程池。这个公共线程池中的所有线程都是Daemon线程。这意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。

    并发编程——并发容器和线程池(三)



    • 下面是异步任务的例子: