vlambda博客
学习文章列表

源码分析ReentrantLock实现原理

ReentrantLock基本用法

ReentrantLock重入锁,区别于Synchronized关键,ReentrantLock是在Java Api层面上的锁实现。和Synchronized关键相比,ReentrantLock提供了更高级的功能,例如指定是否为公平锁、限时等待、响应中断等。

常用方法

ReentrantLock lock = new ReentrantLock(); // 构造函数,默认非公平锁;如要指定公平,传入truelock.lock(); // 获取锁,如果没有获取成功,则进入等待lock.unlock(); // 释放锁,需要事先获取到锁lock.lockInterruptibly(); // 可中断的获取锁,需要接收一个中断通知lock.tryLock(); // 尝试获取锁,获取成功返回true,失败立即返回false,不进行等待lock.tryLock(1000, TimeUnit.MILLISECONDS); // 带时间的尝试,获取失败,等待指定时间lock.isLocked(); // 判断该锁失败是锁定状态,state!=0lock.isHeldByCurrentThread(); // 判断锁是否被当前线程持有 exclusiveOwnerThread == Thread.currentThreadlock.getHoldCount(); // 锁进入的次数,获取的是state的值,每进入一次,state+1lock.isFair(); // 是否为公平

类结构

ReentrantLock的源码实现

下面将从非公平锁、公平锁、释放锁等实现源码来分析。

非公平锁获取锁的实现

ReentrantLock的默认实现是非公平锁。

public ReentrantLock() { sync = new NonfairSync();}

所以非公平锁的lock方法实现,在NonfairSync类中。

static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }}

NonfairSynclock方法实现中,首先会通过一个CAS的操作设置state的值,如果此时刚好有线程释放锁且当前线程仍然持有cpu时间片,那么就会进行插队(非公平性)拿到这边锁,并将锁的独占线程设置为当前线程。如果获取锁失败,则通过acquire方法再次进行尝试。

acquire实现

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

acquire方法有好几个逻辑,包括tryAcquire尝试获取锁、addWaiter创建等待节点、acquireQueued在队列内获取锁等操作。

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1 if (compareAndSetState(0, acquires)) { // 2 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 3 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}

1、首先判断锁的状态是否为未锁定状态,

2、如果是,则通过CAS操作,设置锁的状态值为acquires。并设置锁的独占线程为当前线程。

3、如果锁已经被占,则判锁的独占线程是否为当前线程。如果是,则锁的状态值累加acquires。因为ReentrantLock是可重入锁,所以会判断锁的状态值是否溢出,防止某个线程陷入死循环的加锁。

tryAcquire方法会返回获取锁成功或失败,如果成功,那么线程获取到锁。lock方法结束。否则就需要将当前线程入队,进行排队等待获取锁。

acquireQueued实现

acquireQueued方法是在队列内部获取锁的实现,在执行它的逻辑之前, 先要执行addWaiter方法,创建等待节点。

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node;}

NodeAbstractQueuedSynchronizer的内部类。AQS是抽象队列同步器,它在内部维护了一个称之为CLH的双向队列,队列的每个元素是Node,Node中包含了当前线程、等待状态、前驱后继节点等信息。

static final class Node { static final Node SHARED = new Node(); // 表示节点是共享模式 static final Node EXCLUSIVE = null; // 表示节点是独占模式 static final int CANCELLED = 1; // 表示该节点的线程处于取消状态 static final int SIGNAL = -1; // 表示该节点的下一个节点需要被挂起 static final int CONDITION = -2; // 表示该节点等待一个condition唤醒 static final int PROPAGATE = -3; // 表示下一个acquireShared无条件传播 volatile int waitStatus; // 等待状态,如果是Sync节点,默认值为0,如果是condition队列的节点,默认职位CONDITION(-2)。使用CAS进行原子的更新 volatile Node prev; // 指向前驱节点,当前节点依赖它检查状态 volatile Node next; // 指向后继节点,当前节点释放是需要把它唤醒 volatile Thread thread; // 当前Thread Node nextWaiter; // 执行下一个等待condition条件的节点,}

Node的状态说明:

  • SINGLE

    后继节点处于阻塞状态(被挂起),当前节点在释放锁或者取消获取锁后,必须唤醒后继节点。为了避免竞争,acquire方法必须首先表明需要一个single信息,然后原子的去重试,如果重试失败,继续阻塞。

  • CANCELLED

    当前节点由于超时或者中断被取消。所有的节点不会丢弃这个状态,而且,一个处于取消状态的节点永远不会再次进入阻塞。

