源码分析:Semaphore之信号量
简介
Semaphore 又名计数信号量,从概念上来讲,信号量初始并维护一定数量的许可证,使用之前先要先获得一个许可,用完之后再释放一个许可。信号量通常用于限制线程的数量来控制访问某些资源,从而达到单机限流的目的,比如SpringCloud 中的Zuul 组件用的是 Hystrix 的信号量(semaphore)隔离模式。
源码分析
重要的内部类
Semaphore 和 ReentrantLock 内部类完全相似, 有3个重要的内部类,分别也是 Sync
、NonfairSync
和FairSync
;
Sync 是后面两个的父类,继承至AbstractQueuedSynchronizer(AQS)
NonfairSync和FairSync都继承至Sync
NonfairSync 主要用于实现非公平锁,FairSync 主要用于实现公平锁
如果你看了前面几天关于锁的源码分析,是不是发现它们的套路都差不多呢?
重要的属性
和 ReentrantLock 也完全一样,只有一个重要的属性,同步器sync:
private final Sync sync;
两个构造方法
// ①指定初始许可证数量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// ②指定初始许可证数量和公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
两个构造方法最后都是初始化许可证数量,调用的也就是同步器里面的构造方法来初始化AQS 里面的state字段
// Sync 的构造方法
Sync(int permits) {
setState(permits);
}
// AQS 中的代码
protected final void setState(int newState) {
state = newState;
}
获取许可:acquire()
默认每次获得1个许可,如果没有可用的许可证会阻塞线程,或者被中断抛出异常。
源码分析:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 默认每次获得1个许可
}
acquireSharedInterruptibly(1)
会调用 AQS 里面的方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 线程被中断,抛出异常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // tryAcquireShared 尝试获得许可,返回小于0 表示没有获得许可
doAcquireSharedInterruptibly(arg); // 没有获得许可,排队阻塞
}
tryAcquireShared(arg)
方法:
tryAcquireShared 有两种实现,也就是 FairSync(公平模式) 和 NonfairSync(非公平模式) 不同实现。
公平模式的实现代码
FairSync.tryAcquireShared
:// acquires
protected int tryAcquireShared(int acquires) {
for (;;) { // 自旋
if (hasQueuedPredecessors()) // 检查是否有更早的线程在排队获得许可
return -1; // 有排队的线程,返回-1,小于0表示获得许可失败
int available = getState(); // 获得可用许可数
int remaining = available - acquires; // 减去一个许可,计算剩余的许可数
if (remaining < 0 || compareAndSetState(available, remaining))
// remaining < 0 成立的话就说明获取许可失败了,出去也要排队阻塞线程
return remaining;
}
}非公平模式的实现代码NonfairSync.tryAcquireShared:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires); // 调用父类Sync里面的实现方法
}
// 父类Sync里面的实现方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 自旋
int available = getState(); // 获得可用许可数
int remaining = available - acquires; // 减去一个许可,计算剩余的许可数
if (remaining < 0 || compareAndSetState(available, remaining))
// remaining < 0 成立的话就说明获取许可失败了,出去也要排队阻塞线程
return remaining;
}
}有没有发现他们的代码非常相识?公平模式的实现就只是比非公平模式多了一个
hasQueuedPredecessors()
方法调用判断,这个方法主要就是检查排队的队列里面是不是还有其他线程。在之前分析ReentrantLock 源码的文章中也有提到。
如果tryAcquireShared
方法没有获得许可(返回值小于0),就会进入到AQS 的 doAcquireSharedInterruptibly 方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 为当前线程创建排队节点,并加入到队列
// addWaiter方法的分析在之前的AQS分析文章已经分析过了
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { // 自旋,尝试获得许可,阻塞线程,唤醒后继续获得许可
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获得许可
if (r >= 0) { // 获得许可
setHeadAndPropagate(node, r); // 设置排队的头节点
p.next = null; // help GC
failed = false;
return; // 线程获得许可,退出
}
}
// shouldParkAfterFailedAcquire 如果线程应阻塞,则返回true
// 之前的AQS分析文章已经分析过了
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 被中断了,抛出异常
throw new InterruptedException();
}
} finally {
if (failed) // 节点被取消
cancelAcquire(node);
}
}
获得许可总结:
获得许可就是对初始化的许可证进行减1,直到没有许可证了就会进入到队列排队阻塞线程
公平模式下,会去看排队的队列是否有更早的线程在排队获取
非公平模式下,不会去检查排队队列
释放许可:acquire()
默认释放一个许可
public void release() {
sync.releaseShared(1); // 释放一个许可
}
调用的还是AQS框架里面的代码实现:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // tryReleaseShared 是信号量自己实现的
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared 方法实现:
说明一下,这个释放许可的实现,公平模式和非公平模式都是调用的同一个实现。
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 自旋
int current = getState(); //当前可用的许可
int next = current + releases; // 加上释放的许可
if (next < current) // 以防你传个负的树过来释放
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // CAS 修改,成功就是释放成功,失败的话继续自旋
return true;
}
}
释放许可总结:
释放许可就是把开始获得的许可还回去
用到CAS来修改许可证数,用自旋来保证一定会还回去(直到还成功为止)
其他API方法
Semaphore 还有其他的很多API可以调用,但其实源码都差不多,所以这里就不继续分析了,如果你把我之前分析AQS、ReentrantLock、ReentrantReadWriteLock的源码文章也看了,你就会发现这个Semaphore 的源码读起来非常简单了,这里再简单说下其他API的作用。
void acquire(int permits)
和上面分析的acquire()功能一样,只不过你可以指定获取许可数,源码在减的时候就不是减1了,在释放的时候也要注意,最好保持一致。
被中断会抛出异常void acquireUninterruptibly()
Uninterruptibly(),和 acquire() 方法的唯一区别就是线程被中断了也不会抛出异常,其他完全一致void acquireUninterruptibly(int permits)
被中断不抛出异常,指定每次获取许可的数量boolean tryAcquire()
只会尝试一次获得许可,获得成功了就返回true,失败了不会去排队阻塞线程。
还有几个带参数的,意思都差不多。int availablePermits()
返回可用的许可数void release(int permits)
一次释放指定的许可数
Semaphore 总结
Semaphore 也是基于AQS框架来实现的
Semaphore 也有公平和非公平之说,公平就是在获取许可之前会先看一下队列是否有其他线程在排队
Semaphore 的初始信号量必须指定,如果是1的话,功能就相当于一个互斥锁了
Semaphore 支持重入获得许可,但是这里要注意的是,如果一个线程先获得了许可,没释放又来获得许可,这时候许可数不足的情况下,当前线程会被阻塞,有可能会死锁。
如果这篇文章没看懂,可以先去看看的之前关于AQS(AQS分析文章里面有一个自己实现的共享锁,和这里的信号量非常相似)、ReentrantLock和RRWLock源码分析的文章,所有文章看完,保证你一懂百懂,奥利给。