引言
Semaphore实现了对资源的限定数量的访问。它维护了指定数量的许可证,只有拿到许可证的线程才能继续执行。也就是最多同时访问资源的线程数量不能超过许可证的数量。它的内部实现也是基于AQS的。这篇文章,我们来看它的实现。
一个例子
先看一个使用Semaphore的例子:
public class SemaphoreTest {
private static Semaphore semaphore = new Semaphore(6);
public static void main(String[] args) {
for(int i=0;i<15;i++){
Thread thread = new Thread(new SemaphoreTask(),"thread_"+i);
thread.start();
}
}
static class SemaphoreTask implements Runnable{
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("线程"+Thread.currentThread().getName()+"获得了许可证");
Thread.sleep(Integer.parseInt(Thread.currentThread().getName().replace("thread_",""))*1000);
System.out.println("线程"+Thread.currentThread().getName()+"释放了许可证");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在这个例子中,Semaphore维护了6个许可证,但是同时想访问资源的线程有15个。这样最终能同时执行的线程只有6个,每个线程释放许可证之后,另外一个线程就能拿到,运行结果如下:
线程thread_1获得了许可证
线程thread_3获得了许可证
线程thread_2获得了许可证
线程thread_0获得了许可证
线程thread_5获得了许可证
线程thread_4获得了许可证
线程thread_0释放了许可证
线程thread_6获得了许可证
线程thread_1释放了许可证
线程thread_7获得了许可证
线程thread_2释放了许可证
线程thread_8获得了许可证
线程thread_3释放了许可证
线程thread_9获得了许可证
线程thread_4释放了许可证
线程thread_10获得了许可证
线程thread_5释放了许可证
线程thread_11获得了许可证
线程thread_6释放了许可证
线程thread_12获得了许可证
线程thread_7释放了许可证
线程thread_13获得了许可证
线程thread_8释放了许可证
线程thread_14获得了许可证
线程thread_9释放了许可证
线程thread_10释放了许可证
线程thread_11释放了许可证
线程thread_12释放了许可证
线程thread_13释放了许可证
线程thread_14释放了许可证
实现分析
构造方法
上面的例子中,我们看到了Semaphore的构造方法,它需要一个int类型的参数,该参数代表许可证的数量:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
这个参数用来初始化内部的同步器,Semaphore中默认用到的是非公平锁。
NonfairSync(int permits) {
super(permits);
}
看父类Sync的构造方法:
Sync(int permits) {
setState(permits);
}
原来这个许可证数量是用来设置state字段的。那我们就大概能猜到,acquire会减少state的值,release会增加state的值,这个逻辑可能有点不好理解,acquire可以理解为获取通行证,每获取一个,通行证就少一个,release是释放通行证,每释放一个,通行证就多一个。
acquire方法
acquire方法用来获取通行证:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
它调用的是同步器的acquireSharedInterruptibly方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared在Sync中给出了实现:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
//如果通行证用完了就直接返回 否则一直循环直到获取到通行证或者通行证用完
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
这是在一个循环里面,返回的条件是通行证要么用完要么拿到通行证,返回值代表当前线程最后有没有拿到通行证。
doAcquireSharedInterruptibly是AQS提供的,我们在前面的很多文章中都看到了它,它会将没有获取到通行证的线程构造成一个Node节点加入到共享队列中,然后线程处于WAITING状态来等待其他线程的唤醒,这里不再罗列代码。
所以acquire方法的逻辑就是如果当前线程拿到了许可证,就继续执行,同时会减少许可证的数量,否则就等待。
release方法
release方法调用的是同步器的releaseShared方法:
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
我们还是重点来看tryReleaseShared方法:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
它也是在一个自旋循环中,不断地尝试增加state的值也就是增加许可证的数量直至成功。许可证增加成功之后,它会调用doReleaseShared方法唤醒等待中的线程。doReleaseShared我们在前面的文章中已经讲过,这里不再赘述。
所以release方法的逻辑就是增加许可证的数量,同时唤醒因为没有获取到通行证而处于WAITING状态的线程。
小结
Semaphore在很多的场景中也会用到,例如获取有限的数据库连接等。