vlambda博客
学习文章列表

Condition图解与源码分析

1、Condition定义

Condition是一个接口,定义在juc中(java.util.concurrent.locks.Condition),它的主要功能类似于wait()/notify(),但是Condition其实现比wait()/notify()使用更加灵活,简洁、适用场景更加丰富。

2、Condition之于Lock与wait()/notify()之于synchronized

2.1 wait()/notify()之于synchronized

java.lang.Object中定义了一组监视器方法,例如wait()、wait(long timeout)、wait(long timeout, int nanos)、notify()、notifyAll()而object是任何对象的超类,所以任意java对象都拥有这组监视器方法。这些方法再配合上synchronized同步关键字,我们就可以实现等待/通知机制。

2.2 Condition之于Lock

Condition接口中提供了await()、await(long time, TimeUnit unit)、signal()、signalAll()等方法的定义,这些方法的实现配合上Lock也可实现等待/通知机制。

2.3 Object监视器方法与Condition接口两者简要对比

注意:在Condition对象中,与wait()、notify()、和notifyAll()对应的方法分别是await()、signal()、signalAll()方法。Condition对Object进行了扩展(隐藏关系),所有Condition包含wait()和notify()方法。

对比项 Object监视器方法 Condition
前置条件 获取锁 获取锁
调用方式 object.wait() Lock.newCondition()获取Conditon对象,condition.await()
等待队列 一个 多个
超时等待 支持 支持
等待至某个具体时间 不支持 支持
不响应中断 不支持 支持
唤醒一个线程 支持 支持
唤醒全部线程 支持 支持

对于上述对比中最重要的区别在于:

  1. Condition提供更加丰富的wait()机制,例如基于指定时间的限时等待

  2. 对于每个Lock,可以存在任意数量的Condition对象。

3、Condition接口定义与使用案例

3.1 Condition的接口定义

Condition接口中定义的方法如下,具体方法定义的含义请看方法上的注释

 1public interface Condition {
2
3    /**
4     * 当前线程进入等待状态,直到被通知(signal)或者中断
5     * 返回的前提:
6     * 1、其他线程调用该Condition的signal()或者signalAll()方法&&获取到锁
7     * 2、其他线程中断了该线程
8     */

9    void await() throws InterruptedException;
10
11    /**
12     * 等待直至被通知,不响应中断
13     */

14    void awaitUninterruptibly();
15
16    /**
17     * 等待通知、中断或者指定等待时间到则返回
18     */

19    boolean await(long time, TimeUnit unit) throws InterruptedException;
20
21    /**
22     * 等待通知、中断或者规定线程截止日期已经过去则返回
23     */

24    boolean awaitUntil(Date deadline) throws InterruptedException;
25
26    /**
27     * 唤醒一个等待线程,线程返回的前提是该线程获取到与Condition有关的锁
28     */

29    void signal();
30
31    /**
32     * 唤醒所有等待线程,线程返回的前提是该线程获取到与Condition有关的锁
33     */

34    void signalAll();
35}

3.2 Lock接口定义

获取Condition必须通过Lock的newCondition()方法,这个在如下接口定义中可以找到结论。

 1/**
2 * Lock接口中定义了Condition的获取方式
3 */

4public interface Lock {
5
6    void lock();
7
8    void lockInterruptibly() throws InterruptedException;
9
10    boolean tryLock();
11
12    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
13
14    void unlock();
15
16    /**
17     * 调用 newCondition()方法返回一个Condition
18     */

19    Condition newCondition();
20}

3.3 使用案例

通过Lock和Condition来实现一个有界缓存队列,生产线程的向队列中添加数据,当队列满了的时候put()操作会被阻塞;反之,消费线程不断的从队列中取出数据,当队列为空时,take()操作会被阻塞。

 1package com.lizba.p6;
