源码分析ReentrantLock实现原理
ReentrantLock基本用法
ReentrantLock重入锁,区别于Synchronized关键,ReentrantLock是在Java Api层面上的锁实现。和Synchronized关键相比,ReentrantLock提供了更高级的功能,例如指定是否为公平锁、限时等待、响应中断等。
常用方法
ReentrantLock lock = new ReentrantLock(); // 构造函数,默认非公平锁;如要指定公平,传入true
lock.lock(); // 获取锁,如果没有获取成功,则进入等待
lock.unlock(); // 释放锁,需要事先获取到锁
lock.lockInterruptibly(); // 可中断的获取锁,需要接收一个中断通知
lock.tryLock(); // 尝试获取锁,获取成功返回true,失败立即返回false,不进行等待
lock.tryLock(1000, TimeUnit.MILLISECONDS); // 带时间的尝试,获取失败,等待指定时间
lock.isLocked(); // 判断该锁失败是锁定状态,state!=0
lock.isHeldByCurrentThread(); // 判断锁是否被当前线程持有 exclusiveOwnerThread == Thread.currentThread
lock.getHoldCount(); // 锁进入的次数,获取的是state的值,每进入一次,state+1
lock.isFair(); // 是否为公平
类结构
ReentrantLock的源码实现
下面将从非公平锁、公平锁、释放锁等实现源码来分析。
非公平锁获取锁的实现
ReentrantLock的默认实现是非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
所以非公平锁的lock
方法实现,在NonfairSync
类中。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
NonfairSync
的lock
方法实现中,首先会通过一个CAS的操作设置state的值,如果此时刚好有线程释放锁且当前线程仍然持有cpu时间片,那么就会进行插队(非公平性)拿到这边锁,并将锁的独占线程设置为当前线程。如果获取锁失败,则通过acquire
方法再次进行尝试。
acquire实现
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire
方法有好几个逻辑,包括tryAcquire
尝试获取锁、addWaiter
创建等待节点、acquireQueued
在队列内获取锁等操作。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 1
if (compareAndSetState(0, acquires)) { // 2
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 3
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1、首先判断锁的状态是否为未锁定状态,
2、如果是,则通过CAS操作,设置锁的状态值为acquires。并设置锁的独占线程为当前线程。
3、如果锁已经被占,则判锁的独占线程是否为当前线程。如果是,则锁的状态值累加acquires。因为ReentrantLock是可重入锁,所以会判断锁的状态值是否溢出,防止某个线程陷入死循环的加锁。
tryAcquire
方法会返回获取锁成功或失败,如果成功,那么线程获取到锁。lock
方法结束。否则就需要将当前线程入队,进行排队等待获取锁。
acquireQueued实现
acquireQueued
方法是在队列内部获取锁的实现,在执行它的逻辑之前, 先要执行addWaiter
方法,创建等待节点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node
是AbstractQueuedSynchronizer
的内部类。AQS是抽象队列同步器,它在内部维护了一个称之为CLH的双向队列,队列的每个元素是Node,Node中包含了当前线程、等待状态、前驱后继节点等信息。
static final class Node {
static final Node SHARED = new Node(); // 表示节点是共享模式
static final Node EXCLUSIVE = null; // 表示节点是独占模式
static final int CANCELLED = 1; // 表示该节点的线程处于取消状态
static final int SIGNAL = -1; // 表示该节点的下一个节点需要被挂起
static final int CONDITION = -2; // 表示该节点等待一个condition唤醒
static final int PROPAGATE = -3; // 表示下一个acquireShared无条件传播
volatile int waitStatus; // 等待状态,如果是Sync节点,默认值为0,如果是condition队列的节点,默认职位CONDITION(-2)。使用CAS进行原子的更新
volatile Node prev; // 指向前驱节点,当前节点依赖它检查状态
volatile Node next; // 指向后继节点,当前节点释放是需要把它唤醒
volatile Thread thread; // 当前Thread
Node nextWaiter; // 执行下一个等待condition条件的节点,
}
Node的状态说明:
SINGLE
后继节点处于阻塞状态(被挂起),当前节点在释放锁或者取消获取锁后,必须唤醒后继节点。为了避免竞争,acquire方法必须首先表明需要一个single信息,然后原子的去重试,如果重试失败,继续阻塞。
CANCELLED
当前节点由于超时或者中断被取消。所有的节点不会丢弃这个状态,而且,一个处于取消状态的节点永远不会再次进入阻塞。
CONDITION
当前节点处于一个condition队列,不会用于sync队列。
PROPAGATE
releaseShared需要传播到其它节点,这是在
doReleaseShared
方法中设置的,以确保能传播。
关于prev和next
prev
prev指向前驱节点,当前节点依赖它做状态检查。在入队的时候被赋值,出队的时候被置为null。如果prev节点被取消,需要为当前节点往前找一个未取消的节点,而且一定是可以找到的,因为头节点不可能处于Cancelled状态。一个节点只有成功获取到了锁,才能成为头节点。一个被取消的节点不可能成功的拿到锁,一个线程只能取消自身,而不能取消其它节点。
next
next指向后继节点,当前节点的线程释放锁的时候,需要唤醒它。入队时的时候被赋值,当要避开被取消的节点时需要调整next指向,出队时被置为null。
上面介绍了AQS中Node类的重要属性,我们继续回到addWaiter
方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 1
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 2
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 3
return node;
}
addWaiter
方法接收一个表示Node模式的操作,这里讲的是ReentrantLock,所以mode是独占模式。
1、首先会创建一个Node节点,thread指向当前线程,mode为独占模式。
2、将tail为节点赋值给pred,如果pred不为null,那么就表示当前等待队列已经被初始化,并且已经设置好了header节点。因为是处于多线程环境,所以会通过CAS的操作去设置尾节点(tail)的值。
3、如果pred为null,也就是尾节点tail为null,那么就表示当前等待队列还未被初始化,那么调用enq
方法进行队列的创建。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 1
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 2
t.next = node;
return t;
}
}
}
}
这里使用了自旋的操作,在for循环里不断的尝试,把node节点入队。这里使用自旋,是因为是多线程环境,可能存在多个线程都在对队列进行初始化。
1、使用乐观锁进行设置队列的头。注意,队列的头节点没有存储Thread信息,头节点是一个虚拟节点,不参与锁竞争,初始状态值waitStatus=0;
2、如果在第一步设置头节点失败,那么就意味着其它线程在当前线程之前初始化好了队列,那么当前线程就进入队列尾部。
当前线程入队之后,需要在队列内部继续进行获取锁的操作,这个逻辑在acquireQueued
方法中。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 表示获取锁成功与否
try {
boolean interrupted = false; // 表示线程是否被中断
for (;;) {
final Node p = node.predecessor(); // 当前线程所在节点的前驱节点
if (p == head && tryAcquire(arg)) { // 1
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 2
interrupted = true;
}
} finally {
if (failed) // 3
cancelAcquire(node);
}
}
1、判断p是否为头节点,前面讲到头节点是一个虚拟节点,不参与锁竞争。然后调用tryAcquire
方法尝试获取锁。如果获取锁成功,设置头节点为当前线程的节点,在设置之前,会将node中的Thread信息置为nul。
2、如果上一步获取锁还是失败(因为是非公平锁,有可能比新来的线程抢占了),那么调用shouldParkAfterFailedAcquire
方法看是否需要将当前线程挂起。如果不需要挂起,那么继续进入自旋尝试获取锁。如果需要被挂起,那么调用parkAndCheckInterrupt
方法将线程挂起,并检测线程是否中断。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 1
return true;
if (ws > 0) { // 2
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 3
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1、只有当当前节点的前驱节点的waitStatus=Node.SINGAL时,返回true,表示当前线程需要等待一个唤醒条件,那么当前节点的线程可以挂起。
2、如果waitStatus>0,那么表示当前节点的前驱节点状态为CANCELLED,那么需要为当前节点需要一个非CANCELLED的节点作为前驱节点。
3、表示waitStatus要么是0,要么是PROPAGATE. 表示当前线程需要一个信号,但是先不挂起,在挂起之前再重试一次。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
在执行了LockSupport.park(this)
这行代码之后,线程就会进入阻塞。要么被唤醒,要么被中断。如果被唤醒,返回false,否则返回true;Thread.interrupted()
会返回中断标识并清除中断标识。
我们再回到acquireQueued
方法。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
如果线程被唤醒,那么interrupted还是false,否则会被置为true,表示当前线程被中断,但是当前线程并不退出锁的竞争,而是继续尝试获取锁(不响应中断),在成功获取锁之后,在返回中断标识。
最后在回到acquire
方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在tryAcquire
获取锁失败,并且acquireQueued
方法返回true(中断标识为true),则执行selfInterrupt
方法,中断线程。
注意,Thread.interrupted
方法只是返回中断标识,且会清除中断标识,真正中断线程是调用Thread类的interrupt
方法。
cancelAcquire实现
acquireQueued
方法在finally里面还有判断failed的逻辑,如果failed为true,那么就会执行cancelAcquire
方法,那么来看下该方法做了那么工作。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null; // 1
Node pred = node.prev;
while (pred.waitStatus > 0) // 2
node.prev = pred = pred.prev;
Node predNext = pred.next; // 3
node.waitStatus = Node.CANCELLED; // 4
if (node == tail && compareAndSetTail(node, pred)) { // 5
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) { // 6
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else { // 7
unparkSuccessor(node);
}
node.next = node; // help GC // 8
}
}
1、首先将当前节点的thread置为null。
2、然后从当前节点往前找,找到状态不为cancelled的节点,将当前节点的prev指向该节点。
3、取出剔除cancelled状态之后的前驱节点的后继节点,其实predNext指向的是当前节点。
4、将当前节点的waitStatus值置为CANCELLED。
前4步执行完之后,当前节点所处的位置就有三种可能,在头节点的下一个节点、在尾节点、既不是头节点的下一个节点也不是尾节点。5,6,7的是这三种情况的说明。
5、是尾节点,并且通过CAS操作尾节点成功(因为是多线程环境,可能存在新的线程入队了,那么当前节点就不会是尾节点)。然后将pred的next节点置为null。
6、既不是头节点的后继节点,也不是尾节点。那么需要确保将当前节点的前驱节点状态为SIGNAL,且前驱节点的Thread信息不能为null。取出当前节点的后继节点,赋值给next,如果next不为null且next的状态值不为CANCELLED,那么就设置当前节点的前驱节点的后继节点为next。最后设置将当前节点的后继节点指向自己。
7、是头节点的后继节点。因为当前节点是头节点的后继节点,当前节点要取消获取锁操作,那么就需要唤醒它的后继节点。调用unparkSuccessor
方法。最后设置将当前节点的后继节点指向自己。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 如果ws<0将当前node节点的状态值设为0
Node s = node.next; // s指向当前节点的后继节点
if (s == null || s.waitStatus > 0) { // 如果节点为空,或者后继节点也是CANCELLED状态
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) // 从尾节点开始往前找,找到第一个状态不为CANCELLED的节点,赋值给s
s = t;
}
if (s != null) // 如果找到的节点不为空,则唤醒该节点的Thread
LockSupport.unpark(s.thread);
}
从上面的对节点的处理可以看出,每种情况都不会对操作当前节点的前驱节点。这是因为在前面分析的
shouldParkAfterFailedAcquire
方法,判断节点是否需要挂起的逻辑中,如果当前处理的节点的前驱节点状态为CANCELLED,将CANCELLED状态的节点处理掉。这样的话,就降低了cancelAcquire
方法的实现复杂度。
非公平锁流程概括
公平锁的实现
ReentrantLock要指定为公平锁,需要在构造函数传入执行公平性参数。
ReentrantLock fairLock = new ReentrantLock(true); // 默认为非公平锁,true表示为公平锁
公平锁的lock方法内部实现是调用了acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个和非公平锁一样,但是不同点是tryAcquire
方法的实现,在AQS中,该方法抛出一个UnsupportedOperationException,所以要看其子类的具体实现,这里是公平锁,所以看FairSync
的tryAcquire
的实现。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 1
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) { // 2
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 3
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1、判断锁的状态值是否是未锁定。
2、判断当前队列里是否存在排队的线程,如果当前线程前没有排队线程,则尝试通过CAS获取锁,如果获取成功,设置锁的独占线程为当前线程。
3、如果锁已经处于锁定状态,则判断占有锁的线程是否为当前线程,如果是当前线程,则对锁的state值累加acquires。表示重入过程。
和非公平锁最重要的区别在于,公平锁在获取锁之前,会判断队列中是否存在排队线程。
hasQueuedPredecessors实现
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
在分析非公平锁入队过程中,我们知道其实在初始化头节点的时候,头节点是一个虚节点,不存储线程信息,只是用于通知下一个线程。所以只有当 h != t 成立,才能说明队列中可能存在等待的线程,具体是否存在就需要看 ((s = h.next) == null || s.thread != Thread.currentThread()) 的判断:
当(s=h.next)==null 成立,说明有线程正在对队列进行初始化,因为队列是双向的,在入队的时候,新node的前驱指针先指向尾节点,在通过cas更新尾节点为新node,最后原尾节点的后继指针指向节点。所以才会出现h.next为空。
如果h.next不为空,那么就需要看h.next节点的线程是不是当前线程。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
入队分析:
公平锁的剩余实现就和非公平锁的实现是一样的了,最重要的就是在获取锁之前,公平锁会判断当前队列是否存在排队的线程。
释放锁的实现
在使用ReentrantLock时,需要显示的释放锁,而且一般都是在finally代码块中释放。如果没有正确的释放锁,那么就会产生死锁。
ReentrantLock lock = new ReentrantLock();
try {
lock.lock();
// do somthing...
} finally {
lock.unlock();
}
下面就从源码的角度分析释放锁的过程。ReentrantLock的lock方法会调用AQS的release方法实现。
public final boolean release(int arg) {
if (tryRelease(arg)) { // 1
Node h = head;
if (h != null && h.waitStatus != 0) // 2
unparkSuccessor(h);
return true;
}
return false;
}
1、首先会调用tryRelease方法更新锁的状态值。tryRelease具体有AQS的子类Sync实现。
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 将锁的状态值减掉对应释放锁传的值
if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断释放锁的线程是不是占有锁的线程,如果不是就抛异常,防止错误的释放锁,所以只有当占用锁了,才能进行释放锁
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 只有当锁的状态值为0时,才表示锁被释放了。ReentrantLock是可重入锁,所以在使用时重入了多少次,就需要释放多少次。
free = true;
setExclusiveOwnerThread(null); // 将锁的占有线程置为null
}
setState(c); // 更新锁的值
return free;
}
2、当在队列中获取到锁之后,那么头节点会更新为获取到锁的线程节点,并将线程信息置为null。现在当前线程要释放锁,所以要唤醒它的下一个节点。
unparkSuccessor
方法在上面已经分析过了,简单来说就是找到队列中一个状态waitStatus值不为CANCELLED的节点,然后将它唤醒。
可中断的获取锁实现
在获取锁的过程中,如果线程被中断了,线程会感知到,然后退出锁的竞争,并抛出异常。
ReentrantLock lock = new ReentrantLock();
lock.lockInterruptibly();
lockInterruptibly
方法会调用AQS的acquireInterruptibly
实现:
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
tryAcquire
方法在上面已经分析过了,这里不在说明。主要看doAcquireInterruptibly
方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 响应中断
}
} finally {
if (failed)
cancelAcquire(node);
}
}
和非响应中断不同的区别在于,如果线程被中断了,就会抛出一个InterruptedException
。当然在抛出异常之前,会执行finally代码块的逻辑,此时failed的值就是true了,那么就会执行cancelAcquire
方法,该方法在上面也分析过了。
tryLock的实现
tryLock
表示仅仅是try一下,不管能不能获取到锁都会立即返回。带时间参数的表示带时间的尝试,获取到锁立即返回,没有则等待一段时间再返回。
ReentrantLock lock = new ReentrantLock();
lock.tryLock();
lock.tryLock(1000, TimeUnit.MILLISECONDS);
tryLock
方法调用 Sync的nonfairTryAcquire
实现。
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
NonfairTryAcquire
是非公平锁的获取实现,和非公平锁不同的时候,如果获取失败就返回,不入队。
带时间的tryLock方法调用AQS的tryAcquireNanos
方法实现
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
当线程被中断,抛出InterruptedException
。tryAcquire的实现,要看锁是公平的还是非公平锁,具体就是FairSync和NonFairSync的实现。如果tryAcquire获取失败就看doAcquireNanos
能否成功。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L) // 如果等待时间已经到了,直接返回false
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 记录等待截止时间
final Node node = addWaiter(Node.EXCLUSIVE); // 将当前线程入队等待
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 等待的时间已经到了,返回获取锁失败,failed=true
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) // 如果需要挂起并且剩余等待时间大于最大自旋时间。就将线程挂起
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted()) // 如果线程被中断了,那就抛出异常
throw new InterruptedException();
}
} finally {
if (failed) // 线程超时获取锁失败,或者被中断,那么就要退出队列
cancelAcquire(node);
}
}
END。