引言

Semaphore实现了对资源的限定数量的访问。它维护了指定数量的许可证,只有拿到许可证的线程才能继续执行。也就是最多同时访问资源的线程数量不能超过许可证的数量。它的内部实现也是基于AQS的。这篇文章,我们来看它的实现。

一个例子

先看一个使用Semaphore的例子:

  1. public class SemaphoreTest {
  2. private static Semaphore semaphore = new Semaphore(6);
  3. public static void main(String[] args) {
  4. for(int i=0;i<15;i++){
  5. Thread thread = new Thread(new SemaphoreTask(),"thread_"+i);
  6. thread.start();
  7. }
  8. }
  9. static class SemaphoreTask implements Runnable{
  10. @Override
  11. public void run() {
  12. try {
  13. semaphore.acquire();
  14. System.out.println("线程"+Thread.currentThread().getName()+"获得了许可证");
  15. Thread.sleep(Integer.parseInt(Thread.currentThread().getName().replace("thread_",""))*1000);
  16. System.out.println("线程"+Thread.currentThread().getName()+"释放了许可证");
  17. semaphore.release();
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }
  23. }

在这个例子中,Semaphore维护了6个许可证,但是同时想访问资源的线程有15个。这样最终能同时执行的线程只有6个,每个线程释放许可证之后,另外一个线程就能拿到,运行结果如下:

  1. 线程thread_1获得了许可证
  2. 线程thread_3获得了许可证
  3. 线程thread_2获得了许可证
  4. 线程thread_0获得了许可证
  5. 线程thread_5获得了许可证
  6. 线程thread_4获得了许可证
  7. 线程thread_0释放了许可证
  8. 线程thread_6获得了许可证
  9. 线程thread_1释放了许可证
  10. 线程thread_7获得了许可证
  11. 线程thread_2释放了许可证
  12. 线程thread_8获得了许可证
  13. 线程thread_3释放了许可证
  14. 线程thread_9获得了许可证
  15. 线程thread_4释放了许可证
  16. 线程thread_10获得了许可证
  17. 线程thread_5释放了许可证
  18. 线程thread_11获得了许可证
  19. 线程thread_6释放了许可证
  20. 线程thread_12获得了许可证
  21. 线程thread_7释放了许可证
  22. 线程thread_13获得了许可证
  23. 线程thread_8释放了许可证
  24. 线程thread_14获得了许可证
  25. 线程thread_9释放了许可证
  26. 线程thread_10释放了许可证
  27. 线程thread_11释放了许可证
  28. 线程thread_12释放了许可证
  29. 线程thread_13释放了许可证
  30. 线程thread_14释放了许可证

实现分析

构造方法

上面的例子中,我们看到了Semaphore的构造方法,它需要一个int类型的参数,该参数代表许可证的数量:

  1. public Semaphore(int permits) {
  2. sync = new NonfairSync(permits);
  3. }

这个参数用来初始化内部的同步器,Semaphore中默认用到的是非公平锁。

  1. NonfairSync(int permits) {
  2. super(permits);
  3. }

看父类Sync的构造方法:

  1. Sync(int permits) {
  2. setState(permits);
  3. }

原来这个许可证数量是用来设置state字段的。那我们就大概能猜到,acquire会减少state的值,release会增加state的值,这个逻辑可能有点不好理解,acquire可以理解为获取通行证,每获取一个,通行证就少一个,release是释放通行证,每释放一个,通行证就多一个。

acquire方法

acquire方法用来获取通行证:

  1. public void acquire() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }

它调用的是同步器的acquireSharedInterruptibly方法:

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. if (tryAcquireShared(arg) < 0)
  6. doAcquireSharedInterruptibly(arg);
  7. }

tryAcquireShared在Sync中给出了实现:

  1. final int nonfairTryAcquireShared(int acquires) {
  2. for (;;) {
  3. int available = getState();
  4. int remaining = available - acquires;
  5. //如果通行证用完了就直接返回 否则一直循环直到获取到通行证或者通行证用完
  6. if (remaining < 0 ||
  7. compareAndSetState(available, remaining))
  8. return remaining;
  9. }
  10. }

这是在一个循环里面,返回的条件是通行证要么用完要么拿到通行证,返回值代表当前线程最后有没有拿到通行证。
doAcquireSharedInterruptibly是AQS提供的,我们在前面的很多文章中都看到了它,它会将没有获取到通行证的线程构造成一个Node节点加入到共享队列中,然后线程处于WAITING状态来等待其他线程的唤醒,这里不再罗列代码。
所以acquire方法的逻辑就是如果当前线程拿到了许可证,就继续执行,同时会减少许可证的数量,否则就等待。

release方法

release方法调用的是同步器的releaseShared方法:

  1. public void release() {
  2. sync.releaseShared(1);
  3. }
  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

我们还是重点来看tryReleaseShared方法:

  1. protected final boolean tryReleaseShared(int releases) {
  2. for (;;) {
  3. int current = getState();
  4. int next = current + releases;
  5. if (next < current) // overflow
  6. throw new Error("Maximum permit count exceeded");
  7. if (compareAndSetState(current, next))
  8. return true;
  9. }
  10. }

它也是在一个自旋循环中,不断地尝试增加state的值也就是增加许可证的数量直至成功。许可证增加成功之后,它会调用doReleaseShared方法唤醒等待中的线程。doReleaseShared我们在前面的文章中已经讲过,这里不再赘述。
所以release方法的逻辑就是增加许可证的数量,同时唤醒因为没有获取到通行证而处于WAITING状态的线程。

小结

Semaphore在很多的场景中也会用到,例如获取有限的数据库连接等。