vlambda博客
学习文章列表

Java中的锁(可重入锁/不可重入锁)

前言

    昨天看了下concurrentHashMap的底层实现,里面引出了ReentrantLock---可重入锁以及volatile关键字,那么我们今天就来看一下可重入锁和不可重入锁,volatile关键字将在明天和synchronized、Lock一起进行分析。

    首先我们先简单回顾一下CAS算法,CAS算法即compare and swap(比较和交换),是一种有名的无锁算法。无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步。

CAS算法涉及到三个操作数

  • 需要读写的内存值 V

  • 进行比较的值 A

  • 拟写入的新值 B

当且仅当 V 的值等于 A时,CAS通过原子方式用新值B来更新V的值,否则不会执行任何操作(比较和替换是一个原子操作)。一般情况下是一个自旋操作,即不断的重试。

一.自旋锁

    1.自旋锁的定义

    自旋锁:是指当一个线程在获取锁的时候,如果锁已经被其他线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,知道获取到锁才会退出循环。

    获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。它是为实现保护共享资源而提出的一种锁机制。其实,自旋锁与互斥锁比较类似,它们都是为了解决对某项资源的互斥使用。无论是互斥锁,还是自旋锁,在任何时刻,最多只能有一个保持者,也就说,在任何时刻最多只能有一个执行单元获得锁。但是两者在调度机制上略有不同。对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,”自旋”一词就是因此而得名。

    2.Java如何实现自旋锁?

public class SpinLock {private AtomicReference<Thread> cas = new AtomicReference<Thread>();public void lock() {  Thread current = Thread.currentThread();// 利用CASwhile (!cas.compareAndSet(null, current)) {// DO nothing         }     }public void unlock() {         Thread current = Thread.currentThread();         cas.compareAndSet(current, null);     }}

lock()方法利用的CAS,当第一个线程A获取锁的时候,能够成功获取到,不会进入while循环,如果此时线程A没有释放锁,另一个线程B又来获取锁,此时由于不满足CAS,所以就会进入while循环,不断判断是否满足CAS,直到A线程调用unlock方法释放了该锁。

    3.自旋锁的优缺点

    优点:

  1.  自旋锁不会使线程状态发生切换,一直处于用户态,即线程一直都是active的;不会使线程进入阻塞状态,减少了不必要的上下文切换,执行速度快

  2. 非自旋锁在获取不到锁的时候会进入阻塞状态,从而进入内核态,当获取到锁的时候需要从内核态恢复,需要线程上下文切换。(线程被阻塞后便进入内核(Linux)调度状态,这个会导致系统在用户态与内核态之间来回切换,严重影响锁的性能)

    缺点:

  1. 如果某个线程持有锁的时间过长,就会导致其它等待获取锁的线程进入循环等待,消耗CPU。使用不当会造成CPU使用率极高。

  2. 上面Java实现的自旋锁不是公平的,即无法满足等待时间最长的线程优先获取锁。不公平的锁就会存在“线程饥饿”问题。

  4.自旋锁---可重入与不可重入

    文章开始的时候的那段代码,仔细分析一下就可以看出,它是不支持重入的,即当一个线程第一次已经获取到了该锁,在锁释放之前又一次重新获取该锁,第二次就不能成功获取到。