2
3import org.omg.CORBA.Object;
4
5import java.util.concurrent.locks.Condition;
6import java.util.concurrent.locks.Lock;
7import java.util.concurrent.locks.ReentrantLock;
8
9/**
10 * <p>
11 *      使用Condition来实现的一个有界队列示例代码
12 * </p>
13 *
14 * @Author: Liziba
15 * @Date: 2021/6/26 16:03
16 */

17public class ConditionBoundedBuffer<E{
18
19    /** 定义数组做有界队列容器 */
20    final E[] items;
21    /** 可重入锁ReentrantLock() */
22    final Lock lock = new ReentrantLock();
23    /** 条件:数组不满  notFull ( count < item.length) */
24    final Condition notFull = lock.newCondition();
25    /** 条件:数组不为空 notEmpty ( count > 0) */
26    final Condition notEmpty = lock.newCondition();
27    /**
28     * tail游标,记录当前插入元素到了哪个位置
29     * head游标,记录当前获取元素到了哪个位置
30     * count,记录容器中的元素的个数
31     * */

32    private int tail, head, count;
33
34    public ConditionBoundedBuffer(int size) {
35        this.items = (E[]) new Object[size];
36    }
37
38    /**
39     * 添加元素操作
40     * @param e
41     * @throws InterruptedException
42     */

43    public void put(E e) throws InterruptedException {
44        lock.lock();
45        try {
46            // 当数组满时,调用 notFull.await();使得插入元素的线程阻塞
47            while (count == items.length) {
48                notFull.await();
49            }
50            items[tail] = e;
51            if (++tail == items.length) {
52                tail = 0;
53            }
54            ++count;
55            // 唤醒获取元素的线程
56            notEmpty.signalAll();
57        } finally {
58          lock.unlock();
59        }
60    }
61
62
63    /**
64     * 获取元素操作
65     * @return
66     * @throws InterruptedException
67     */

68    public E take() throws InterruptedException {
69        lock.lock();
70        try {
71            // 当数组为空时,调用notEmpty.await();使得获取元素的线程阻塞
72            while (count == 0) {
73                notEmpty.await();
74            }
75            E ret = items[head];
76            items[head] = null;
77            if (++head == items.length) {
78                head = 0;
79            }
80            --count;
81            // 唤醒插入元素的线程
82            notFull.signalAll();
83            return ret;
84        } finally {
85            lock.unlock();
86        }
87    }
88
89}

上述案例中的tail、head、count有不大看的懂的,我做了个简单的的put(E e)和take()的图解
Condition实现的阻塞队列put(E e)图解

Condition实现的阻塞队列插入元素图解.png

Condition实现的阻塞队列take()图解
Condition图解与源码分析
Condition实现的阻塞队列take()图解.png

4、Condtion 实现源码分析

4.1 互斥锁和读写锁中Condition的构造

4.1.1 AbstractQueuedSynchronizer中的ConditionObject

ReentrantLock与ReentrantReadWriteLock中的静态内部类Sync继承了AbstractQueuedSynchronizer,两者调用的sync.newCondition(),实际上调用的是new ConditionObject(),也就是构造的AbstractQueuedSynchronizer中的ConditionObject对象。

 1public class ConditionObject implements Conditionjava.io.Serializable {\
2
3    /** 双向链表首节点*/
4    private transient Node firstWaiter;
5    /** 双向链表尾节点*/                                                                     
6    private transient Node lastWaiter;
7
8    public ConditionObject() { }
9
10    //...
11}
4.1.2 ReentrantLock中的Condition
 1/**
2 *    ReentrantLock中的newCondition(),调用sync的newCondition();
3 */

4public Condition newCondition() {
5    return sync.newCondition();
6}
7
8
9/**
10 *    sync的中的newCondition()
11 */

12final ConditionObject newCondition() {
13    return new ConditionObject();
14}
4.1.3 ReentrantReadWriteLock中的Condition
 1/**
2 * 读锁不支持newCondition()
3 */

4public static class ReadLock implements Lockjava.io.Serializable 
5    public Condition newCondition() {
6        throw new UnsupportedOperationException();
7    }
8
9
10/**
11 * 写锁中的newCondition()
12 */

