JDK并发源码分析:ReentrantLock原理
ReentrantLock原理
1、类的继承层次
/**
 * The lock contract that ReentrantLock implements in this walkthrough.
 * Descriptions below follow the java.util.concurrent.locks.Lock API
 * documentation; only the signatures are visible in this excerpt.
 */
public interface Lock{
// Acquires the lock.
void lock();
// Acquires the lock; declared to throw InterruptedException.
void lockInterruptibly() throws InterruptedException;
// Attempts to acquire the lock and reports success as a boolean.
boolean tryLock();
// Timed variant of tryLock: `time` is expressed in `unit`.
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// Releases the lock.
void unlock();
// Returns a Condition associated with this lock.
Condition newCondition();
}
/**
 * Excerpt of ReentrantLock: the public lock/unlock operations simply
 * forward to the internal Sync object.
 */
public class ReentrantLock implements Lock, java.io.Serializable {
// Synchronizer that carries the actual locking logic; it is assigned by
// the constructors, which are not part of this excerpt.
private final Sync sync;
// Acquisition is delegated to the Sync implementation's lock().
public void lock() {
sync.lock();
}
// Releasing always gives back exactly one hold (argument 1).
public void unlock() {
sync.release(1);
}
}
2、锁的公平性和非公平性
/**
 * Creates a lock with the default policy, which is the non-fair one
 * (equivalent to {@code new ReentrantLock(false)}).
 */
public ReentrantLock() {
    this(false);
}

/**
 * Creates a lock with the requested fairness policy.
 *
 * @param fair {@code true} to back the lock with a FairSync,
 *             {@code false} to back it with a NonfairSync
 */
public ReentrantLock(boolean fair) {
    if (fair) {
        sync = new FairSync();
    } else {
        sync = new NonfairSync();
    }
}
3、锁实现的基本原理
- 需要一个State变量,标记该锁的状态。对State的操作要确保线程安全,也就是会用到CAS。
- 需要记录当前是哪个线程持有锁!
- 需要底层支持对一个线程进行阻塞或唤醒操作!
- 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要用到CAS。
// Excerpt of AbstractQueuedSynchronizer showing only the lock-state field.
// The tryAcquire/tryRelease excerpts in this file read and write it through
// getState / setState / compareAndSetState.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
/**
* The synchronization state.
*/
private volatile int state;
}
// Excerpt of AbstractOwnableSynchronizer: it records which thread currently
// holds the synchronizer in exclusive mode. The field is transient, so it is
// skipped by Java serialization.
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
}
// Excerpt of LockSupport: the block/wake-up primitives used by the queue
// code. The "..." lines mark members elided by the article.
public class LockSupport{
...
// Blocks the calling thread by delegating to UNSAFE.park(false, 0L).
// NOTE(review): the arguments presumably mean "relative time, no timeout" -
// confirm against the Unsafe.park documentation.
public static void park() {
UNSAFE.park(false, 0L);
}
// Wakes the given thread via UNSAFE.unpark; a null thread is ignored.
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
...
}
// Excerpt of AbstractQueuedSynchronizer showing the wait queue: a doubly
// linked list of Node objects reached through head and tail. The "..." lines
// mark members elided by the article.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
...
// One queue entry; prev/next link it to its neighbours.
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;// each Node is associated with one blocked thread
}
// Both ends of the queue are volatile and transient (not serialized).
private transient volatile Node head;
private transient volatile Node tail;
...
}
4、公平与非公平Lock的实现差异
/**
 * Synchronizer used for the non-fair policy.
 */
static final class NonfairSync extends Sync {
    /**
     * First tries to grab the lock immediately with a single CAS on the
     * state (0 -> 1), without looking at the wait queue. Only when that
     * CAS fails does it fall back to the regular {@code acquire(1)} path.
     */
    final void lock() {
        if (!compareAndSetState(0, 1)) {
            acquire(1);
            return;
        }
        // CAS succeeded: record the caller as the exclusive owner.
        setExclusiveOwnerThread(Thread.currentThread());
    }
}
// Synchronizer used for the fair policy. Unlike NonfairSync.lock(), there is
// no up-front CAS attempt: locking always goes through acquire(1).
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
/**
 * Acquires in exclusive mode: one direct attempt via {@code tryAcquire},
 * and if that fails the caller is appended to the wait queue and handled
 * by {@code acquireQueued}.
 *
 * @param arg value handed unchanged to {@code tryAcquire}
 */
