vlambda博客
学习文章列表

AQS源码分析(一) - exclusive mode(独占模式 )

AQS全名为AbstractQueuedSynchronizer


  • 注释解读

首先让我们来看下类的部分注释信息
 Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely onfirst-in-first-out (FIFO) wait queues.   This class is designed tobe a useful basis for most kinds of synchronizers that rely on asingle atomic {@code intvalue to represent state.
由注释信息可以知道,AQS被设计为用一个int值来表示状态,同时依靠FIFO队列(CLH队列)来实现的阻塞同步器;
This class supports either or both a default exclusive mode and a shared mode.
此段说明此类默认支持 exclusive (独占模式,例如:ReentrantLock) 和 shared(共享模式,例如:CountDownLatch);

  • 内部结构介绍


 1. int类型的状态: 代表共享的资源
 2.FIFO队列:此处的队列为一个变种的CLH队列( CLH队列此处不详细介绍 ), 其中节点的结构如下:
 
static final class Node {  //节点等待的信号值 volatile int waitStatus;  //前一个节点  volatile Node prev;  //后一个节点 volatile Node next; //当前线程  volatile Thread thread;  //Condition队列的下一个节点  Node nextWaiter;}
队列需要一个虚拟的head节点才能启动,但是此处并不会直接创建出head,只有在出现竞争的情况下才进行创建;
默认head持有资源拥有执行权限;
队列的运作原理:出列更新设置head节点,入列设置tail节点;
下图展示了多个线程竞争一个资源时,队列中各节点的状态

下面我们以 ReentrantLock, CountDownLatch为例,分别看一下独占和共享模式AQS是如何执行的。

在看源码之前首先看一下需要子类重写的几个方法及其作用:

//独占模式尝试获取资源,获取成功则返回true,获取失败返回falseprotected boolean tryAcquire(int arg)//独占模式尝试释放资源,释放成功返回true,释放失败返回falseprotected boolean tryRelease(int arg)//共享模式,尝试获取资源,返回值负数代表失败;0表示成功,没有剩余可用资源;正数表示成功,有剩余资源protected int tryAcquireShared(int arg)//共享模式,尝试释放资源,释放后需要唤醒等待则为true,不需要唤醒则为falseprotected boolean tryReleaseShared(int arg)
  • ReentrantLock执行流程

1. 创建ReentrantLock 对象public ReentrantLock() {  //默认创建非公平的锁 sync = new NonfairSync();}2.调用lock()锁定资源public void lock() {  //调用NonfairSync的lock();  sync.lock();} static final class NonfairSync extends Sync { final void lock() {            //步骤1:首先通过CAS操作将AQS中的资源state设置为1 if (compareAndSetState(0, 1))                //步骤2:将独占线程设置为当前线程 setExclusiveOwnerThread(Thread.currentThread()); else                //步骤3:调用AQS中方法来获取资源 acquire(1); }        //步骤4:尝试获取资源 protected final boolean tryAcquire(int acquires) {            //父类Sync中的nonfairTryAcquire() return nonfairTryAcquire(acquires);        }}

 只有一个线程获取资源则在步骤2之后获取资源成功后就可以拥有执行权限了,此时另一个线程继续获取执行权限则会进入到步骤3,我们继续看代码:

//AQS中的acquire方法public final void acquire(int arg) {        //tryAcquire为步骤4的方法尝试获取资源,        //因为其他线程占有资源,尝试获取失败,需要再执行acquireQueued()        // addWaiter为向FIFO队列增加等待的NODE if (!tryAcquire(arg) &&            //步骤6 acquireQueued 在队列中获取有资源执行权限的线程 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}//NonfairSync父类Sync中的nonfairTryAcquire方法final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();        if (c == 0) {        //如果当前资源未被占用,则可以直接CAS操作占用资源,        //不公平锁也是在这体现,新的线程可以直接抢占资源,不需要进入FIFO队列 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {            //同一个线程只需要把占有资源数加上就行            //由此可以ReentrantLock为一个可重入锁 int nextc = c + acquires;            if (nextc < 0// overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; }        //当前资源被其他线程占用则获取资源失败 return false; }//AQS中的addWaiterprivate Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;    //tail初始为null if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }    //步骤5 enq(node); return node;}//步骤5调用的方法 向FIFO队列中添加private Node enq(final Node node) {    //自旋 for (;;) { Node t = tail; if (t == null) { // Must initialize            //如果tail节点为null,则初始化一个虚拟head,开始下一次自旋 if (compareAndSetHead(new Node())) tail = head; } else {            //将要新加的节点的前置节点设置为tail, node.prev = t;            //CAS操作将tail改为新增的节点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}//步骤6调用的final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false;        //自旋操作 for (;;) { final Node p = node.predecessor();            //当前节点的前置节点,如果前置节点为head,则尝试获取资源            //如果前置节点不是head说明还有节点在队列中            //FIFO在此处体现,先保证前置节点执行玩后再执行 if (p == head && tryAcquire(arg)) {            //如果当前线程获得执行资源则将当前节点设置为head setHead(node); p.next = null; // help GC failed = false; return interrupted; }            //判断是够需要阻塞 if (shouldParkAfterFailedAcquire(p, node) &&                //步骤7阻塞当前节点线程 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}

到此步骤7处第二个竞争资源的线程就被阻塞了,下面为步骤7后AQS的队列:

同理,如果此时再有第三个线程尝试获取资源,则队列信息如下:

    到此,ReentrantLock的非公平锁竞争资源及阻塞的过程就完成了;接下来我们继续看下线程1释放资源后,后续线程的唤醒操作;

//调用ReentrantLock中的unlockpublic void unlock() { //调用AQS中的release sync.release(1);}public final boolean release(int arg) {    //调用需要子类(ReentrantLock.Sync)实现的尝试释放 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0)           //步骤8唤醒后续节点 unparkSuccessor(h); return true; } return false;}protected final boolean tryRelease(int releases) { int c = getState() - releases;    //只有独占线程才能进行释放操作 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false;    //当资源完全释放后当前独占线程置为空 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free;}private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0)        compareAndSetWaitStatus(node, ws, 0); Node s = node.next;    //后继节点如果为空或者状态是被取消,则从tail往前到当前节点未知一直找到可唤醒的节点 if (s == null || s.waitStatus > 0) { s = null;        //此处从tail开始往前遍历的原因于addWaiter和enq两个方法有关        //因为新增tail为先赋值tail为新节点,在将原tail的next指向新的tail        //如果不从tail开始可能会有节点遗漏的问题        //这点可以在Node.next属性的注释中看到,next指针是不可靠的 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null)        //步骤9对节点的线程进行唤醒操作 LockSupport.unpark(s.thread);}
此时当线程1执行unlock()后,会经过步骤9唤醒后续等待线程即线程2,此时线程2在步骤7处阻塞,被唤醒继续自旋执行,尝试获取锁之后就获得了资源的执行权限;
  • 小结

    我们以ReentrantLock的非公平锁运行的流程讲解了一下在发生资源竞争时AQS独占模式的运行原理和队列信息的变化,以及当资源释放后后续线程的唤醒流程;可以看出lock的资源被抽象为state,当state=0时代表资源可以获取,当重入锁定时state会进行累加操作;
    AQS还提供了ConditionObject当不满足条件时挂起线程,当满足条件时唤醒线程,后续我们继续讨论;
    Doug Lea 大神关于AQS的论文研究:  h ttp://gee.cs.oswego.edu/dl/papers/aqs.pdf