13public static class WriteLock implements Lockjava.io.Serializable {
14    public Condition newCondition() {
15        return sync.newCondition();
16    }
17}

4.2 await()源码分析

4.2.1 await()位于AbstractQueuedSynchronizer中的ConditionObject

调用Condition的await()或者awaitXxxx()会导致线程构建成Node节点加入Condition的等待队列,并且释放锁。如果线程从await()或者awaitXxxI()方法返回,表明线程又重新获取了Condition相关的锁。

 1/**
2 *    await()方法源码分析,其主要逻辑
3 * 当前线程进入等待状态,直到被通知(signal)或者中断
4 * 返回的前提:
5 * 1、其他线程调用该Condition的signal()或者signalAll()方法&&获取到锁
6 * 2、其他线程中断了该线程
7 */

8public final void await() throws InterruptedException {
9    // 判断当前线程是否被中断,如果被中断了则抛出InterruptedException异常
10    if (Thread.interrupted())
11        throw new InterruptedException();
12    // 添加一个新的等待节点到等待队列,无需CAS因为调用await前提是获取到了锁
13    Node node = addConditionWaiter();
14    // 释放锁,调用await()必须获取锁,此处释放锁可以防止死锁产生
15    int savedState = fullyRelease(node);
16    int interruptMode = 0;
17    // isOnSyncQueue(node)判断当前节点是否加入到同步队列中了,也就是移出了等待队列+    `SWDVVB
18    while (!isOnSyncQueue(node)) {
19        LockSupport.park(this); // 阻塞自己
20        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
21            break;
22    }
23    // 重新获取锁
24    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 
25        interruptMode = REINTERRUPT;
26    // 清除取消等待的节点
27    if (node.nextWaiter != null
28        unlinkCancelledWaiters();
29    // 响应中断抛出异常
30    if (interruptMode != 0
31        reportInterruptAfterWait(interruptMode);
32}

await()中的addConditionWaiter()方法

 1/**
2  * 添加一个新的等待节点到等待队列,无需CAS因为调用await前提是获取到了锁
3  */

4private Node addConditionWaiter() {
5    Node t = lastWaiter;
6    // 清除取消等待的节点
7    if (t != null && t.waitStatus != Node.CONDITION) {
8        unlinkCancelledWaiters();
9        t = lastWaiter;
10    }
11    // 构造新的节点Node
12    Node node = new Node(Thread.currentThread(), Node.CONDITION);
13    // 如果尾节点为空免责证明首节点也为空,firstWaiter = node;
14    if (t == null)
15        firstWaiter = node;
16    else
17        t.nextWaiter = node; // 尾节点的下一个节点指向当前节点,此时当前节点成为新的尾节点
18    lastWaiter = node;
19    return node;
20}
4.2.2 总结
  1. await()方法在调用之前,线程一定获取到了锁,因此addConditionWaiter()无需CAS也可以保证线程安全

  2. 在阻塞自己之前,必须先释放锁fullyRelease(node),防止死锁

  3. 线程从wait中被唤醒后,必须通过acquireQueued(node, savedState)重新获取锁

  4. isOnSyncQueue(node)用于判断节点是否在AQS同步队列中(关于同步队列和等待队列文章后面有图解),如果从Condition的等待队列移动到了AQS的同步队列证明被执行了signal()

  5. LockSupport.park(this)阻塞自己之后,线程被唤醒的方式有unpark和中断,通过checkInterruptWhileWaiting(node)判断当前线程被唤醒是否是因为中断,如果中断则退出循环

4.3 signal()源码解析

调用Condition的signal()方法,会唤醒在Condition等待队列中的线程节点(唤醒的是等待时间最长的首节点),唤醒节点之前会将其移至同步队列中(这里要注意先加入同步队列在唤醒该节点,等会画图别混淆)。

 1/**
2 * ConditionObject中的signal()方法
3 */

4public final void signal() {
5    // 判断当前线程是否获取了锁,如果没有则抛出异常
6    if (!isHeldExclusively())
7        throw new IllegalMonitorStateException();
8    // 构建获取等待队列的首节点
9    Node first = firstWaiter;
10    // 如果首节点不为空
11    if (first != null)
12        // 在下面
13        doSignal(first);
14}

signal()中调用多的doSignal(Node first)方法

 1/**
2 *    唤醒首节点
3 */

4private void doSignal(Node first) {
5    do {
6        // 判断当前结束是否是最后一个等待队列中的节点
7        if ( (firstWaiter = first.nextWaiter) == null)
8            lastWaiter = null;
9        first.nextWaiter = null;
10        // 在下面
11    } while (!transferForSignal(first) &&
12             (first = firstWaiter) != null);
13}

doSignal(Node first)中调用的transferForSignal(Node node)方法

 1final boolean transferForSignal(Node node) {
2
3    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
4        return false;
5    // enq将当前节点加入AQS的同步队列,这个我在前面的文中讲过
6    Node p = enq(node);
7    int ws = p.waitStatus;
8    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
9        // 唤醒阻塞中的线程,先加入同步队列,再唤醒
10        LockSupport.unpark(node.thread);
11    return true;
12}

5、Condition实现原理图解

5.1 图解同步队列与等待队列

在文章开头第二大点介绍Condition之于Lock与wati()/notify()之于synchronized时,我们对比过二者,其中很大的一个区别在于Object的监视模型上,一个对象只拥有一个同步队列和等待队列,这样的模型一个很大的问题在于它不太适用于编写带有多个条件谓词的并发对象(可以简单理解为复杂的带高级功能的);而并发包中的Lock中的组合了Condition对象,使得其可以拥有一个同步队列和多个等待队列(一个Condition中有一个等待队列)。下面就通过图来说明Condition的等待队列和同步器中的同步队列和等待队列之间的关系。

Condition等待队列图示

Condition图解与源码分析
Condition之等待队列.png

AQS同步队列与Condition等待队列关系图示
Condition图解与源码分析
AQS同步队列与Condition等待队列关系图示 (1).png

5.2 图解await()方法如何加入等待队列

前面讲过,调用await()方法的前提是获取到了Lock对应的锁,也正是因为这个await()操作是在获取锁的前提下进行的,所以节点的构造并未使用CAS,因为它的前提条件就是线程互斥(安全)的;同时我们在讲述AQS、ReentrantLock和ReentrantReadWriteLock时讲述过其线程竞争锁资源失败,线程将会被构造成同步节点,加入AQS的同步队列中,等待后续的再次竞争或者中断退出等。上图也讲过了同步器AQS中的同步队列和Condition中的等待队列之间的关系,所以加入Condition等待队列的线程,可以理解为在AQS同步器中重新获取到锁的首节点线程被移植(这里的移植不是将以前的节点加入,是通过以前节点的信息构造一个新的线程节点加入到等待队列)到了Condition的等待队列中,其图如下。

await()图解.png

5.3 图解signal()方法如何移出等待队列

signal()方法会将等待队列中的等待时间最长的节点(首节点),移动到同步队列尾部,加入同步队列的代码是 enq(final Node node),通过CAS来线程安全的移动。移动完成之后线程再使用LockSupport.unpark(node.thread);唤醒该节点中等待的线程。线程节点被移动至同步队列中后,线程可以参与同步状态的竞争,如果竞争成功,线程将会从await()方法返回。其图解如下。

signal()方法图解.png

6、总结

Condition中的await()/signal()相比Object中的wait()/notify(),Condition拥有更多的高级特性能够实现更加复杂的等待线程集的场景。但是我们在使用Lock和Condition时在调用await()和signal()要注意必须持有Lock对象(尽管在Lock对象中定义的具体实现,构造一个Condition可以不满足持有Lock对象这个条件)。