public final void acquire(int arg) {
    if (tryAcquire(arg)) {
        return;
    }
    // acquireQueued returns true when an interrupt was observed while
    // waiting; in that case selfInterrupt() is invoked.
    if (acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}
/**
 * Non-fair attempt to take the lock without blocking.
 *
 * @param acquires number of holds to add to the state
 * @return {@code true} if the lock was taken or re-entered by its owner,
 *         {@code false} if another thread holds it or the CAS was lost
 * @throws Error if the hold count would overflow {@code int}
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread caller = Thread.currentThread();
    final int held = getState();

    if (held == 0) {
        // Nobody holds the lock: race for it with a CAS.
        if (!compareAndSetState(0, acquires)) {
            return false;
        }
        // Won the race: record the caller as owner.
        setExclusiveOwnerThread(caller);
        return true;
    }

    if (caller != getExclusiveOwnerThread()) {
        return false;
    }

    // Re-entrant acquisition by the current owner: just bump the count.
    final int updated = held + acquires;
    if (updated < 0) { // overflow
        throw new Error("Maximum lock count exceeded");
    }
    setState(updated);
    return true;
}
/**
 * Fair attempt to take the lock without blocking. It differs from
 * {@code nonfairTryAcquire} only in that a free lock is taken solely
 * when {@code hasQueuedPredecessors()} reports false.
 *
 * @param acquires number of holds to add to the state
 * @return {@code true} if the lock was taken or re-entered by its owner
 * @throws Error if the hold count would overflow {@code int}
 */
protected final boolean tryAcquire(int acquires) {
    final Thread caller = Thread.currentThread();
    final int held = getState();

    if (held == 0) {
        // Queue check first, CAS second - same order as a short-circuit &&.
        if (hasQueuedPredecessors()) {
            return false;
        }
        if (!compareAndSetState(0, acquires)) {
            return false;
        }
        setExclusiveOwnerThread(caller);
        return true;
    }

    if (caller != getExclusiveOwnerThread()) {
        return false;
    }

    // Re-entrant acquisition by the current owner.
    final int updated = held + acquires;
    if (updated < 0) {
        throw new Error("Maximum lock count exceeded");
    }
    setState(updated);
    return true;
}
5、阻塞队列与唤醒机制
/**
 * Entry point of the blocking path: try once, otherwise enqueue the
 * caller with {@code addWaiter} and wait in {@code acquireQueued}.
 *
 * @param arg value handed unchanged to {@code tryAcquire}
 */
public final void acquire(int arg) {
    final boolean acquiredImmediately = tryAcquire(arg);
    if (!acquiredImmediately) {
        // True when an interrupt was seen while the thread was queued.
        final boolean interruptedWhileQueued =
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg);
        if (interruptedWhileQueued) {
            selfInterrupt();
        }
    }
}
/**
 * Creates a queue node for the calling thread and appends it to the tail
 * of the wait queue.
 *
 * @param mode node mode passed to the Node constructor
 * @return the newly enqueued node
 */
private Node addWaiter(Node mode) {
    final Node waiter = new Node(Thread.currentThread(), mode);
    final Node oldTail = tail;
    if (oldTail != null) {
        // Fast path: link backwards first, then publish with one CAS.
        waiter.prev = oldTail;
        if (compareAndSetTail(oldTail, waiter)) {
            oldTail.next = waiter;
            return waiter;
        }
    }
    // Tail was null or the CAS lost a race: hand over to enq().
    enq(waiter);
    return waiter;
}
/**
 * Waits in the queue until the given node's thread obtains the lock.
 *
 * <p>The parameter type is {@code int} so that it matches
 * {@code acquire(int)} (the caller) and {@code tryAcquire(int)} (the
 * callee); a {@code long} argument cannot be passed to
 * {@code tryAcquire(int)} without a cast.
 *
 * @param node the node already enqueued for the current thread
 * @param arg  value handed unchanged to {@code tryAcquire}
 * @return {@code true} if an interrupt was observed while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // Only the node directly behind head may try to take the lock.
            if (p == head && tryAcquire(arg)) {
                // Lock obtained: this node becomes the new head (per the
                // original note, setHead also clears the node's thread).
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Otherwise block the current thread via park(); remember any
            // interrupt instead of throwing.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // Reached with failed == true only if the loop exited abnormally.
        if (failed)
            cancelAcquire(node);
    }
}
// Parks the current thread, then reports whether it was interrupted.
// Thread.interrupted() also clears the interrupt flag, so callers have to
// record or re-assert the result themselves.
private final boolean parkAndCheckInterrupt() {
// LockSupport.park responds to interrupts (it returns rather than throwing).
LockSupport.park(this);
return Thread.interrupted();
}
6、unlock实现分析
// ReentrantLock.unlock(): gives back exactly one hold by delegating to
// sync.release(1).
public void unlock() {
sync.release(1);
}
/**
 * Releases in exclusive mode.
 *
 * @param arg value handed unchanged to {@code tryRelease}
 * @return the result of {@code tryRelease}: {@code true} once the lock
 *         is fully released
 */
public final boolean release(int arg) {
    // [1] Give back the hold(s); stop here if the lock is still held.
    if (!tryRelease(arg)) {
        return false;
    }
    // [2] Wake the successor waiting in the queue, if there is one.
    final Node first = head;
    if (first != null && first.waitStatus != 0) {
        unparkSuccessor(first);
    }
    return true;
}
/**
 * Gives back {@code releases} holds on the lock.
 *
 * @param releases number of holds to subtract from the state
 * @return {@code true} if the state reached 0, i.e. the lock is now free
 * @throws IllegalMonitorStateException if the caller is not the owner
 */
protected final boolean tryRelease(int releases) {
    final int remaining = getState() - releases;
    // Only the owner of the lock may call unlock; anyone else gets an
    // exception.
    if (Thread.currentThread() != getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
    }
    // Each call lowers the state; the lock is free only once it hits 0.
    final boolean fullyReleased = (remaining == 0);
    if (fullyReleased) {
        setExclusiveOwnerThread(null);
    }
    // The state is written after the owner has been cleared.
    setState(remaining);
    return fullyReleased;
}
AQS private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
7、lockInterruptibly实现分析
// Interruptible counterpart of acquireQueued: the loop has the same shape,
// but an interrupt seen after parking aborts the wait with an
// InterruptedException instead of being recorded in a flag. This method
// enqueues the node itself through addWaiter.
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// Only the node directly behind head may try to take the lock.
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// [!] An interrupt detected after parking is thrown immediately.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// failed is still true when leaving via the exception, so the node is
// cancelled.
if (failed)
cancelAcquire(node);
}
}
end