vlambda博客
学习文章列表

JDK并发源码分析:ReentrantLock原理

JDK并发源码分析:ReentrantLock原理

ReentrantLock原理

JDK并发源码分析:ReentrantLock原理

1、类的继承层次

    如下为Concurrent包中与互斥锁(ReentrantLock)相关类的继承层次
JDK并发源码分析:ReentrantLock原理
    Lock是一个接口,其定义如下:
  
    
    
  
public interface Lock{   void lock();   void lockInterruptibly() throws InterruptedException;   boolean tryLock();   boolean tryLock(long time, TimeUnit unit)    throws InterruptedException;   void unlock();   Condition newCondition(); }
    ReentrantLock本身没有代码逻辑, 核心实现在其内部类Sync中。
  
    
    
  
public class ReentrantLock implements Lockjava.io.Serializable { private final Sync sync; public void lock() { sync.lock(); } public void unlock() { sync.release(1);     } }
JDK并发源码分析:ReentrantLock原理

2、锁的公平性和非公平性

    Sync是一个抽象类,它有两个子类FairSync与NonFairSync,分别对应公平锁和非公平锁。从ReentrantLock的构造函数可以看出, 默认为非公平锁,可执行为公平锁!
  
    
    
  
public ReentrantLock() { sync = new NonfairSync();     } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
    公平锁就是 按照先来后到的顺序拿锁,非公平就是 直接插队到前面抢锁。不同于现实生活,这里 默认为非公平锁,其实是为了提高效率,减少线程切换!
JDK并发源码分析:ReentrantLock原理

3、锁实现的基本原理

    Sync的父类为 Abstract Queued Synchronizer,简称为(AQS), 被称作队列同步器,这个类非常关键!
    ReentranLock锁具备Synchronized功能,即可以阻塞一个线程。为此它需要有以下几个功能:
  1. 需要 一个State变量,标记该锁的状态。对State的操作要确保线程安全,也就是会用到CAS。
  2. 需要 记录当前是那个线程持有锁!
  3. 需要底层 支持对一个线程进行阻塞或唤醒操作!
  4. 需要有 一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要用到CAS。
     针对1和2, AbstractQueuedSynchronizer及其父类已经体现:
  
    
    
  
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{      /** * The synchronization state. */ private volatile int state; }
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { /** * The current owner of exclusive mode synchronization. */ private transient Thread exclusiveOwnerThread; }
    这里需要说明的是,s tate不仅可以是0(无),1(有),为了支持锁的可重入性可以大于1,比如同一个线程调用了五次lock,state就会为5,然后调用了五次unlock,state就会减为0!
     针对3,在 UnSate类中,提供了阻塞或唤醒线程的一对操作原语也就是park/upark。LockSupport对这一原语进行了简单封装!
  
    
    
  
public class LockSupport{   ...     public static void park() { UNSAFE.park(false, 0L); }     public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }   ... }
    在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Thread thread),即传入一个被阻塞的线程,就可以 精确唤醒被阻塞的线程!
     针对4,AQS中利用双向链表和CAS实现了一个阻塞队列,如下所示:
  
    
    
  
public abstract class AbstractQueuedSynchronizer     extends AbstractOwnableSynchronizer {     ...     static final class Node {      volatile Node prev;       volatile Node next;       volatile Thread thread;//每个Node关联一个被阻塞的线程     }      private transient volatile Node head;      private transient volatile Node tail;      ...  }
     阻塞队列是整个AQS核心中的核心,head指向双向链表的头部,tail指向双向链表的尾部。入队就是把新的Node加到tail后面,然后对tail进行CAS操作;出队就是对head进行CAS操作,把Head向后移一个位置!
JDK并发源码分析:ReentrantLock原理
    初始的时候,head=tail=NULL,然后往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向整个空Node,之后,在后面加入被阻塞的线程对象。所以, 当head=tail的时候,说明队列为空!
JDK并发源码分析:ReentrantLock原理

4、公平与非公平Lock的实现差异

