vlambda博客
学习文章列表

源码分析J.U.C之AQS底层原理

AQS:Abstract Queue Synchronizer,抽象队列同步器。


Abstract : 采用了模板方法设计模式暴露出上锁逻辑给到子类,让子类加锁,所以延伸出ReentrantLock、ReentrantReadWriterLock等等锁。


Queue:没有抢到锁的线程进入阻塞队列


Synchronizer:同步器


AQS的核心功能是当线程在子类中抢锁失败之后进入阻塞队列,并让当前线程睡眠,等待被唤醒继续抢占锁。它提供了独占锁和共享锁两种类型。


独占锁:当一个线程占据一把锁的时候,其它线程只能排队等待,只能等持有这把锁的线程执行完之后,才能再次抢锁。


共享锁:当一个线程占据一把共享锁的时候,其它线程还能够持有这把共享锁,但是共享锁和独占锁是互斥的,共享锁锁的是读操作,互斥锁锁的是写操作,读操作多线程下是安全的,写操作多线程下是不安全的。



1、独占锁 


核心流程:当一个线程抢锁的失败的时候,再挣扎强一次,如果再失败,再去睡眠。这里有个注意点:唤醒下一个节点,并不是从head节点往后找,而是从tail节点往前找,直到找到head节点的下一个节点。为什么?下文有解释。


acquire()方法

//获取锁,独占锁public final void acquire(int arg) { if (!tryAcquire(arg) && //尝试获取锁,交给子类实现 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

获取写锁(互斥锁)的代码。tryAcquire方法由子类来完成,该方法也称之为模板方法,为何如此设计?这时因为AQS无法知道子类如何定义获取锁的操作。假如子类判断当前线程没有获取到锁,那么如何?排队去。addWaiter(Node.EXCLUSIVE)方法用于排队站位,acquireQueued方法用于将线程阻塞,此时不难猜出这里面肯定调用了tryAcquire(arg),可以想想为什么?因为在任何过程中,都有可能别的线程已经释放了锁。


addWaiter方法

将阻塞线程节点放入阻塞队列。