  • CONDITION

    当前节点处于一个condition队列,不会用于sync队列。

  • PROPAGATE

    releaseShared需要传播到其它节点,这是在doReleaseShared方法中设置的,以确保能传播。

关于prev和next

  • prev

    prev指向前驱节点,当前节点依赖它做状态检查。在入队的时候被赋值,出队的时候被置为null。如果prev节点被取消,需要为当前节点往前找一个未取消的节点,而且一定是可以找到的,因为头节点不可能处于Cancelled状态。一个节点只有成功获取到了锁,才能成为头节点。一个被取消的节点不可能成功的拿到锁,一个线程只能取消自身,而不能取消其它节点。

  • next

    next指向后继节点,当前节点的线程释放锁的时候,需要唤醒它。入队时的时候被赋值,当要避开被取消的节点时需要调整next指向,出队时被置为null。

上面介绍了AQS中Node类的重要属性,我们继续回到addWaiter方法:

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 1 // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { // 2 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); // 3 return node;}

addWaiter方法接收一个表示Node模式的操作,这里讲的是ReentrantLock,所以mode是独占模式。

1、首先会创建一个Node节点,thread指向当前线程,mode为独占模式。

2、将tail为节点赋值给pred,如果pred不为null,那么就表示当前等待队列已经被初始化,并且已经设置好了header节点。因为是处于多线程环境,所以会通过CAS的操作去设置尾节点(tail)的值。

3、如果pred为null,也就是尾节点tail为null,那么就表示当前等待队列还未被初始化,那么调用enq方法进行队列的创建。

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 1 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { // 2 t.next = node; return t; } } }}

这里使用了自旋的操作,在for循环里不断的尝试,把node节点入队。这里使用自旋,是因为是多线程环境,可能存在多个线程都在对队列进行初始化。

1、使用乐观锁进行设置队列的头。注意,队列的头节点没有存储Thread信息,头节点是一个虚拟节点,不参与锁竞争,初始状态值waitStatus=0;

2、如果在第一步设置头节点失败,那么就意味着其它线程在当前线程之前初始化好了队列,那么当前线程就进入队列尾部。

当前线程入队之后,需要在队列内部继续进行获取锁的操作,这个逻辑在acquireQueued方法中。

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; // 表示获取锁成功与否 try { boolean interrupted = false; // 表示线程是否被中断 for (;;) { final Node p = node.predecessor(); // 当前线程所在节点的前驱节点 if (p == head && tryAcquire(arg)) { // 1 setHead(node);  p.next = null; // help GC failed = false;  return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 2 interrupted = true; } } finally { if (failed) // 3 cancelAcquire(node); }}

1、判断p是否为头节点,前面讲到头节点是一个虚拟节点,不参与锁竞争。然后调用tryAcquire方法尝试获取锁。如果获取锁成功,设置头节点为当前线程的节点,在设置之前,会将node中的Thread信息置为nul。

2、如果上一步获取锁还是失败(因为是非公平锁,有可能比新来的线程抢占了),那么调用shouldParkAfterFailedAcquire方法看是否需要将当前线程挂起。如果不需要挂起,那么继续进入自旋尝试获取锁。如果需要被挂起,那么调用parkAndCheckInterrupt方法将线程挂起,并检测线程是否中断。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 1 return true; if (ws > 0) { // 2 do {  node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 3 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}

1、只有当当前节点的前驱节点的waitStatus=Node.SINGAL时,返回true,表示当前线程需要等待一个唤醒条件,那么当前节点的线程可以挂起。

2、如果waitStatus>0,那么表示当前节点的前驱节点状态为CANCELLED,那么需要为当前节点需要一个非CANCELLED的节点作为前驱节点。

3、表示waitStatus要么是0,要么是PROPAGATE. 表示当前线程需要一个信号,但是先不挂起,在挂起之前再重试一次。

private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();}

在执行了LockSupport.park(this)这行代码之后,线程就会进入阻塞。要么被唤醒,要么被中断。如果被唤醒,返回false,否则返回true;Thread.interrupted()会返回中断标识并清除中断标识。

我们再回到acquireQueued方法。

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { interrupted = true;}

如果线程被唤醒,那么interrupted还是false,否则会被置为true,表示当前线程被中断,但是当前线程并不退出锁的竞争,而是继续尝试获取锁(不响应中断),在成功获取锁之后,在返回中断标识。

最后在回到acquire方法

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

tryAcquire获取锁失败,并且acquireQueued方法返回true(中断标识为true),则执行selfInterrupt方法,中断线程。

注意,Thread.interrupted方法只是返回中断标识,且会清除中断标识,真正中断线程是调用Thread类的interrupt方法。

cancelAcquire实现

acquireQueued方法在finally里面还有判断failed的逻辑,如果failed为true,那么就会执行cancelAcquire方法,那么来看下该方法做了那么工作。

private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return;
node.thread = null; // 1
Node pred = node.prev; while (pred.waitStatus > 0) // 2 node.prev = pred = pred.prev;
Node predNext = pred.next; // 3
node.waitStatus = Node.CANCELLED; // 4
if (node == tail && compareAndSetTail(node, pred)) { // 5 compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 6 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 7 unparkSuccessor(node); }
node.next = node; // help GC // 8 }}

1、首先将当前节点的thread置为null。

2、然后从当前节点往前找,找到状态不为cancelled的节点,将当前节点的prev指向该节点。

3、取出剔除cancelled状态之后的前驱节点的后继节点,其实predNext指向的是当前节点。

4、将当前节点的waitStatus值置为CANCELLED。

前4步执行完之后,当前节点所处的位置就有三种可能,在头节点的下一个节点、在尾节点、既不是头节点的下一个节点也不是尾节点。5,6,7的是这三种情况的说明。

5、是尾节点,并且通过CAS操作尾节点成功(因为是多线程环境,可能存在新的线程入队了,那么当前节点就不会是尾节点)。然后将pred的next节点置为null。

源码分析ReentrantLock实现原理

6、既不是头节点的后继节点,也不是尾节点。那么需要确保将当前节点的前驱节点状态为SIGNAL,且前驱节点的Thread信息不能为null。取出当前节点的后继节点,赋值给next,如果next不为null且next的状态值不为CANCELLED,那么就设置当前节点的前驱节点的后继节点为next。最后设置将当前节点的后继节点指向自己。

源码分析ReentrantLock实现原理

7、是头节点的后继节点。因为当前节点是头节点的后继节点,当前节点要取消获取锁操作,那么就需要唤醒它的后继节点。调用unparkSuccessor方法。最后设置将当前节点的后继节点指向自己。

private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 如果ws<0将当前node节点的状态值设为0 Node s = node.next; // s指向当前节点的后继节点 if (s == null || s.waitStatus > 0) { // 如果节点为空,或者后继节点也是CANCELLED状态 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) // 从尾节点开始往前找,找到第一个状态不为CANCELLED的节点,赋值给s s = t; } if (s != null) // 如果找到的节点不为空,则唤醒该节点的Thread LockSupport.unpark(s.thread); }

源码分析ReentrantLock实现原理

从上面的对节点的处理可以看出,每种情况都不会对操作当前节点的前驱节点。这是因为在前面分析的

shouldParkAfterFailedAcquire方法,判断节点是否需要挂起的逻辑中,如果当前处理的节点的前驱节点状态为CANCELLED,将CANCELLED状态的节点处理掉。这样的话,就降低了cancelAcquire方法的实现复杂度。

非公平锁流程概括

公平锁的实现

ReentrantLock要指定为公平锁,需要在构造函数传入执行公平性参数。

ReentrantLock fairLock = new ReentrantLock(true); // 默认为非公平锁,true表示为公平锁

公平锁的lock方法内部实现是调用了acquire方法。

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

这个和非公平锁一样,但是不同点是tryAcquire方法的实现,在AQS中,该方法抛出一个UnsupportedOperationException,所以要看其子类的具体实现,这里是公平锁,所以看FairSynctryAcquire的实现。

protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 2 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 3 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}

1、判断锁的状态值是否是未锁定。

2、判断当前队列里是否存在排队的线程,如果当前线程前没有排队线程,则尝试通过CAS获取锁,如果获取成功,设置锁的独占线程为当前线程。

3、如果锁已经处于锁定状态,则判断占有锁的线程是否为当前线程,如果是当前线程,则对锁的state值累加acquires。表示重入过程。

和非公平锁最重要的区别在于,公平锁在获取锁之前,会判断队列中是否存在排队线程。

hasQueuedPredecessors实现

public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());}

