Semaphore(信号量) 又是一个并发包里的常用组件。它是一个计数信号量,包含一组许可证,可以用acquire方法阻塞,直到获取一个许可证,可以用release方法释放一个许可证。Semaphore只是对可获取的数量进行维护,并没有真正的创建许可证对象。
使用举例
简单介绍一下Semaphore的使用,举一个LeetCode的题为例
https://leetcode-cn.com/problems/print-in-order/ 我们提供了一个类: public class Foo { public void one() { print(“one”); } public void two() { print(“two”); } public void three() { print(“three”); } } 三个不同的线程将会共用一个 Foo 实例。 线程 A 将会调用 one() 方法 线程 B 将会调用 two() 方法 线程 C 将会调用 three() 方法 请设计修改程序,以确保 two() 方法在 one() 方法之后被执行,three() 方法在 two() 方法之后被执行。
这道题有很多种解法,这里用信号量来实现一种解法
public class Foo {
//既然有三个方法,可以定义一个有3个许可证的信号量,而且必须得是非公平的
private Semaphore semaphore = new Semaphore(3);
public void first(Runnable printFirst) throws InterruptedException {
for(;;) {
//自旋阻塞,当信号量可用凭证为3时,获取一个凭证
if (semaphore.availablePermits() == 3) {
//初始化凭证数量就是3,那么一开始肯定先进入了这个分支中
printFirst.run();
//获取一个凭证,可用凭证数量变为2
semaphore.acquire();
break;
}
}
}
public void second(Runnable printSecond) throws InterruptedException {
for(;;) {
//当first获取了一个凭证后可用数量减少,
//那么此时second可以进入条件分支
if (semaphore.availablePermits() == 2) {
printSecond.run();
semaphore.acquire();
break;
}
}
}
public void third(Runnable printThird) throws InterruptedException {
for(;;) {
//同上
if (semaphore.availablePermits() == 1) {
printThird.run();
semaphore.acquire();
break;
}
}
}
}
上文代码中所使用的acquire方法是阻塞的,若不想阻塞可以使用tryAcquire
事实上Semaphore对于凭证数量的管理也是借鉴了AQS的status属性实现的。
acquire
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
直接调用了AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在acquireSharedInterruptibly方法里首先判断线程是不是被中断了,是的话抛出异常。然后调用了tryAcquireShared方法来尝试获取一个信号量,如果返回值小于0,表示获取失败,将会调用doAcquireSharedInterruptibly方法使当前线程进入队列进行等待。由于这一步存在等待操作,所以方法是阻塞的
tryAcquireShared方法是AQS留给子类实现的模板方法,在Semaphore中有公平和非公平的两种实现,以非公平为例,具体的实现是在Semaphore的内部类Sync的nonfairTryAcquireShared方法中
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
在Semaphore中,AQS的status用来表示当前剩余可用的凭证数量
先把需要获取的数量减下去,然后尝试用CAS的方式去设置。
如果需要获取的数量超过了当前剩余的凭证数量,则方法返回值将小于0,如果设置成功,那么返回值将大于0。在acquireSharedInterruptibly方法分析我们已经提到了,返回值小于0的话会进入doAcquireSharedInterruptibly方法,将当前线程构造成一个node然后进入AQS的队列进行等待。简单的说就是获取失败,则线程阻塞等待。
release
public void release() { sync.releaseShared(1); }
release方法中同样的,直接调用了AQS的releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在AQS的releaseShared方法中,首先调用了模板方法tryReleaseShared来对sttaus进行加操作,如果成功,调用doReleaseShared方法做锁释放的后续操作,例如唤醒后续节点等。
tryReleaseShared方法的实现也是在Semaphore的内部类Sync中
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
在tryReleaseShared方法中,首先将当前的凭证数加上释放的数,然后通过CAS去设置。设置成功则返回true,否则将一直等待直到成功。成功后,如上文所说,会调用doReleaseShared方法进行一些后续的操作。
drainPermits
这个方法的作用是让当前线程获取剩余的所有许可证。实现也很简单,通过CAS将status设置成0
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
tryAcquire
tryAcquire与acquire方法相对应,是acquire的非阻塞实现,调用的方法和acquire一样,唯一不同的是,调用失败以后不会将当前线程加入到队列中等待,而是直接返回
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}