源码分析J.U.C之AQS底层原理
AQS:Abstract Queue Synchronizer,抽象队列同步器。
Abstract : 采用了模板方法设计模式暴露出上锁逻辑给到子类,让子类加锁,所以延伸出ReentrantLock、ReentrantReadWriterLock等等锁。
Queue:没有抢到锁的线程进入阻塞队列
Synchronizer:同步器
AQS的核心功能是当线程在子类中抢锁失败之后进入阻塞队列,并让当前线程睡眠,等待被唤醒继续抢占锁。它提供了独占锁和共享锁两种类型。
独占锁:当一个线程占据一把锁的时候,其它线程只能排队等待,只能等持有这把锁的线程执行完之后,才能再次抢锁。
共享锁:当一个线程占据一把共享锁的时候,其它线程还能够持有这把共享锁,但是共享锁和独占锁是互斥的,共享锁锁的是读操作,互斥锁锁的是写操作,读操作多线程下是安全的,写操作多线程下是不安全的。
1、独占锁
核心流程:当一个线程抢锁的失败的时候,再挣扎强一次,如果再失败,再去睡眠。这里有个注意点:唤醒下一个节点,并不是从head节点往后找,而是从tail节点往前找,直到找到head节点的下一个节点。为什么?下文有解释。
acquire()方法
//获取锁,独占锁
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试获取锁,交给子类实现
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
获取写锁(互斥锁)的代码。tryAcquire方法由子类来完成,该方法也称之为模板方法,为何如此设计?这时因为AQS无法知道子类如何定义获取锁的操作。假如子类判断当前线程没有获取到锁,那么如何?排队去。addWaiter(Node.EXCLUSIVE)方法用于排队站位,acquireQueued方法用于将线程阻塞,此时不难猜出这里面肯定调用了tryAcquire(arg),可以想想为什么?因为在任何过程中,都有可能别的线程已经释放了锁。
addWaiter方法
将阻塞线程节点放入阻塞队列。
private Node addWaiter(Node mode) {
// 创建等待节点:当前线程对象 + 锁模式(互斥锁?共享锁)
Node node = new Node(Thread.currentThread(), mode);
// 快速加入到队列
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// (特别注意:当上面的CAS成功后,有一瞬间 这里的pred.next并没有关联。会导致什么问题?
// 有一瞬间,你通过head引用遍历的时候,是到达不了最后一个节点的,
// A(head) -> B(旧tail) <- C(tail)。如何获取最新的节点呢?
// 通过tail指针往前遍历即可,这个在下文有具体的问题
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 懒加载时,head和tail 分别代表aqs的头尾节点
// 通过CAS实现原子初始化操作,直接用一个空节点实现初始化,此时head和tail指向同一个空节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法
当加入阻塞队列后,调用该方法考虑是否将当前线程进行阻塞。在看该方法时,请考虑一个情况:假如在添加到阻塞队列后,当前锁状态是无锁时, 怎么办?那么一定是去尝试获取锁。
// node节点为阻塞队列中的线程节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 当前节点的前驱节点
final Node p = node.predecessor();
// 假如p是头结点,有可能此时头结点释放了锁,那么尝试调用tryAcquire,让子类抢一下锁
if (p == head && tryAcquire(arg)) {
// 获取成功,更新头结点,释放引用
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
// 如果前驱节点不是头结点或者抢锁失败,何如?那就先判断是否应该被阻塞,如果阻塞呢?
// 调用parkAndCheckInterrupt方法来阻塞当前线程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 阻塞当前线程。响应中断的方式阻塞线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 判断是否是通过中断的方式来唤醒的。1、unpark 2、interrupt
}
shouldParkAfterFailedAcquire方法
该方法用于判断当前线程节点是否应该阻塞。无非就是找到一个可靠(活着有效)的节点,然后将当前线程节点作为其后继节点即可。
// pred是前驱节点,node是当前线程节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前驱节点是SIGNAL,那么此时可以安全的睡眠(因为SIGNAL状态,代表了上一个线程是活的,它可以通知你,所以当前线程节点可以安全的阻塞了)
return true;
if (ws > 0) {
// 前一个节点是CANCEL无效节点,那咋整?一直往前找,直到找到(有效节点)
do {
// 前驱节点的前驱节点,也即:踩着死亡节点往前跳
Node predPrev = pred.prev;
// 将前驱的前驱当成当前线程节点的前驱节点
pred = predPrev;
// 更新引用
node.prev = pred;
} while (pred.waitStatus > 0);
// 更新找到的前驱有效节点的next引用,指向当前节点
pred.next = node;
} else {
// 正常节点,那么CAS 将其变为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
release方法
释放写锁(互斥锁)的代码。
public final boolean release(int arg) {
if (tryRelease(arg)) {//子类释放锁成功
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继节点,也就是头节点的下一个节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//从尾部寻找可唤醒节点,为什么?
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
为什么node.next可能是null?为什么需要从尾部寻找可唤醒的节点?
以上图片是从 中获取,其中说到尾插法怎么保证了线程安全,第一步:设置当前节点的prev节点为tail节点,然后将tail节点的引用指向了当前节点,最后将旧的tail节点的.next指向了当前节点,其中第二步是cas操作,可以保证原子性,但是当第二步骤成功之后的一瞬间,此时还没有执行第三步骤,此时的node.next还没有来得及执行,所以此时node.next有可能是null,那怎么才能够保证获取到有效的下一个节点呢?因为第二步是cas操作,所以第二步是将tail引用指向了当前节点,如果执行到了这一步骤,那么第一步骤肯定是执行完了,所以尾节点.prev可以保证能够找到有效的前驱结点。
2、共享锁
共享锁核心流程,state=3代表同时有3个线程可以抢到锁,绿色代表抢到锁,红色代表进入阻塞中,白色准备抢锁。
第一种情况:当一个白色read准备抢锁,也就是子类调用tryAcquireShared()方法返回的值小于0就是代表没有抢到锁,没有抢到锁进入阻塞队列,进入阻塞队列调用addWaiter()方法,然后挣扎一下再去尝试获取锁,实在是获取不到,则阻塞线程,阻塞之前循环寻找一个有效节点(wsStatus=Node.SINGAL)当做前驱节点。
第二种情况:当一个白色read准备抢锁并进入阻塞队列,此时有线程释放锁,这个时候再一次抢锁成功,此时调用了setHeadAndPropagate()方法,把当前节点设置为head节点,然后准备唤醒下一个线程,最终调用doReleaseShare()方法唤醒下一个节点。当下一个节点被唤醒之后,将会自旋进行抢锁
可以看到head节点唤醒下一个节点的时候,下一个节点开始自旋,然后判断上一个节点是否是head节点,如果是则尝试获取锁,如果此时抢锁成功并且还可以抢锁,也就是r>0,则继续唤醒下一个节点,调用setHeadAndPropagate(),将当前节点设置为head节点,继续唤醒下一个节点。
总结一下:head节点唤醒下一个节点之后,此时下一个节点苏醒,那么就将它设置成header节点,然后继续唤醒下一个节点,反复如此,所有的共享锁将会被唤醒。
第三种情况是:当一个线程抢锁,此时直接抢锁成功,不需要进入阻塞队列和睡眠,此时不管共享锁还没有没有资源,不做任何唤醒操作。那唤醒操作还会在哪里?也就是当一个线程执行完之后,释放锁的时候,此时就唤醒阻塞队列中的节点,此时的步骤和上面的第二种情况的后续逻辑一样,唤醒一个之后,就会重复唤醒其它共享节点。
所以唤醒后续节点的时机有两个地方:1、当一个线程抢锁失败,进入阻塞队列,再一次抢锁成功时,挨个唤醒后续节点。2、当一个线程抢锁成功并释放锁的时候,会唤醒正在阻塞的节点。这里有个点,就是当两个线程同时释放锁的时候,先看下源码
释放锁的逻辑是:
1、获取head临时变量。
2、判断Node状态为SINGAL。
3、CAS设置成0。
4、唤醒后序节点。
当CAS被两个个Read线程同时执行的时候,CAS保证只有一个线程能够执行成功,其它线程执行失败,但是执行失败的线程将会循环在执行一次。但是此时Node的状态被第一个线程改成了0,如果此时第一个线程尝试获取锁成功了,但是还没有执行setHeadAndPropagate(),此时线程1会将head的状态变成-3,也就是PROPAGATE,主要的目的在于:
让第一个线程可以继续唤醒下一个节点,因为第二个线程没有执行unPark唤醒的代码,而是直接设置node的状态为PROPAGATE,所以此时如果不设置PROPAGATE,那么head的状态就是0,那么此时第一个线程不会执行唤醒后续的操作。
具体源码分析如下
acquireShared方法
public final void acquireShared(int arg) {
// 由子类判断是否获取锁成功,若不成功?阻塞
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
boolean failed = true;
// 和互斥锁一样,只不过呢?这里添加的是共享模式的锁节点
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 刚好这个节点就在头结点后面,可能头结点很快就释放锁了,那么尝试获取锁
if (p == head) {
//针扎一下,尝试抢锁,和独占锁的流程类似,尝试获取一次
int r = tryAcquireShared(arg);
// 如果获取锁成功,尝试唤醒后面的共享节点(因为共享锁是可以多线程同时获取)
// A(head 写)->B(读)->C(读)
if (r >= 0) {
//此时说明获取锁成功了,唤醒其它g共享锁
setHeadAndPropagate(node, r); // 将当前获取锁的共享节点更新头部,然后唤醒后继节点
p.next = null; // help GC
if (interrupted) // 如果在等待过程中发生了中断,由于中断而被唤醒,那么置位当前线程的中断标志位
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 用于更新头结点,并且唤醒后继共享节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || // 信号量还有多余的,那么直接唤醒。eg:A:2 head->B-C
h == null || // 不可能发生
h.waitStatus < 0 || // SIGNAL 表明必须唤醒后继节点 那么直接唤醒
(h = head) == null || // 不可能发生。A B (获取锁) head -> D
h.waitStatus < 0) { // SIGNAL 表明必须唤醒后继节点 那么直接唤醒
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
// 释放共享锁节点。eg:当前状态:A B (获取锁) head -> C -> D
private void doReleaseShared() {
for (;;) {
Node h = head; // 保存head临时变量(两个状态:head没有被更新、head被更新)
if (h != null && h != tail) { // 链表中还存在等待节点
int ws = h.waitStatus;
// 理应要唤醒后面的节点,因为SIGNAL的语义就是必须唤醒后面的节点(正常状态)
if (ws == Node.SIGNAL) {
// CAS将该状态由SIGNAL改为0,表示唤醒成功,那么如果失败呢?此时表明多个线程同时释放了共享锁,都需要同时唤醒后继节点,此时state 状态为 2 表明了资源数可用为2,需要唤醒后面两个等待的共享节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 直接唤醒后继节点即可
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // PROPAGATE = -3 若CAS成功,表明A释放了一个共享锁时唤醒了C,此时在C还没有更新头节点之前,A马上又释放了一个共享锁,且CAS成功将头结点的状态改为了PROPAGATE,此时完美保证了第一个状态唤醒的正确性
continue;
}
if (h == head) // 头结点至今没被改变过,且 h != null && h != tail 成功,那么直接退出循环,没有节点了
break;
}
}