源码分析ReentrantLock实现原理
ReentrantLock基本用法
ReentrantLock重入锁,区别于Synchronized关键,ReentrantLock是在Java Api层面上的锁实现。和Synchronized关键相比,ReentrantLock提供了更高级的功能,例如指定是否为公平锁、限时等待、响应中断等。
常用方法
ReentrantLock lock = new ReentrantLock(); // 构造函数,默认非公平锁;如要指定公平,传入truelock.lock(); // 获取锁,如果没有获取成功,则进入等待lock.unlock(); // 释放锁,需要事先获取到锁lock.lockInterruptibly(); // 可中断的获取锁,需要接收一个中断通知lock.tryLock(); // 尝试获取锁,获取成功返回true,失败立即返回false,不进行等待lock.tryLock(1000, TimeUnit.MILLISECONDS); // 带时间的尝试,获取失败,等待指定时间lock.isLocked(); // 判断该锁失败是锁定状态,state!=0lock.isHeldByCurrentThread(); // 判断锁是否被当前线程持有 exclusiveOwnerThread == Thread.currentThreadlock.getHoldCount(); // 锁进入的次数,获取的是state的值,每进入一次,state+1lock.isFair(); // 是否为公平
类结构
ReentrantLock的源码实现
下面将从非公平锁、公平锁、释放锁等实现源码来分析。
非公平锁获取锁的实现
ReentrantLock的默认实现是非公平锁。
public ReentrantLock() {sync = new NonfairSync();}
所以非公平锁的lock方法实现,在NonfairSync类中。
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
NonfairSync的lock方法实现中,首先会通过一个CAS的操作设置state的值,如果此时刚好有线程释放锁且当前线程仍然持有cpu时间片,那么就会进行插队(非公平性)拿到这边锁,并将锁的独占线程设置为当前线程。如果获取锁失败,则通过acquire方法再次进行尝试。
acquire实现
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
acquire方法有好几个逻辑,包括tryAcquire尝试获取锁、addWaiter创建等待节点、acquireQueued在队列内获取锁等操作。
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // 1if (compareAndSetState(0, acquires)) { // 2setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) { // 3int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
1、首先判断锁的状态是否为未锁定状态,
2、如果是,则通过CAS操作,设置锁的状态值为acquires。并设置锁的独占线程为当前线程。
3、如果锁已经被占,则判锁的独占线程是否为当前线程。如果是,则锁的状态值累加acquires。因为ReentrantLock是可重入锁,所以会判断锁的状态值是否溢出,防止某个线程陷入死循环的加锁。
tryAcquire方法会返回获取锁成功或失败,如果成功,那么线程获取到锁。lock方法结束。否则就需要将当前线程入队,进行排队等待获取锁。
acquireQueued实现
acquireQueued方法是在队列内部获取锁的实现,在执行它的逻辑之前, 先要执行addWaiter方法,创建等待节点。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
Node是AbstractQueuedSynchronizer的内部类。AQS是抽象队列同步器,它在内部维护了一个称之为CLH的双向队列,队列的每个元素是Node,Node中包含了当前线程、等待状态、前驱后继节点等信息。
static final class Node {static final Node SHARED = new Node(); // 表示节点是共享模式static final Node EXCLUSIVE = null; // 表示节点是独占模式static final int CANCELLED = 1; // 表示该节点的线程处于取消状态static final int SIGNAL = -1; // 表示该节点的下一个节点需要被挂起static final int CONDITION = -2; // 表示该节点等待一个condition唤醒static final int PROPAGATE = -3; // 表示下一个acquireShared无条件传播volatile int waitStatus; // 等待状态,如果是Sync节点,默认值为0,如果是condition队列的节点,默认职位CONDITION(-2)。使用CAS进行原子的更新volatile Node prev; // 指向前驱节点,当前节点依赖它检查状态volatile Node next; // 指向后继节点,当前节点释放是需要把它唤醒volatile Thread thread; // 当前ThreadNode nextWaiter; // 执行下一个等待condition条件的节点,}
Node的状态说明:
SINGLE
后继节点处于阻塞状态(被挂起),当前节点在释放锁或者取消获取锁后,必须唤醒后继节点。为了避免竞争,acquire方法必须首先表明需要一个single信息,然后原子的去重试,如果重试失败,继续阻塞。
CANCELLED
当前节点由于超时或者中断被取消。所有的节点不会丢弃这个状态,而且,一个处于取消状态的节点永远不会再次进入阻塞。
CONDITION
当前节点处于一个condition队列,不会用于sync队列。
PROPAGATE
releaseShared需要传播到其它节点,这是在
doReleaseShared方法中设置的,以确保能传播。
关于prev和next
prev
prev指向前驱节点,当前节点依赖它做状态检查。在入队的时候被赋值,出队的时候被置为null。如果prev节点被取消,需要为当前节点往前找一个未取消的节点,而且一定是可以找到的,因为头节点不可能处于Cancelled状态。一个节点只有成功获取到了锁,才能成为头节点。一个被取消的节点不可能成功的拿到锁,一个线程只能取消自身,而不能取消其它节点。
next
next指向后继节点,当前节点的线程释放锁的时候,需要唤醒它。入队时的时候被赋值,当要避开被取消的节点时需要调整next指向,出队时被置为null。
上面介绍了AQS中Node类的重要属性,我们继续回到addWaiter方法:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); // 1// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) { // 2node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node); // 3return node;}
addWaiter方法接收一个表示Node模式的操作,这里讲的是ReentrantLock,所以mode是独占模式。
1、首先会创建一个Node节点,thread指向当前线程,mode为独占模式。
2、将tail为节点赋值给pred,如果pred不为null,那么就表示当前等待队列已经被初始化,并且已经设置好了header节点。因为是处于多线程环境,所以会通过CAS的操作去设置尾节点(tail)的值。
3、如果pred为null,也就是尾节点tail为null,那么就表示当前等待队列还未被初始化,那么调用enq方法进行队列的创建。
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node())) // 1tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) { // 2t.next = node;return t;}}}}
这里使用了自旋的操作,在for循环里不断的尝试,把node节点入队。这里使用自旋,是因为是多线程环境,可能存在多个线程都在对队列进行初始化。
1、使用乐观锁进行设置队列的头。注意,队列的头节点没有存储Thread信息,头节点是一个虚拟节点,不参与锁竞争,初始状态值waitStatus=0;
2、如果在第一步设置头节点失败,那么就意味着其它线程在当前线程之前初始化好了队列,那么当前线程就进入队列尾部。
当前线程入队之后,需要在队列内部继续进行获取锁的操作,这个逻辑在acquireQueued方法中。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true; // 表示获取锁成功与否try {boolean interrupted = false; // 表示线程是否被中断for (;;) {final Node p = node.predecessor(); // 当前线程所在节点的前驱节点if (p == head && tryAcquire(arg)) { // 1setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 2interrupted = true;}} finally {if (failed) // 3cancelAcquire(node);}}
1、判断p是否为头节点,前面讲到头节点是一个虚拟节点,不参与锁竞争。然后调用tryAcquire方法尝试获取锁。如果获取锁成功,设置头节点为当前线程的节点,在设置之前,会将node中的Thread信息置为nul。
2、如果上一步获取锁还是失败(因为是非公平锁,有可能比新来的线程抢占了),那么调用shouldParkAfterFailedAcquire方法看是否需要将当前线程挂起。如果不需要挂起,那么继续进入自旋尝试获取锁。如果需要被挂起,那么调用parkAndCheckInterrupt方法将线程挂起,并检测线程是否中断。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL) // 1return true;if (ws > 0) { // 2do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else { // 3compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
1、只有当当前节点的前驱节点的waitStatus=Node.SINGAL时,返回true,表示当前线程需要等待一个唤醒条件,那么当前节点的线程可以挂起。
2、如果waitStatus>0,那么表示当前节点的前驱节点状态为CANCELLED,那么需要为当前节点需要一个非CANCELLED的节点作为前驱节点。
3、表示waitStatus要么是0,要么是PROPAGATE. 表示当前线程需要一个信号,但是先不挂起,在挂起之前再重试一次。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
在执行了LockSupport.park(this)这行代码之后,线程就会进入阻塞。要么被唤醒,要么被中断。如果被唤醒,返回false,否则返回true;Thread.interrupted()会返回中断标识并清除中断标识。
我们再回到acquireQueued方法。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {interrupted = true;}
如果线程被唤醒,那么interrupted还是false,否则会被置为true,表示当前线程被中断,但是当前线程并不退出锁的竞争,而是继续尝试获取锁(不响应中断),在成功获取锁之后,在返回中断标识。
最后在回到acquire方法
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
在tryAcquire获取锁失败,并且acquireQueued方法返回true(中断标识为true),则执行selfInterrupt方法,中断线程。
注意,Thread.interrupted方法只是返回中断标识,且会清除中断标识,真正中断线程是调用Thread类的interrupt方法。
cancelAcquire实现
acquireQueued方法在finally里面还有判断failed的逻辑,如果failed为true,那么就会执行cancelAcquire方法,那么来看下该方法做了那么工作。
private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;node.thread = null; // 1Node pred = node.prev;while (pred.waitStatus > 0) // 2node.prev = pred = pred.prev;Node predNext = pred.next; // 3node.waitStatus = Node.CANCELLED; // 4if (node == tail && compareAndSetTail(node, pred)) { // 5compareAndSetNext(pred, predNext, null);} else {int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) { // 6Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else { // 7unparkSuccessor(node);}node.next = node; // help GC // 8}}
1、首先将当前节点的thread置为null。
2、然后从当前节点往前找,找到状态不为cancelled的节点,将当前节点的prev指向该节点。
3、取出剔除cancelled状态之后的前驱节点的后继节点,其实predNext指向的是当前节点。
4、将当前节点的waitStatus值置为CANCELLED。
前4步执行完之后,当前节点所处的位置就有三种可能,在头节点的下一个节点、在尾节点、既不是头节点的下一个节点也不是尾节点。5,6,7的是这三种情况的说明。
5、是尾节点,并且通过CAS操作尾节点成功(因为是多线程环境,可能存在新的线程入队了,那么当前节点就不会是尾节点)。然后将pred的next节点置为null。
6、既不是头节点的后继节点,也不是尾节点。那么需要确保将当前节点的前驱节点状态为SIGNAL,且前驱节点的Thread信息不能为null。取出当前节点的后继节点,赋值给next,如果next不为null且next的状态值不为CANCELLED,那么就设置当前节点的前驱节点的后继节点为next。最后设置将当前节点的后继节点指向自己。
7、是头节点的后继节点。因为当前节点是头节点的后继节点,当前节点要取消获取锁操作,那么就需要唤醒它的后继节点。调用unparkSuccessor方法。最后设置将当前节点的后继节点指向自己。
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0); // 如果ws<0将当前node节点的状态值设为0Node s = node.next; // s指向当前节点的后继节点if (s == null || s.waitStatus > 0) { // 如果节点为空,或者后继节点也是CANCELLED状态s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0) // 从尾节点开始往前找,找到第一个状态不为CANCELLED的节点,赋值给ss = t;}if (s != null) // 如果找到的节点不为空,则唤醒该节点的ThreadLockSupport.unpark(s.thread);}
从上面的对节点的处理可以看出,每种情况都不会对操作当前节点的前驱节点。这是因为在前面分析的
shouldParkAfterFailedAcquire方法,判断节点是否需要挂起的逻辑中,如果当前处理的节点的前驱节点状态为CANCELLED,将CANCELLED状态的节点处理掉。这样的话,就降低了cancelAcquire方法的实现复杂度。
非公平锁流程概括
公平锁的实现
ReentrantLock要指定为公平锁,需要在构造函数传入执行公平性参数。
ReentrantLock fairLock = new ReentrantLock(true); // 默认为非公平锁,true表示为公平锁
公平锁的lock方法内部实现是调用了acquire方法。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
这个和非公平锁一样,但是不同点是tryAcquire方法的实现,在AQS中,该方法抛出一个UnsupportedOperationException,所以要看其子类的具体实现,这里是公平锁,所以看FairSync的tryAcquire的实现。
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // 1if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) { // 2setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) { // 3int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
1、判断锁的状态值是否是未锁定。
2、判断当前队列里是否存在排队的线程,如果当前线程前没有排队线程,则尝试通过CAS获取锁,如果获取成功,设置锁的独占线程为当前线程。
3、如果锁已经处于锁定状态,则判断占有锁的线程是否为当前线程,如果是当前线程,则对锁的state值累加acquires。表示重入过程。
和非公平锁最重要的区别在于,公平锁在获取锁之前,会判断队列中是否存在排队线程。
hasQueuedPredecessors实现
public final boolean hasQueuedPredecessors() {Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}
在分析非公平锁入队过程中,我们知道其实在初始化头节点的时候,头节点是一个虚节点,不存储线程信息,只是用于通知下一个线程。所以只有当 h != t 成立,才能说明队列中可能存在等待的线程,具体是否存在就需要看 ((s = h.next) == null || s.thread != Thread.currentThread()) 的判断:
当(s=h.next)==null 成立,说明有线程正在对队列进行初始化,因为队列是双向的,在入队的时候,新node的前驱指针先指向尾节点,在通过cas更新尾节点为新node,最后原尾节点的后继指针指向节点。所以才会出现h.next为空。
如果h.next不为空,那么就需要看h.next节点的线程是不是当前线程。
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
入队分析:
公平锁的剩余实现就和非公平锁的实现是一样的了,最重要的就是在获取锁之前,公平锁会判断当前队列是否存在排队的线程。
释放锁的实现
在使用ReentrantLock时,需要显示的释放锁,而且一般都是在finally代码块中释放。如果没有正确的释放锁,那么就会产生死锁。
ReentrantLock lock = new ReentrantLock();try {lock.lock();// do somthing...} finally {lock.unlock();}
下面就从源码的角度分析释放锁的过程。ReentrantLock的lock方法会调用AQS的release方法实现。
public final boolean release(int arg) {if (tryRelease(arg)) { // 1Node h = head;if (h != null && h.waitStatus != 0) // 2unparkSuccessor(h);return true;}return false;}
1、首先会调用tryRelease方法更新锁的状态值。tryRelease具体有AQS的子类Sync实现。
protected final boolean tryRelease(int releases) {int c = getState() - releases; // 将锁的状态值减掉对应释放锁传的值if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断释放锁的线程是不是占有锁的线程,如果不是就抛异常,防止错误的释放锁,所以只有当占用锁了,才能进行释放锁throw new IllegalMonitorStateException();boolean free = false;if (c == 0) { // 只有当锁的状态值为0时,才表示锁被释放了。ReentrantLock是可重入锁,所以在使用时重入了多少次,就需要释放多少次。free = true;setExclusiveOwnerThread(null); // 将锁的占有线程置为null}setState(c); // 更新锁的值return free;}
2、当在队列中获取到锁之后,那么头节点会更新为获取到锁的线程节点,并将线程信息置为null。现在当前线程要释放锁,所以要唤醒它的下一个节点。
unparkSuccessor方法在上面已经分析过了,简单来说就是找到队列中一个状态waitStatus值不为CANCELLED的节点,然后将它唤醒。
可中断的获取锁实现
在获取锁的过程中,如果线程被中断了,线程会感知到,然后退出锁的竞争,并抛出异常。
ReentrantLock lock = new ReentrantLock();lock.lockInterruptibly();
lockInterruptibly方法会调用AQS的acquireInterruptibly实现:
public final void acquireInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}
tryAcquire方法在上面已经分析过了,这里不在说明。主要看doAcquireInterruptibly方法:
private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException(); // 响应中断}} finally {if (failed)cancelAcquire(node);}}
和非响应中断不同的区别在于,如果线程被中断了,就会抛出一个InterruptedException。当然在抛出异常之前,会执行finally代码块的逻辑,此时failed的值就是true了,那么就会执行cancelAcquire方法,该方法在上面也分析过了。
tryLock的实现
tryLock表示仅仅是try一下,不管能不能获取到锁都会立即返回。带时间参数的表示带时间的尝试,获取到锁立即返回,没有则等待一段时间再返回。
ReentrantLock lock = new ReentrantLock();lock.tryLock();lock.tryLock(1000, TimeUnit.MILLISECONDS);
tryLock方法调用 Sync的nonfairTryAcquire实现。
public boolean tryLock() {return sync.nonfairTryAcquire(1);}
NonfairTryAcquire是非公平锁的获取实现,和非公平锁不同的时候,如果获取失败就返回,不入队。
带时间的tryLock方法调用AQS的tryAcquireNanos方法实现
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}
当线程被中断,抛出InterruptedException。tryAcquire的实现,要看锁是公平的还是非公平锁,具体就是FairSync和NonFairSync的实现。如果tryAcquire获取失败就看doAcquireNanos能否成功。
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L) // 如果等待时间已经到了,直接返回falsereturn false;final long deadline = System.nanoTime() + nanosTimeout; // 记录等待截止时间final Node node = addWaiter(Node.EXCLUSIVE); // 将当前线程入队等待boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L) // 等待的时间已经到了,返回获取锁失败,failed=truereturn false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold) // 如果需要挂起并且剩余等待时间大于最大自旋时间。就将线程挂起LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted()) // 如果线程被中断了,那就抛出异常throw new InterruptedException();}} finally {if (failed) // 线程超时获取锁失败,或者被中断,那么就要退出队列cancelAcquire(node);}}
END。