在分析非公平锁入队过程中,我们知道其实在初始化头节点的时候,头节点是一个虚节点,不存储线程信息,只是用于通知下一个线程。所以只有当 h != t 成立,才能说明队列中可能存在等待的线程,具体是否存在就需要看 ((s = h.next) == null || s.thread != Thread.currentThread()) 的判断:

  • 当(s=h.next)==null 成立,说明有线程正在对队列进行初始化,因为队列是双向的,在入队的时候,新node的前驱指针先指向尾节点,在通过cas更新尾节点为新node,最后原尾节点的后继指针指向节点。所以才会出现h.next为空。

  • 如果h.next不为空,那么就需要看h.next节点的线程是不是当前线程。

 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

入队分析:

公平锁的剩余实现就和非公平锁的实现是一样的了,最重要的就是在获取锁之前,公平锁会判断当前队列是否存在排队的线程。

释放锁的实现

在使用ReentrantLock时,需要显示的释放锁,而且一般都是在finally代码块中释放。如果没有正确的释放锁,那么就会产生死锁。

ReentrantLock lock = new ReentrantLock();try { lock.lock(); // do somthing...} finally { lock.unlock();}

下面就从源码的角度分析释放锁的过程。ReentrantLock的lock方法会调用AQS的release方法实现。

public final boolean release(int arg) { if (tryRelease(arg)) { // 1 Node h = head; if (h != null && h.waitStatus != 0) // 2 unparkSuccessor(h); return true; } return false;}

1、首先会调用tryRelease方法更新锁的状态值。tryRelease具体有AQS的子类Sync实现。

protected final boolean tryRelease(int releases) { int c = getState() - releases; // 将锁的状态值减掉对应释放锁传的值 if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断释放锁的线程是不是占有锁的线程,如果不是就抛异常,防止错误的释放锁,所以只有当占用锁了,才能进行释放锁 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 只有当锁的状态值为0时,才表示锁被释放了。ReentrantLock是可重入锁,所以在使用时重入了多少次,就需要释放多少次。 free = true; setExclusiveOwnerThread(null); // 将锁的占有线程置为null } setState(c); // 更新锁的值 return free; }

2、当在队列中获取到锁之后,那么头节点会更新为获取到锁的线程节点,并将线程信息置为null。现在当前线程要释放锁,所以要唤醒它的下一个节点。

unparkSuccessor方法在上面已经分析过了,简单来说就是找到队列中一个状态waitStatus值不为CANCELLED的节点,然后将它唤醒。

可中断的获取锁实现

在获取锁的过程中,如果线程被中断了,线程会感知到,然后退出锁的竞争,并抛出异常。

ReentrantLock lock = new ReentrantLock();lock.lockInterruptibly();

lockInterruptibly方法会调用AQS的acquireInterruptibly实现:

public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg);}

tryAcquire方法在上面已经分析过了,这里不在说明。主要看doAcquireInterruptibly方法:

private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())  throw new InterruptedException(); // 响应中断 } } finally { if (failed) cancelAcquire(node); }}

和非响应中断不同的区别在于,如果线程被中断了,就会抛出一个InterruptedException。当然在抛出异常之前,会执行finally代码块的逻辑,此时failed的值就是true了,那么就会执行cancelAcquire方法,该方法在上面也分析过了。

tryLock的实现

tryLock表示仅仅是try一下,不管能不能获取到锁都会立即返回。带时间参数的表示带时间的尝试,获取到锁立即返回,没有则等待一段时间再返回。

ReentrantLock lock = new ReentrantLock();lock.tryLock();lock.tryLock(1000, TimeUnit.MILLISECONDS);

tryLock方法调用 Sync的nonfairTryAcquire实现。

public boolean tryLock() { return sync.nonfairTryAcquire(1);}

NonfairTryAcquire是非公平锁的获取实现,和非公平锁不同的时候,如果获取失败就返回,不入队。

带时间的tryLock方法调用AQS的tryAcquireNanos方法实现

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);}

当线程被中断,抛出InterruptedException。tryAcquire的实现,要看锁是公平的还是非公平锁,具体就是FairSync和NonFairSync的实现。如果tryAcquire获取失败就看doAcquireNanos能否成功。

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) // 如果等待时间已经到了,直接返回false return false; final long deadline = System.nanoTime() + nanosTimeout; // 记录等待截止时间 final Node node = addWaiter(Node.EXCLUSIVE); // 将当前线程入队等待 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime();  if (nanosTimeout <= 0L) // 等待的时间已经到了,返回获取锁失败,failed=true return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 如果需要挂起并且剩余等待时间大于最大自旋时间。就将线程挂起 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) // 如果线程被中断了,那就抛出异常 throw new InterruptedException(); } } finally { if (failed) // 线程超时获取锁失败,或者被中断,那么就要退出队列 cancelAcquire(node); }}

END。