vlambda博客
学习文章列表

JDK源码系列:AQS(队列同步器)原理

         


        大家好,好久不见,今天看下JDK中的JUC包中AQS(AbstractQueuedSynchronizer 队列同步器)的实现原理。


JUCL下的锁和synchronized提供的锁的区别


1、锁的获取和释放是显示的靠程序员用代码来控制的,增加了灵活性,可以实现更加复杂的应用场景

2、尝试非堵塞式的获取锁

3、可中断的获取锁

4、可超时的获取锁

5、等待队列可按条件分类(Condition),这样可以实现更加精确的按组唤醒操作


LOCK接口API


方法名称

描述

void  lock()

堵塞式的获取锁(不响应中断)

void lockInterruptibly()

堵塞式的获取锁(响应中断)

boolean tryLock();

非堵塞式的获取锁(立刻响应,成功返回true,失败返回false)

boolean tryLock(long time, TimeUnit unit)

堵塞式的带超时时间的可响应中断的获取锁

void unlock();

释放当前线程持有的锁

Condition newCondition();

返回一个绑定到了当前lock实例上的新的条件对象,这是用来做队列分组的条件,每个条件对象都对应一个队列


AQS的地位


AQS = AbstractQueuedSynchronizer 队列同步器

AQS是JDK5.0 引入的一个抽象类,它对常见的lock场景进行了抽象,目的是对各种场景的lock提供基础支持,使锁实现起来更加容易(排队与唤醒功能)。

AQS位于java.util.concurrent.locks包中,可以看出它就是为lock服务的,ReentrantLock(独占式可重入锁)、Semaphore(共享式锁)、ReentrantReadWriteLock(混合式锁)、等都是基于它来实现的,还有CountDownLatch和ThreadPoolExecutor.Worker中也有AQS的影子。


AQS提供的能力


1、使用了一个int成员变量来表示同步状态

2、提供了一个FIFO队列来支持竞争线程的排队工作

3、通过模板方法设计模式来对外提供能力(子类覆盖某些步骤抽象方法)

4、定义三个方法来修改 同步状态值(getState()、setState(int newState)、compareAndSetState(int expect,int update))

5、支持独占式的获取同步状态,也支持共享式的获取同步状态

6、支持条件等待队列


AQS中方法


1、需要锁的实现类来实现的方法


注意下面的方法虽然被实现类重写,但是并不是为调用者准备的,它们的范围是protected,其实就是 模板方法 里面的一个步骤而已,不能直接对客户端开放,所以不是 public的。

方法名称

描述

protected boolean tryAcquire(int arg)

独占式获取同步状态,通过getState查询当前状态,定义自己的逻辑,通过compareAndSetState

设置新的状态,true表示加锁成功

protected boolean tryRelease(int arg)

独占式释放同步状态,可以安全的使用getState和setState方法

protected int tryAcquireShared(int arg)

共享式获取同步状态,返回>=0的值表示加锁成功,<0 的值 表示加锁失败

protected boolean tryReleaseShared(int arg)

共享式释放同步状态(需要CAS确保安全)

protected boolean isHeldExclusively()

是否在独占模式下被线程占用,一般用来判断是否被当前线程独占


2、客户端可以直接调用的API(可以叫模板方法)


Lock接口方法

AQS中的模板方法(独占式)

AQS中的模板方法(共享式)

public void lock()

public final void acquire(int arg)

public final void acquireShared(int arg)

public void lockInterruptibly()

public final void acquireInterruptibly(int arg)

public final void acquireSharedInterruptibly(int arg)

public boolean tryLock()

尝试获取锁,不堵塞(不涉及线程排队,所以并不需要AQS提供模板方法),自己用AQS提供的那三个方法改变state状态即可,如果失败直接返回false

和独占式同理,子类自己实现即可

public boolean tryLock(long timeout, TimeUnit unit)

public final boolean tryAcquireNanos(int arg, long nanosTimeout),涉及到等待了,所以需要排队,这是和上面那个方法的区别

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

public void unlock()

public final boolean release(int arg)

public final boolean releaseShared(int arg)

public Condition newCondition()

返回一个ConditionObject 对象

不支持,也就是Condition对象只在独占式锁里才支持


通过对API的分析可以知道,AQS主要提供了:

1、独占式的获取与释放同步状态

2、共享式的获取与释放同步状态

3、查询同步队列中的线程排队情况

独占式锁:同一时刻只能由一个线程获取到,其它线程只能进入队列排队,等待。

共享式锁:同一时刻可以由多个线程持有。

混合式锁:当有读锁被其它线程持有期间,写锁线程只能堵塞等待前面所有的读锁释放,写锁堵塞之后进来的读锁只能排队到写锁线程后面,当读锁释放完毕后写锁获取成功,写锁释放完毕后,排在写锁后面的读锁可以同时获取到读锁。


AQS中内部类Node


这是AQS中用到的队列节点的定义,比较复杂,有多种模式和状态,这是因为要用它来支撑多种需求场景