private Node addWaiter(Node mode) { // 创建等待节点:当前线程对象 + 锁模式(互斥锁?共享锁) Node node = new Node(Thread.currentThread(), mode); // 快速加入到队列 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // (特别注意:当上面的CAS成功后,有一瞬间 这里的pred.next并没有关联。会导致什么问题? // 有一瞬间,你通过head引用遍历的时候,是到达不了最后一个节点的,            // A(head) ->  B(旧tail) <- C(tail)。如何获取最新的节点呢?            // 通过tail指针往前遍历即可,这个在下文有具体的问题 pred.next = node; return node; } } enq(node); return node;}private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 懒加载时,head和tail 分别代表aqs的头尾节点 // 通过CAS实现原子初始化操作,直接用一个空节点实现初始化,此时head和tail指向同一个空节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}

acquireQueued方法

当加入阻塞队列后,调用该方法考虑是否将当前线程进行阻塞。在看该方法时,请考虑一个情况:假如在添加到阻塞队列后,当前锁状态是无锁时, 怎么办?那么一定是去尝试获取锁。

// node节点为阻塞队列中的线程节点final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 当前节点的前驱节点 final Node p = node.predecessor(); // 假如p是头结点,有可能此时头结点释放了锁,那么尝试调用tryAcquire,让子类抢一下锁 if (p == head && tryAcquire(arg)) { // 获取成功,更新头结点,释放引用 setHead(node); p.next = null; failed = false; return interrupted; } // 如果前驱节点不是头结点或者抢锁失败,何如?那就先判断是否应该被阻塞,如果阻塞呢?            // 调用parkAndCheckInterrupt方法来阻塞当前线程。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
// 阻塞当前线程。响应中断的方式阻塞线程private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); // 判断是否是通过中断的方式来唤醒的。1、unpark 2、interrupt}

shouldParkAfterFailedAcquire方法

该方法用于判断当前线程节点是否应该阻塞。无非就是找到一个可靠(活着有效)的节点,然后将当前线程节点作为其后继节点即可。

// pred是前驱节点,node是当前线程节点private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 如果前驱节点是SIGNAL,那么此时可以安全的睡眠(因为SIGNAL状态,代表了上一个线程是活的,它可以通知你,所以当前线程节点可以安全的阻塞了) return true; if (ws > 0) { // 前一个节点是CANCEL无效节点,那咋整?一直往前找,直到找到(有效节点) do { // 前驱节点的前驱节点,也即:踩着死亡节点往前跳 Node predPrev = pred.prev; // 将前驱的前驱当成当前线程节点的前驱节点 pred = predPrev; // 更新引用 node.prev = pred; } while (pred.waitStatus > 0); // 更新找到的前驱有效节点的next引用,指向当前节点 pred.next = node; } else { // 正常节点,那么CAS 将其变为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}


release方法

释放写锁(互斥锁)的代码。

public final boolean release(int arg) { if (tryRelease(arg)) {//子类释放锁成功 Node h = head; if (h != null && h.waitStatus != 0)            //唤醒后继节点,也就是头节点的下一个节点 unparkSuccessor(h); return true; } return false;}
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; if (s == null || s.waitStatus > 0) { s = null;      //从尾部寻找可唤醒节点,为什么? for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);}

为什么node.next可能是null?为什么需要从尾部寻找可唤醒的节点?

源码分析J.U.C之AQS底层原理

以上图片是从 中获取,其中说到尾插法怎么保证了线程安全,第一步:设置当前节点的prev节点为tail节点,然后将tail节点的引用指向了当前节点,最后将旧的tail节点的.next指向了当前节点,其中第二步是cas操作,可以保证原子性,但是当第二步骤成功之后的一瞬间,此时还没有执行第三步骤,此时的node.next还没有来得及执行,所以此时node.next有可能是null,那怎么才能够保证获取到有效的下一个节点呢?因为第二步是cas操作,所以第二步是将tail引用指向了当前节点,如果执行到了这一步骤,那么第一步骤肯定是执行完了,所以尾节点.prev可以保证能够找到有效的前驱结点。


2、共享锁


共享锁核心流程,state=3代表同时有3个线程可以抢到锁,绿色代表抢到锁,红色代表进入阻塞中,白色准备抢锁。

源码分析J.U.C之AQS底层原理

第一种情况:当一个白色read准备抢锁,也就是子类调用tryAcquireShared()方法返回的值小于0就是代表没有抢到锁,没有抢到锁进入阻塞队列,进入阻塞队列调用addWaiter()方法,然后挣扎一下再去尝试获取锁,实在是获取不到,则阻塞线程,阻塞之前循环寻找一个有效节点(wsStatus=Node.SINGAL)当做前驱节点。

源码分析J.U.C之AQS底层原理

第二种情况:当一个白色read准备抢锁并进入阻塞队列,此时有线程释放锁,这个时候再一次抢锁成功,此时调用了setHeadAndPropagate()方法,把当前节点设置为head节点,然后准备唤醒下一个线程,最终调用doReleaseShare()方法唤醒下一个节点。当下一个节点被唤醒之后,将会自旋进行抢锁

源码分析J.U.C之AQS底层原理

可以看到head节点唤醒下一个节点的时候,下一个节点开始自旋,然后判断上一个节点是否是head节点,如果是则尝试获取锁,如果此时抢锁成功并且还可以抢锁,也就是r>0,则继续唤醒下一个节点,调用setHeadAndPropagate(),将当前节点设置为head节点,继续唤醒下一个节点。

总结一下:head节点唤醒下一个节点之后,此时下一个节点苏醒,那么就将它设置成header节点,然后继续唤醒下一个节点,反复如此,所有的共享锁将会被唤醒

第三种情况是:当一个线程抢锁,此时直接抢锁成功,不需要进入阻塞队列和睡眠,此时不管共享锁还没有没有资源,不做任何唤醒操作。那唤醒操作还会在哪里?也就是当一个线程执行完之后,释放锁的时候,此时就唤醒阻塞队列中的节点,此时的步骤和上面的第二种情况的后续逻辑一样,唤醒一个之后,就会重复唤醒其它共享节点。


所以唤醒后续节点的时机有两个地方:1、当一个线程抢锁失败,进入阻塞队列,再一次抢锁成功时,挨个唤醒后续节点。2、当一个线程抢锁成功并释放锁的时候,会唤醒正在阻塞的节点。这里有个点,就是当两个线程同时释放锁的时候,先看下源码

源码分析J.U.C之AQS底层原理


释放锁的逻辑是:

1、获取head临时变量。

2、判断Node状态为SINGAL。

3、CAS设置成0。

4、唤醒后序节点。

当CAS被两个个Read线程同时执行的时候,CAS保证只有一个线程能够执行成功,其它线程执行失败,但是执行失败的线程将会循环在执行一次。但是此时Node的状态被第一个线程改成了0,如果此时第一个线程尝试获取锁成功了,但是还没有执行setHeadAndPropagate(),此时线程1会将head的状态变成-3,也就是PROPAGATE,主要的目的在于:

让第一个线程可以继续唤醒下一个节点,因为第二个线程没有执行unPark唤醒的代码,而是直接设置node的状态为PROPAGATE,所以此时如果不设置PROPAGATE,那么head的状态就是0,那么此时第一个线程不会执行唤醒后续的操作。


具体源码分析如下


acquireShared方法

public final void acquireShared(int arg) {    // 由子类判断是否获取锁成功,若不成功?阻塞 if (tryAcquireShared(arg) < 0)  doAcquireShared(arg);}
private void doAcquireShared(int arg) { boolean failed = true; // 和互斥锁一样,只不过呢?这里添加的是共享模式的锁节点 final Node node = addWaiter(Node.SHARED); try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 刚好这个节点就在头结点后面,可能头结点很快就释放锁了,那么尝试获取锁 if (p == head) {                //针扎一下,尝试抢锁,和独占锁的流程类似,尝试获取一次 int r = tryAcquireShared(arg); // 如果获取锁成功,尝试唤醒后面的共享节点(因为共享锁是可以多线程同时获取) // A(head 写)->B(读)->C(读) if (r >= 0) {                 //此时说明获取锁成功了,唤醒其它g共享锁 setHeadAndPropagate(node, r); // 将当前获取锁的共享节点更新头部,然后唤醒后继节点 p.next = null; // help GC if (interrupted) // 如果在等待过程中发生了中断,由于中断而被唤醒,那么置位当前线程的中断标志位 selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
// 用于更新头结点,并且唤醒后继共享节点private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || // 信号量还有多余的,那么直接唤醒。eg:A:2 head->B-C h == null || // 不可能发生 h.waitStatus < 0 || // SIGNAL 表明必须唤醒后继节点 那么直接唤醒 (h = head) == null || // 不可能发生。A B (获取锁) head -> D h.waitStatus < 0) { // SIGNAL 表明必须唤醒后继节点 那么直接唤醒 Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }}
// 释放共享锁节点。eg:当前状态:A B (获取锁) head -> C -> D private void doReleaseShared() { for (;;) { Node h = head; // 保存head临时变量(两个状态:head没有被更新、head被更新) if (h != null && h != tail) { // 链表中还存在等待节点 int ws = h.waitStatus; // 理应要唤醒后面的节点,因为SIGNAL的语义就是必须唤醒后面的节点(正常状态) if (ws == Node.SIGNAL) { // CAS将该状态由SIGNAL改为0,表示唤醒成功,那么如果失败呢?此时表明多个线程同时释放了共享锁,都需要同时唤醒后继节点,此时state 状态为 2 表明了资源数可用为2,需要唤醒后面两个等待的共享节点 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 直接唤醒后继节点即可 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // PROPAGATE = -3 若CAS成功,表明A释放了一个共享锁时唤醒了C,此时在C还没有更新头节点之前,A马上又释放了一个共享锁,且CAS成功将头结点的状态改为了PROPAGATE,此时完美保证了第一个状态唤醒的正确性 continue; } if (h == head) // 头结点至今没被改变过,且 h != null && h != tail 成功,那么直接退出循环,没有节点了 break; }}