    由于不满足CAS,所以第二次获取会进入while循环等待,而如果是可重入锁,第二次也是应该能够成功获取到的。而且,即使第二次能够成功获取,那么当第一次释放锁的时候,第二次获取到的锁也会被释放,而这是不合理的。为了实现可重入锁,我们需要引入一个计数器,用来记录获取锁的线程数。

public class ReentrantSpinLock {private AtomicReference<Thread> cas = new AtomicReference<Thread>();private int count;public void lock() {  Thread current = Thread.currentThread();if (current == cas.get()) { // 如果当前线程已经获取到了锁,线程数增加一,然后返回  count++;return;  }// 如果没获取到锁,则通过CAS自旋while (!cas.compareAndSet(null, current)) {// DO nothing  }  }public void unlock() {  Thread cur = Thread.currentThread();if (cur == cas.get()) {if (count > 0) {// 如果大于0,表示当前线程多次获取了该锁,释放锁通过count减一来模拟  count--;  } else {// 如果count==0,可以将锁释放,这样就能保证获取锁的次数与释放锁的次数是一致的了。 cas.compareAndSet(cur, null);  }  }  } }

可重入性描述这样的一个问题:一个线程在持有一个锁的时候,它内部能否再次(多次)申请该锁。如果一个线程已经获得了锁,其内部还可以多次申请该锁成功。那么我们就称该锁为可重入锁。ReentrantLock 和synchronized 都是 可重入锁。

二.ReentrantLock(今天的重点)

1.先整体了解一下重入锁ReentrantLock的大体思路?

先通过一张图,了解一下ReentrantLock的关系网(就好比看一个人NB不NB得先看他周围的人和家里的亲戚够不够NB,现在就开始看看ReentrantLock这个“家伙”有多牛逼!),先上图 。

首先ReentrantLock继承了他爸爸的AbstractQueuedSynchronizer的财产,这个人物有什么来历请第三部分AbstractQueuedSynchronizer的原理实现;

然后ReentrantLock还实现了他妈妈的一个目标(这里他爸爸和他妈妈本身没有太大的血缘关系)也就是ReentrantLock他实现的这个Lock接口和ReentrantLock继承的AbstractQueuedSynchronizer这个类本身没有太大关系,但是这都是他自己继承的财产他都可以随便用;

而且ReentrantLock这个人自己手下还有几个子公司(也就是内部类),有一个母公司和两个子公司(这里指的是公平锁和非公平锁的两个字公司)Sync实现了AbstractQueuedSynchronizer的tryRelease方法。NonfairSync和FairSync两个类继承自Sync,实现了lock方法,然后分别公平抢占和非公平抢占针对tryAcquire有不同的实现。

2.开始梳理ReentrantLock具体实现的逻辑步骤

先看一下ReentrantLock类里面都有什么东西 

Java中的锁(可重入锁/不可重入锁)

重入锁中的公平锁的实现:

(1)首先公平锁对应的逻辑是 ReentrantLock 内部静态类 FairSync,看那一下静态内部类里面有什么 

Java中的锁(可重入锁/不可重入锁)

2)会先从lock方法中去获取锁

final void lock() {// 调用 AQS acquire 获取锁  acquire(1); }

(3)这个acquire();方法是AQS中的方法,因为ReentrantLock继承的是AbstractQueuedSynchronizer.class这个类,而且FairSync这个又是ReentrantLock的内部类,所以就直接可以调用acquire()这个方法,(abstract static class Sync extends AbstractQueuedSynchronizer , ReentrantLock.FairSync.java重入锁中的内部类公平锁又是继承Sync 这个类(Sync .java是ReentrantLock.java中的内部类))

/** * 这个方法也就是lock()方法的关键方法。tryAcquire获得资源,返回true,直接结束。若未获取资源,新建一个节点插入队尾,          @param arg the acquire argument.  This value is conveyed to     * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */public final void acquire(int arg) {if (!tryAcquire(arg) &&//获取资源立刻结束 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//没有被中断过,也结束 selfInterrupt();    }

4)然后是调用FairSync(公平锁中的)tryAcquire(arg)方法,去尝试再次去获取锁

1、解释这个方法是干嘛的hasQueuedPredecessors():

这个方法目的就是是判断是否有其他线程比当前线程在同步队列中等待的时间更长。有的话,返回 true,否则返回 false

具体解释就是:进入队列中会有一个队列可能会有多个正在等待的获取锁的线程节点,可能有Head(头结点)、Node1、Node2、Node3、Tail(尾节点),如果此时Node2节点想要去获取锁,在公平锁中他就会先去判断整个队列中是不是还有比我等待时间更长的节点,如果有,就让他先获取锁,如果没有,那我就获取锁(这里就体会到了公平性),

注意:因为这个Head(头结点)、Node1、Node2、Node3、Tail(尾节点)队列是现今先出队列FIFO的队列,也就是说谁先进来的谁就在前面,也即是谁先进来的等待时间肯定都会比后进来的等待时间较长

protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {//hasQueuedPredecessors()这个方法就是公平锁和非公平锁的不同之处之一if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {//没有前驱节点并且CAS设置成功  setExclusiveOwnerThread(current);//设置当前线程为独占线程return true; }else if (current == getExclusiveOwnerThread()) {//这里和非公平锁类似int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");                 setState(nextc);return true;             }return false;        }    }

重入锁中的非公平锁的实现:

1)首先非公平锁对应的逻辑是 ReentrantLock 内部静态类 NoFairSync,看那一下静态内部类里面有什么 

Java中的锁(可重入锁/不可重入锁)

final void lock() {/*     * 这里调用直接 CAS 设置 state 变量,如果设置成功,表明加锁成功。这里并没有像公平锁     * 那样调用 acquire 方法让线程进入同步队列进行排队,而是直接调用 CAS 抢占锁。抢占失败     * 再调用 acquire 方法将线程置于队列尾部排队。*/if (compareAndSetState(0, 1))  setExclusiveOwnerThread(Thread.currentThread());else acquire(1);}

(2)也是去获取锁调用acquire()方法

public final void acquire(int arg) {//if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  selfInterrupt();}

(3)然后再去调用tryAcquire(arg)方法,这个方法中和公平锁中的内容就会有些不一样了

protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);}

(4)然后进入到nonfairTryAcquire()方法这里并没有调用hasQueuedPredecessors()这个方法

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();// 获取同步状态   int c = getState();// 如果同步状态 c = 0,表明锁当前没有线程获得,此时可加锁。if (c == 0) {// 调用 CAS 加锁,如果失败,则说明有其他线程在竞争获取锁if (compareAndSetState(0, acquires)) {// 设置当前线程为锁的持有线程  setExclusiveOwnerThread(current);return true;  }  }// 如果当前线程已经持有锁,此处条件为 true,表明线程需再次获取锁,也就是重入else if (current == getExclusiveOwnerThread()) {// 计算重入后的同步状态值,acquires 一般为1int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");// 设置新的同步状态值  setState(nextc);return true;  }return false; }

三.AbstractQueuedSynchronizer的原理实现

1.AQS到底是个什么东西?他能用来干嘛?他的实现原理是怎样的?

  •  首先AQS是一个抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架(其实说白了,就是说当有多个线程时,为了更好的控制并发,可以把AQS看成是一个正在管理挂号买东西排队的管理员,由AQS来进行对这个排队的人员(也就是把线程看成节点)进行判断排队的人有没有权利去买东西)

  • 他其实不是说可以单独来实现一个什么功能,但是有好多重要的功能都是得依靠他的底层支持,才能实现其他的ReentrantLock/Semaphore/CountDownLatch…各种锁机制功能

  • 先不考虑代码,直接先理一下AQS的整个流程大概是个什么样的,心里有点逼数,想看源码,百度一堆这样的源码。

整体流程图 

核心部分图 

实现原理的大概具体步骤如下:

前提须知:

1、首先你要知道的是AQS的锁的类型是分为两种的:第一种是独占式锁的获取和释放,第二种是共享式的锁的释放和获取;

2、第一步首先是用acquire(int arg)方法先拿到这个线程的共享资源的状态,这个状态变量是用volatile来修饰的,只有当获取到的state大于等于0时才表示获取锁成功(重入一次锁状态就加1,释放一次锁状态就减一),若果失败就把当前线程包装成一个node节点入队列(FIFO);

3、第二步就是第一步获取状态小于0的时候说明失败,然后调用的 addWaiter(Node mode)方法把该线程包装成一个节点,为了提高性能,首先执行一次快速入队操作,即直接尝试将新节点加入队尾;

4、然后再上一步的基础上,如果尝试把该节点加入队列尾部失败,这里又会去调用 enq(final Node node)方法,(1)先去判断这个队列是不是已经初始化了,如果初始化了就用CAS保证只有一个头结点可以初始化成功,如果没有初始化先初始化再CAS来保证只有一个线程节点创建成功,(2)最后在enq方法中进行无限次的自旋,直到,如果成功会直接使用CAS操作只能返回一个节点(compareAndSetTail(pred, node));

5、操作完了上面的操作之后,就相当于是线程节点已经成功的加入的等待队列中,然后进行的就是挂起当前线程,等待被唤醒。然后调用boolean acquireQueued(final Node node, int arg) ,先把锁的标记为默认为false,然后去判断该节点的前置结点是不是头结点,如果是把当前结点线程用setHead(node);设置为头结点(这里有个小坑,就是只有head头结点才是cpu正在执行的线程节点,后面的节点都是等待线程节点,而且在这个头结点执行的线程过程中头结点是可以中断的),如果设置头结点成功,就把锁标记为设置为true并返回;

6、然后再接着上一个步骤继续判断,如果没有获取锁成功,则进入挂起逻辑,也就是如果没有成功的话就进入下一个方法,node是当前线程的节点,pred是它的前置节点 

boolean shouldParkAfterFailedAcquire(Node pred, Node node),在这个方法判断成功之后,会继续接着5的步骤把锁的标记设置为true,然后如果判断锁的标记是与否,否的话继续 cancelAcquire(node);

7、处理获取锁失败之后的挂起逻辑boolean shouldParkAfterFailedAcquire(Node pred, Node node)的方法,(1)前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起,(2)如果前置节点的waitStatus大于0的话,把当前结点赋给前置结点的下一个结点,如果不大于0 的话,使用CAS的compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 最后挂起的状态改为false,它是用来判断当前节点是否可以被挂起,也就是唤醒条件是否已经具备,即如果挂起了,那一定是可以由其他线程来唤醒的。该方法如果返回false,即挂起条件没有完备,那就会重新执行acquireQueued方法的循环体,进行重新判断,如果返回true,那就表示万事俱备;

8、这里继续执行步骤5的 void cancelAcquire(Node node) 方法,拿到当前失败线程节点的等待状态是不是小于0,大于的话直接Node.CANCELLED;赋值给正在遍历的线程节点的waitStatus 中,然后继续判断当前节点是不是尾节点,是的话使用CAS操作compareAndSetNext(pred, predNext, null);把节点设置为空,若不是尾节点的话,当大于0的时候跳出循环,继续如果当前节点的后继节点没有被取消就把前置节点跟后置节点进行连接,相当于删除了当前节点compareAndSetNext(pred, predNext, next);

9、最后是释放锁,先去过去当前节点的waitStatus,然后如果waitStatus小于0尝试去释放锁使用compareAndSetWaitStatus(node, ws, 0)CAS操作,然后去判断如果当前线程节点的下一个节点,如果发现节点的waitStatus 小于0,就说明找到了待唤醒的节点,然后不为空的时候,就去唤醒该节点。


以上内容参考自:

1.https://blog.csdn.net/qq_34337272/article/details/81252853

2.https://blog.csdn.net/qq_36520235/article/details/81669831

3.https://blog.csdn.net/qq_36520235/article/details/81263037