并发编程—AQS源码分析
大家好,我是宇哥
AbstractQueuedSynchronizer源码分析
类的继承关系
AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable
中AbstractOwnableSynchronizer抽象类的源码如下:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {// 版本序列号private static final long serialVersionUID = 3737899427754241961L;// 构造方法protected AbstractOwnableSynchronizer() { }// 独占模式下的线程private transient Thread exclusiveOwnerThread;// 设置独占线程protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}// 获取独占线程protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}}
AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread与getExclusiveOwnerThread方法,这两个方法会被子类调用。
AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。下面分别做介绍。
static final class Node {// 模式,分为共享与独占// 共享模式static final Node SHARED = new Node();// 独占模式static final Node EXCLUSIVE = null;// 结点状态// CANCELLED,值为1,表示当前的线程被取消// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行// 值为0,表示当前节点在sync队列中,等待着获取锁static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;// 结点状态volatile int waitStatus;// 前驱结点volatile Node prev;// 后继结点volatile Node next;// 结点所对应的线程volatile Thread thread;// 下一个等待者Node nextWaiter;// 结点是否在共享模式下等待final boolean isShared() {return nextWaiter == SHARED;}// 获取前驱结点,若前驱结点为空,抛出异常final Node predecessor() throws NullPointerException {// 保存前驱结点Node p = prev;if (p == null) // 前驱结点为空,抛出异常throw new NullPointerException();else // 前驱结点不为空,返回return p;}// 无参构造方法Node() { // Used to establish initial head or SHARED marker}// 构造方法Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}// 构造方法Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}
每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。
CANCELLED,值为1,表示当前的线程被取消。SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。值为0,表示当前节点在sync queue中,等待着获取锁。
// 内部类public class ConditionObject implements Condition, java.io.Serializable {// 版本号private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */// condition队列的头结点private transient Node firstWaiter;/** Last node of condition queue. */// condition队列的尾结点private transient Node lastWaiter;/*** Creates a new {@code ConditionObject} instance.*/// 构造方法public ConditionObject() { }// Internal methods/*** Adds a new waiter to wait queue.* @return its new wait node*/// 添加新的waiter到wait队列private Node addConditionWaiter() {// 保存尾结点Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) { // 尾结点不为空,并且尾结点的状态不为CONDITION// 清除状态为CONDITION的结点unlinkCancelledWaiters();// 将最后一个结点重新赋值给tt = lastWaiter;}// 新建一个结点Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null) // 尾结点为空// 设置condition队列的头结点firstWaiter = node;else // 尾结点不为空// 设置为节点的nextWaiter域为node结点t.nextWaiter = node;// 更新condition队列的尾结点lastWaiter = node;return node;}/*** Removes and transfers nodes until hit non-cancelled one or* null. Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/private void doSignal(Node first) {// 循环do {if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空// 设置尾结点为空lastWaiter = null;// 设置first结点的nextWaiter域first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环}/*** Removes and transfers all nodes.* @param first (non-null) the first node on condition queue*/private void doSignalAll(Node first) {// condition队列的头结点尾结点都设置为空lastWaiter = firstWaiter = null;// 循环do {// 获取first结点的nextWaiter域结点Node next = first.nextWaiter;// 设置first结点的nextWaiter域为空first.nextWaiter = null;// 将first结点从condition队列转移到sync队列transferForSignal(first);// 重新设置firstfirst = next;} while (first != null);}/*** Unlinks cancelled waiter nodes from condition queue.* Called only while holding lock. This is called when* cancellation occurred during condition wait, and upon* insertion of a new waiter when lastWaiter is seen to have* been cancelled. This method is needed to avoid garbage* retention in the absence of signals. So even though it may* require a full traversal, it comes into play only when* timeouts or cancellations occur in the absence of* signals. It traverses all nodes rather than stopping at a* particular target to unlink all pointers to garbage nodes* without requiring many re-traversals during cancellation* storms.*/// 从condition队列中清除状态为CANCEL的结点private void unlinkCancelledWaiters() {// 保存condition队列头结点Node t = firstWaiter;Node trail = null;while (t != null) { // t不为空// 下一个结点Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态// 设置t节点的额nextWaiter域为空t.nextWaiter = null;if (trail == null) // trail为空// 重新设置condition队列的头结点firstWaiter = next;else // trail不为空// 设置trail结点的nextWaiter域为next结点trail.nextWaiter = next;if (next == null) // next结点为空// 设置condition队列的尾结点lastWaiter = trail;}else // t结点的状态为CONDTION状态// 设置trail结点trail = t;// 设置t结点t = next;}}// public methods/*** Moves the longest-waiting thread, if one exists, from the* wait queue for this condition to the wait queue for the* owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。public final void signal() {if (!isHeldExclusively()) // 不被当前线程独占,抛出异常throw new IllegalMonitorStateException();// 保存condition队列头结点Node first = firstWaiter;if (first != null) // 头结点不为空// 唤醒一个等待线程doSignal(first);}/*** Moves all threads from the wait queue for this condition to* the wait queue for the owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。public final void signalAll() {if (!isHeldExclusively()) // 不被当前线程独占,抛出异常throw new IllegalMonitorStateException();// 保存condition队列头结点Node first = firstWaiter;if (first != null) // 头结点不为空// 唤醒所有等待线程doSignalAll(first);}/*** Implements uninterruptible condition wait.* <ol>* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* </ol>*/// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断public final void awaitUninterruptibly() {// 添加一个结点到等待队列Node node = addConditionWaiter();// 获取释放的状态int savedState = fullyRelease(node);boolean interrupted = false;while (!isOnSyncQueue(node)) { //// 阻塞当前线程LockSupport.park(this);if (Thread.interrupted()) // 当前线程被中断// 设置interrupted状态interrupted = true;}if (acquireQueued(node, savedState) || interrupted) //selfInterrupt();}/** For interruptible waits, we need to track whether to throw* InterruptedException, if interrupted while blocked on* condition, versus reinterrupt current thread, if* interrupted while blocked waiting to re-acquire.*//** Mode meaning to reinterrupt on exit from wait */private static final int REINTERRUPT = 1;/** Mode meaning to throw InterruptedException on exit from wait */private static final int THROW_IE = -1;/*** Checks for interrupt, returning THROW_IE if interrupted* before signalled, REINTERRUPT if after signalled, or* 0 if not interrupted.*/private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}/*** Throws InterruptedException, reinterrupts current thread, or* does nothing, depending on mode.*/private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/// // 等待,当前线程在接到信号或被中断之前一直处于等待状态public final void await() throws InterruptedException {if (Thread.interrupted()) // 当前线程被中断,抛出异常throw new InterruptedException();// 在wait队列上添加一个结点Node node = addConditionWaiter();//int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {// 阻塞当前线程LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}/*** Implements timed condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态public final long awaitNanos(long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);final long deadline = System.nanoTime() + nanosTimeout;int interruptMode = 0;while (!isOnSyncQueue(node)) {if (nanosTimeout <= 0L) {transferAfterCancelledWait(node);break;}if (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;nanosTimeout = deadline - System.nanoTime();}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return deadline - System.nanoTime();}/*** Implements absolute timed condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* <li> If timed out while blocked in step 4, return false, else true.* </ol>*/// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态public final boolean awaitUntil(Date deadline)throws InterruptedException {long abstime = deadline.getTime();if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);boolean timedout = false;int interruptMode = 0;while (!isOnSyncQueue(node)) {if (System.currentTimeMillis() > abstime) {timedout = transferAfterCancelledWait(node);break;}LockSupport.parkUntil(this, abstime);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return !timedout;}/*** Implements timed condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* <li> If timed out while blocked in step 4, return false, else true.* </ol>*/// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0public final boolean await(long time, TimeUnit unit)throws InterruptedException {long nanosTimeout = unit.toNanos(time);if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);final long deadline = System.nanoTime() + nanosTimeout;boolean timedout = false;int interruptMode = 0;while (!isOnSyncQueue(node)) {if (nanosTimeout <= 0L) {timedout = transferAfterCancelledWait(node);break;}if (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;nanosTimeout = deadline - System.nanoTime();}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return !timedout;}// support for instrumentation/*** Returns true if this condition was created by the given* synchronization object.** @return {@code true} if owned*/final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {return sync == AbstractQueuedSynchronizer.this;}/*** Queries whether any threads are waiting on this condition.* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.** @return {@code true} if there are any waiting threads* @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/// 查询是否有正在等待此条件的任何线程protected final boolean hasWaiters() {if (!isHeldExclusively())throw new IllegalMonitorStateException();for (Node w = firstWaiter; w != null; w = w.nextWaiter) {if (w.waitStatus == Node.CONDITION)return true;}return false;}/*** Returns an estimate of the number of threads waiting on* this condition.* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.** @return the estimated number of waiting threads* @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/// 返回正在等待此条件的线程数估计值protected final int getWaitQueueLength() {if (!isHeldExclusively())throw new IllegalMonitorStateException();int n = 0;for (Node w = firstWaiter; w != null; w = w.nextWaiter) {if (w.waitStatus == Node.CONDITION)++n;}return n;}/*** Returns a collection containing those threads that may be* waiting on this Condition.* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.** @return the collection of threads* @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/// 返回包含那些可能正在等待此条件的线程集合protected final Collection<Thread> getWaitingThreads() {if (!isHeldExclusively())throw new IllegalMonitorStateException();ArrayList<Thread> list = new ArrayList<Thread>();for (Node w = firstWaiter; w != null; w = w.nextWaiter) {if (w.waitStatus == Node.CONDITION) {Thread t = w.thread;if (t != null)list.add(t);}}return list;}}
此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下
public interface Condition {// 等待,当前线程在接到信号或被中断之前一直处于等待状态void await() throws InterruptedException;// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断void awaitUninterruptibly();//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态long awaitNanos(long nanosTimeout) throws InterruptedException;// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0boolean await(long time, TimeUnit unit) throws InterruptedException;// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态boolean awaitUntil(Date deadline) throws InterruptedException;// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。void signal();// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。void signalAll();}
Condition接口中定义了await、signal方法,用来等待条件、释放条件。之后会详细分析CondtionObject的源码。
类的属性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizerimplements java.io.Serializable {// 版本号private static final long serialVersionUID = 7373984972572414691L;// 头结点private transient volatile Node head;// 尾结点private transient volatile Node tail;// 状态private volatile int state;// 自旋时间static final long spinForTimeoutThreshold = 1000L;// Unsafe类实例private static final Unsafe unsafe = Unsafe.getUnsafe();// state内存偏移地址private static final long stateOffset;// head内存偏移地址private static final long headOffset;// state内存偏移地址private static final long tailOffset;// tail内存偏移地址private static final long waitStatusOffset;// next内存偏移地址private static final long nextOffset;// 静态初始化块static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}}