    非公平锁一上来就尝试修改state值,也就是抢锁,不考虑队列中有没有其他线程在排队,是非公平的!而公平锁不抢,而是通过acquire方法去拿锁。
  
    
    
  
    static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);         } }       static final class FairSync extends Sync { final void lock() { acquire(1); }     }
    acquire是AQS的一个模板方法,如下所示:
  
    
    
  
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
    tryAcquire()再次尝试拿锁,被NonFairSync和FairSync分别实现!acquireQueued()目的是把线程放入阻塞队列,然后阻塞该线程!
  
    
    
  
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {             //无人持有锁,开始抢锁 if (compareAndSetState(0, acquires)) {                   //拿锁成功,设置拥有者为当前线程 setExclusiveOwnerThread(current); return true; } }             //当前线程已经拿到锁了,再次重入,直接累加state变量 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
    对于公平锁来说,仅仅是多了一行if判断,即!hasQueuedPredecessors(),就是 当c==0的时候,并且排在队列的第1个时,才去抢锁,否则继续排队,这才叫公平!
  
    
    
  
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
JDK并发源码分析:ReentrantLock原理

5、阻塞队列与唤醒机制

    下面进入锁的最关键部分,即acquireQueued(..)函数内部一探究竟!
  
    
    
  
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
    先说addWaiter(..)函数,就是为当前线程生成一个Node,然后把Node放入双向链表的尾部。要注意的是,只是把Thread对象放入到一个队列中,线程并未阻塞。
  
    
    
  
private Node addWaiter(Node mode) {         Node node = new Node(Thread.currentThread(), mode);
Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
    在addWaiter()函数把Thread对象加入阻塞队列之后的工作就要靠acquire Queued()函数完成。 线程一旦进入accquireQueued()就会被无限期阻塞,即使有其他线程调用interrupt()函数也不能将其唤醒, 除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(..)返回。
  
    
    
  
final boolean acquireQueued(final Node node, long arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();                 //如果自己在队列头部,则尝试拿锁 if (p == head && tryAcquire(arg)) {                 //拿锁成功,出队列,同时把node的thread变量置为NULL setHead(node); p.next = null; // help GC failed = false; return interrupted; }                 //自己调用park()阻塞自己 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
    首先,acquireQueued函数有一个返回值,表示什么意思呢?虽然该函数不会中断响应,但是他 会记录被阻塞期间有没有其他线程向他发送中断信号,如果有,则该函数返回true,否则返回false。
    阻塞发生在下面这个函数中:
  
    
    
  
    private final boolean parkAndCheckInterrupt()             //LockSupport 会响应中断!            LockSupport.park(this);             return Thread.interrupted();     }
    LockSupport.park(this);函数返回只有两种情况,一种是 其他线程调用了LockSupport.unpark()另一种是 其他线程调用了当前线程的t.interrupt(),也就是说 LockSupport.park(this)会响应中断
    也正是 因为LockSupport.park()可能被中断唤醒,acquireQueued(..)函数才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。
    被唤醒之后, 通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false;如果是情况2,则返回true。
JDK并发源码分析:ReentrantLock原理

6、unLock实现分析

    对于 unLock来说并不区分公平还是非公平。
  
    
    
  
public void unlock() { sync.release(1); } public final boolean release(int arg) {         //[1]、释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0)         //[2]、唤醒队列中的后继者 unparkSuccessor(h); return true; } return false; }     protected final boolean tryRelease(int releases) { int c = getState() - releases;             //只有所得拥有者才有资格调用unlock函数,否则抛出异常! if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false;             //每调用1次,tryRelase state减1,             // 直至减到0,代表锁可以被释放 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free;     }           AQS private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0)             compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }     
    unlock的例子还是比较简单的。
JDK并发源码分析:ReentrantLock原理

7、lockInterruptibly实现分析

    我们再来扩展一下 lock 不能被中断,l ockInterruptibly()可以被中断,原因是检测到被中断直接抛异常。
  
    
    
  
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; }                 //[!] 检测到被中断直接抛异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
    明白了accquireQueued(..)原理,此处就很简单了。当parkAndCheckInterrupt()返回true的时候,说明有其他线程发送中断信号, 直接抛出InterruptedException,跳出for循环,整个函数返回。

end



JDK并发源码分析:ReentrantLock原理

JDK并发源码分析:ReentrantLock原理

JDK并发源码分析:ReentrantLock原理
JDK并发源码分析:ReentrantLock原理