JDK源码系列:AQS(队列同步器)原理
大家好,好久不见,今天看下JDK中的JUC包中AQS(AbstractQueuedSynchronizer 队列同步器)的实现原理。
JUCL下的锁和synchronized提供的锁的区别
1、锁的获取和释放是显示的靠程序员用代码来控制的,增加了灵活性,可以实现更加复杂的应用场景
2、尝试非堵塞式的获取锁
3、可中断的获取锁
4、可超时的获取锁
5、等待队列可按条件分类(Condition),这样可以实现更加精确的按组唤醒操作
LOCK接口API
方法名称 |
描述 |
void lock() |
堵塞式的获取锁(不响应中断) |
void lockInterruptibly() |
堵塞式的获取锁(响应中断) |
boolean tryLock(); |
非堵塞式的获取锁(立刻响应,成功返回true,失败返回false) |
boolean tryLock(long time, TimeUnit unit) |
堵塞式的带超时时间的可响应中断的获取锁 |
void unlock(); |
释放当前线程持有的锁 |
Condition newCondition(); |
返回一个绑定到了当前lock实例上的新的条件对象,这是用来做队列分组的条件,每个条件对象都对应一个队列 |
AQS的地位
AQS = AbstractQueuedSynchronizer 队列同步器
AQS是JDK5.0 引入的一个抽象类,它对常见的lock场景进行了抽象,目的是对各种场景的lock提供基础支持,使锁实现起来更加容易(排队与唤醒功能)。
AQS位于java.util.concurrent.locks包中,可以看出它就是为lock服务的,ReentrantLock(独占式可重入锁)、Semaphore(共享式锁)、ReentrantReadWriteLock(混合式锁)、等都是基于它来实现的,还有CountDownLatch和ThreadPoolExecutor.Worker中也有AQS的影子。
AQS提供的能力
1、使用了一个int成员变量来表示同步状态
2、提供了一个FIFO队列来支持竞争线程的排队工作
3、通过模板方法设计模式来对外提供能力(子类覆盖某些步骤抽象方法)
4、定义三个方法来修改 同步状态值(getState()、setState(int newState)、compareAndSetState(int expect,int update))
5、支持独占式的获取同步状态,也支持共享式的获取同步状态
6、支持条件等待队列
AQS中方法
1、需要锁的实现类来实现的方法
注意下面的方法虽然被实现类重写,但是并不是为调用者准备的,它们的范围是protected,其实就是 模板方法 里面的一个步骤而已,不能直接对客户端开放,所以不是 public的。
方法名称 |
描述 |
protected boolean tryAcquire(int arg) |
独占式获取同步状态,通过getState查询当前状态,定义自己的逻辑,通过compareAndSetState 设置新的状态,true表示加锁成功 |
protected boolean tryRelease(int arg) |
独占式释放同步状态,可以安全的使用getState和setState方法 |
protected int tryAcquireShared(int arg) |
共享式获取同步状态,返回>=0的值表示加锁成功,<0 的值 表示加锁失败 |
protected boolean tryReleaseShared(int arg) |
共享式释放同步状态(需要CAS确保安全) |
protected boolean isHeldExclusively() |
是否在独占模式下被线程占用,一般用来判断是否被当前线程独占 |
2、客户端可以直接调用的API(可以叫模板方法)
Lock接口方法 |
AQS中的模板方法(独占式) |
AQS中的模板方法(共享式) |
public void lock() |
public final void acquire(int arg) |
public final void acquireShared(int arg) |
public void lockInterruptibly() |
public final void acquireInterruptibly(int arg) |
public final void acquireSharedInterruptibly(int arg) |
public boolean tryLock() |
尝试获取锁,不堵塞(不涉及线程排队,所以并不需要AQS提供模板方法),自己用AQS提供的那三个方法改变state状态即可,如果失败直接返回false |
和独占式同理,子类自己实现即可 |
public boolean tryLock(long timeout, TimeUnit unit) |
public final boolean tryAcquireNanos(int arg, long nanosTimeout),涉及到等待了,所以需要排队,这是和上面那个方法的区别 |
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) |
public void unlock() |
public final boolean release(int arg) |
public final boolean releaseShared(int arg) |
public Condition newCondition() |
返回一个ConditionObject 对象 |
不支持,也就是Condition对象只在独占式锁里才支持 |
通过对API的分析可以知道,AQS主要提供了:
1、独占式的获取与释放同步状态
2、共享式的获取与释放同步状态
3、查询同步队列中的线程排队情况
独占式锁:同一时刻只能由一个线程获取到,其它线程只能进入队列排队,等待。
共享式锁:同一时刻可以由多个线程持有。
混合式锁:当有读锁被其它线程持有期间,写锁线程只能堵塞等待前面所有的读锁释放,写锁堵塞之后进来的读锁只能排队到写锁线程后面,当读锁释放完毕后写锁获取成功,写锁释放完毕后,排在写锁后面的读锁可以同时获取到读锁。
AQS中内部类Node
这是AQS中用到的队列节点的定义,比较复杂,有多种模式和状态,这是因为要用它来支撑多种需求场景
节点的状态用一个int变量来表示,0表示初始值,负值(<0)表示结点处于有效等待状态,而正值(>0)表示结点已被取消
static final class Node {
//共享节点
static final Node SHARED = new Node();
//独占节点
static final Node EXCLUSIVE = null;
//表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
static final int CANCELLED = 1;
//表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
static final int SIGNAL = -1;
//表示结点等待在Condition上,当其他线程调用了Condition的signal()或者signalAll()方法后,
//CONDITION状态的一个结点或者所有节点将从“条件等待队列”转移到“同步队列”中,等待获取同步锁。
static final int CONDITION = -2;
//共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
//读写锁模式下,两个写锁之间的所有读锁都会被唤醒,直到遇到下一个写锁或者到尾结点
static final int PROPAGATE = -3;
//节点的状态,可以为 0,CANCELLED,SIGNAL,CONDITION,PROPAGATE
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前节点绑定的线程信息
volatile Thread thread;
//Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用。
//在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;
//在作为等待队列节点使用时,nextWaiter保存后继节点。
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS中内部类ConditionObject
条件对象,每次 lock.newCondition()都会产生一个 新的 ConditionObject对象,每个对象里都维护了 一个 等待队列,用来维护基于此condition的等待线程,这样可以做到队列分组,便于更精确的唤醒目标线程。
await 在等待的过程中涉及到中断的响应问题,interruptMode中断模式来记录中断时间,该变量有三个值:
0:代表整个过程中一直没有中断发生,退出await方法后不需要任何动作;
THROW_IE:表示退出await()方法时需要抛出InteruptedException,这种模式对应于中断发生在signal之前,这是一种正常中断。
REINTERRUPT:表示退出await()方法时只需要再自我中断以下, 这种模式对应于中断发生在signal之后获取到同步锁之前, 即中断来的太晚了。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 指向条件队列的首节点 */
private transient Node firstWaiter;
/** 指向条件队列的尾结点 */
private transient Node lastWaiter;
public ConditionObject() { }
// public methods
//唤醒一个条件等待节点,转移到 同步等待队列中
public final void signal() {
if (!isHeldExclusively())//当前线程是否已经得到了独占锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//转移所有的条件等待节点到 同步队列中
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
//不可中断的入队条件等待队列
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
//中断模式,如果中断发生在 条件等待队列 出队之前时,则需要抛出中断异常;
//如果中断时间发生在 重新竞争独占锁的过程中,则执行reinterrupt(自己只设置下中断标志,剩下的交给上层代码处理)
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
//进入条件等待,可中断
public final void await() throws InterruptedException {
}
//进入带超时时间的条件队列,可中断
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
}
//带绝对截止时间的等待,可中断
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
}
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
}
}
AQS基于Unsafe实现CAS操作
//支持原子操作相关
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
//AQS对象中state成员偏移量,相对于对象起始位置
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
//AQS对象中head成员偏移量
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
//AQS对象中tail成员偏移量
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
//Node对象中waitStatus成员偏移量
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
//Node对象中next成员偏移量
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
//为AQS对象修改head,只被enq方法使用
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
//为AQS对象修改tail,只被enq方法使用
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
//为node对象修改waitStatus
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
//为node对象修改next
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
AQS中的字段信息
//指向等待队列的首节点,首节点状态不能是cancelled,只能被setHead方法修改
private transient volatile Node head;
//指向等待队列的尾结点,只能被enq方法修改
private transient volatile Node tail;
//同步状态
private volatile int state;
图解AQS
独占模式
共享模式
总结
1、AQS提供一个int变量作为 同步状态(锁的本质就是一个内存变量),提供了CAS的方式安全修改state变量;
2、AQS提供了排队能力,对没有获取到锁的线程提供一个FIFO队列,来管理这些线程,它通过Unsafe提供的CAS能力来实现多线程下无锁的入队和出队,达到一个高性能的目的;
3、AQS对于入队成功的线程采取 LockSupport.park(thread)提供的能力 使某个线程休眠,降低CPU消耗,在合适的时机再通过 LockSupport.unpark(thread)操作唤醒休眠的线程来继续运行;
4、AQS提供了条件队列的支持(仅支持独占模式),根据条件对象的不同来将等待的线程分类管理(多队列),这样的好处是可以更精细化的管理这些线程,避免不必要的唤醒,减少对CPU的消耗。
5、AQS对独占锁提供线程锁定来支持可重入的特性;
6、公平锁模式下如果前面有排队的就加入到队尾,按先后顺序获取锁;非公平锁模式下不管前面有没有排队的,都会先尝试获取锁,失败后再入队尾;