一、介绍

1.1 简介

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

1.2 Semaphore API

  1. //返回此信号量中当前可用的许可证数。
  2. public int availablePermits();
  3. //返回正在等待获取许可证的线程数。
  4. public final int getQueueLength();
  5. //是否有线程正在等待获取许可证。
  6. public final boolean hasQueuedThreads();
  7. //获取并返回所有立即可用的许可证。
  8. public int drainPermits();
  9. //减少reduction个许可证,是个protected方法。
  10. protected void reducePermits(int reduction)
  11. //返回所有等待获取许可证的线程集合
  12. protected Collection<Thread> getQueuedThreads() ;
  13. //是否公平锁
  14. public boolean isFair();

二、示例

模拟30个线程读取数据保存到数据库中,而数据库的连接数只有10个,这是需要限制获取数据库连接数最多只能10;不然获取不到数据库连接异常。

  1. public class SemaphoreTest {
  2. private static final int THREAD_COUNT = 30;
  3. private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
  4. private static Semaphore s = new Semaphore(10);
  5. public static void main(String[] args) {
  6. for (int i = 0; i < THREAD_COUNT; i++) {
  7. int a=i;
  8. threadPool.execute(new Runnable() {
  9. @Override
  10. public void run() {
  11. try {
  12. s.acquire();
  13. System.out.println("save data" + a);
  14. TimeUnit.SECONDS.sleep(1);
  15. s.release();
  16. } catch (InterruptedException e) {
  17. }
  18. }
  19. });
  20. }
  21. threadPool.shutdown();
  22. }
  23. }

三、源码分析

3.1 UML 图示

image.png
从UML 图示可以看到,Semaphore 内部采用AQS 实现了公平锁和非公平锁

3.2 Semaphore的构造方法

  1. //默认初始化非公平锁
  2. public Semaphore(int permits) {
  3. sync = new NonfairSync(permits);
  4. }
  5. public Semaphore(int permits, boolean fair) {
  6. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  7. }

3.3 acquire 方法

从Semaphore中获取”许可证“,阻塞直到有一个”许可证“可用 或者 线程被打断。
获取许可证,如果有许可证可用, 线程会立即返回执行,并减少许可证的数量;
如果没有许可证可用,那么会阻塞线程的执行,直到:

  1. 有其他线程释放有许可证, 当前线程是下一个分配许可证
  2. 有其他线程打断当前线程

实际上是调用acquireSharedInterruptibly方法

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. //判断线程是否被打断
  4. if (Thread.interrupted())
  5. throw new InterruptedException();
  6. //获取共享锁
  7. if (tryAcquireShared(arg) < 0)
  8. //失败后,进入等待队列
  9. doAcquireSharedInterruptibly(arg);
  10. }

非公平锁 获取共享锁实现nonfairTryAcquireShared

  1. final int nonfairTryAcquireShared(int acquires) {
  2. //自旋
  3. for (;;) {
  4. //获取”许可证“数量
  5. int available = getState();
  6. //剩余”许可证“数量
  7. int remaining = available - acquires;
  8. //如果”许可证“数量少于0, 则阻塞线程
  9. if (remaining < 0 ||
  10. //否则,cas 更新可用的”许可证“数量
  11. compareAndSetState(available, remaining))
  12. return remaining;
  13. }
  14. }

公平锁 获取共享锁实现

  1. protected int tryAcquireShared(int acquires) {
  2. //自旋
  3. for (;;) {
  4. //当前是否有线程在等待队列中,等待获取许可证
  5. if (hasQueuedPredecessors())
  6. return -1;
  7. //获取”许可证“数量
  8. int available = getState();
  9. //剩余”许可证“数量
  10. int remaining = available - acquires;
  11. //如果”许可证“数量少于0, 则阻塞线程
  12. if (remaining < 0 ||
  13. //否则,cas 更新可用的许可证数量
  14. compareAndSetState(available, remaining))
  15. return remaining;
  16. }
  17. }

3.4 release 方法

实际上是调用releaseShared 方法

  1. public final boolean releaseShared(int arg) {
  2. //尝试释放共享锁
  3. if (tryReleaseShared(arg)) {
  4. doReleaseShared();
  5. return true;
  6. }
  7. return false;
  8. }

tryReleaseShared 方法逻辑

  1. protected final boolean tryReleaseShared(int releases) {
  2. //自旋
  3. for (;;) {
  4. //获取”许可证“数量
  5. int current = getState();
  6. //回收后的数量
  7. int next = current + releases;
  8. if (next < current) // overflow
  9. throw new Error("Maximum permit count exceeded");
  10. //cas 更新”许可证“数量
  11. if (compareAndSetState(current, next))
  12. return true;
  13. }
  14. }

参考

  • 《Java并发编程的艺术》