节点的状态用一个int变量来表示,0表示初始值,负值(<0)表示结点处于有效等待状态,而正值(>0)表示结点已被取消

static final class Node {
//共享节点 static final Node SHARED = new Node(); //独占节点 static final Node EXCLUSIVE = null; //表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 static final int CANCELLED = 1; //表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。 static final int SIGNAL = -1; //表示结点等待在Condition上,当其他线程调用了Condition的signal()或者signalAll()方法后, //CONDITION状态的一个结点或者所有节点将从“条件等待队列”转移到“同步队列”中,等待获取同步锁。 static final int CONDITION = -2; //共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。 //读写锁模式下,两个写锁之间的所有读锁都会被唤醒,直到遇到下一个写锁或者到尾结点 static final int PROPAGATE = -3; //节点的状态,可以为 0,CANCELLED,SIGNAL,CONDITION,PROPAGATE volatile int waitStatus; //前驱节点 volatile Node prev; //后继节点 volatile Node next; //当前节点绑定的线程信息 volatile Thread thread; //Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用。 //在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式; //在作为等待队列节点使用时,nextWaiter保存后继节点。 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
Node() { // Used to establish initial head or SHARED marker }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }}


AQS中内部类ConditionObject


条件对象,每次 lock.newCondition()都会产生一个 新的 ConditionObject对象,每个对象里都维护了 一个 等待队列,用来维护基于此condition的等待线程,这样可以做到队列分组,便于更精确的唤醒目标线程。


await 在等待的过程中涉及到中断的响应问题,interruptMode中断模式来记录中断时间,该变量有三个值:

0:代表整个过程中一直没有中断发生,退出await方法后不需要任何动作;

THROW_IE:表示退出await()方法时需要抛出InteruptedException,这种模式对应于中断发生在signal之前,这是一种正常中断。

REINTERRUPT:表示退出await()方法时只需要再自我中断以下, 这种模式对应于中断发生在signal之后获取到同步锁之前, 即中断来的太晚了。


public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** 指向条件队列的首节点 */ private transient Node firstWaiter; /** 指向条件队列的尾结点 */ private transient Node lastWaiter; public ConditionObject() { }
// public methods //唤醒一个条件等待节点,转移到 同步等待队列中 public final void signal() { if (!isHeldExclusively())//当前线程是否已经得到了独占锁 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
//转移所有的条件等待节点到 同步队列中 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
//不可中断的入队条件等待队列 public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
//中断模式,如果中断发生在 条件等待队列 出队之前时,则需要抛出中断异常; //如果中断时间发生在 重新竞争独占锁的过程中,则执行reinterrupt(自己只设置下中断标志,剩下的交给上层代码处理) /** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1;
//进入条件等待,可中断 public final void await() throws InterruptedException { }
//进入带超时时间的条件队列,可中断 public final long awaitNanos(long nanosTimeout) throws InterruptedException { }
//带绝对截止时间的等待,可中断 public final boolean awaitUntil(Date deadline) throws InterruptedException { }
public final boolean await(long time, TimeUnit unit) throws InterruptedException { }
}


AQS基于Unsafe实现CAS操作


//支持原子操作相关private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;
static { try { //AQS对象中state成员偏移量,相对于对象起始位置 stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); //AQS对象中head成员偏移量 headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); //AQS对象中tail成员偏移量 tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); //Node对象中waitStatus成员偏移量 waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); //Node对象中next成员偏移量 nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }}
//为AQS对象修改head,只被enq方法使用private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update);}
//为AQS对象修改tail,只被enq方法使用private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}
//为node对象修改waitStatusprivate static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);}
//为node对象修改nextprivate static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update);}


AQS中的字段信息


//指向等待队列的首节点,首节点状态不能是cancelled,只能被setHead方法修改private transient volatile Node head;//指向等待队列的尾结点,只能被enq方法修改private transient volatile Node tail;//同步状态private volatile int state;


图解AQS


独占模式


JDK源码系列:AQS(队列同步器)原理


共享模式



总结


1、AQS提供一个int变量作为 同步状态(锁的本质就是一个内存变量),提供了CAS的方式安全修改state变量;


2、AQS提供了排队能力,对没有获取到锁的线程提供一个FIFO队列,来管理这些线程,它通过Unsafe提供的CAS能力来实现多线程下无锁的入队和出队,达到一个高性能的目的;


3、AQS对于入队成功的线程采取 LockSupport.park(thread)提供的能力 使某个线程休眠,降低CPU消耗,在合适的时机再通过 LockSupport.unpark(thread)操作唤醒休眠的线程来继续运行;


4、AQS提供了条件队列的支持(仅支持独占模式),根据条件对象的不同来将等待的线程分类管理(多队列),这样的好处是可以更精细化的管理这些线程,避免不必要的唤醒,减少对CPU的消耗。


5、AQS对独占锁提供线程锁定来支持可重入的特性;


6、公平锁模式下如果前面有排队的就加入到队尾,按先后顺序获取锁;非公平锁模式下不管前面有没有排队的,都会先尝试获取锁,失败后再入队尾;