Semaphore简介

Semaphore 作用

Semaphore 通常叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。可以把它简单的理解成停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。

可以看到,Semaphore 可以达到一个限流线程数的效果。

常用方法

  1. //获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
  2. acquire()
  3. //获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
  4. acquire(int permits)
  5. //获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
  6. acquireUninterruptibly()
  7. //尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
  8. tryAcquire()
  9. //尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
  10. tryAcquire(long timeout, TimeUnit unit)
  11. //释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
  12. release()
  13. //等待队列里是否还存在等待线程。
  14. hasQueuedThreads()
  15. //获取等待队列里阻塞的线程数。
  16. getQueueLength()
  17. //清空令牌把可用令牌数置为0,返回清空令牌的数量。
  18. drainPermits()
  19. //返回可用的令牌数量。
  20. availablePermits()

代码演示

  1. @Slf4j(topic = "c.main")
  2. public class Main {
  3. public static void main(String[] args) {
  4. // 1. 创建 semaphore 对象
  5. Semaphore semaphore = new Semaphore(3);
  6. // 2. 10个线程同时运行
  7. for (int i = 0; i < 10; i++) {
  8. new Thread(() -> {
  9. // 3. 获取许可
  10. try {
  11. semaphore.acquire();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. try {
  16. log.debug("running...");
  17. try {
  18. Thread.sleep(1000);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. log.debug("end...");
  23. } finally {
  24. // 4. 释放许可
  25. semaphore.release();
  26. }
  27. }).start();
  28. }
  29. }
  30. }

测试结果:
image.png
可以看到明显将线程数限制在了3个。

实例应用

使用场景

  • 数据库连接池:同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。
  • 限流线程数:在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)

    代码示例

    实现数据库连接池等待机制的代码如下:
    1. @Slf4j(topic = "c.pool")
    2. public class Pool {
    3. // 1. 连接池大小
    4. private final int poolSize;
    5. // 2. 连接对象数组
    6. private final Connection[] connections;
    7. // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    8. private final AtomicIntegerArray states;
    9. private final Semaphore semaphore;
    10. // 4. 构造方法初始化
    11. public Pool(int poolSize) {
    12. this.poolSize = poolSize;
    13. // 许可数与资源数一致
    14. this.semaphore = new Semaphore(poolSize);
    15. this.connections = new Connection[poolSize];
    16. this.states = new AtomicIntegerArray(new int[poolSize]);
    17. for (int i = 0; i < poolSize; i++) {
    18. connections[i] = new Connection() {
    19. //省略各种实现方法
    20. };
    21. }
    22. }
    23. // 5. 借连接
    24. public Connection borrow() {// t1, t2, t3
    25. // 获取许可
    26. try {
    27. semaphore.acquire(); // 没有许可的线程,在此一直等待
    28. } catch (InterruptedException e) {
    29. e.printStackTrace();
    30. }
    31. for (int i = 0; i < poolSize; i++) {
    32. // 获取空闲连接
    33. if(states.get(i) == 0) {
    34. if (states.compareAndSet(i, 0, 1)) {
    35. log.debug("borrow {}", connections[i]);
    36. return connections[i];
    37. }
    38. }
    39. }
    40. // 不会执行到这里
    41. return null;
    42. }
    43. // 6. 归还连接
    44. public void free(Connection conn) {
    45. for (int i = 0; i < poolSize; i++) {
    46. if (connections[i] == conn) {
    47. states.set(i, 0);
    48. log.debug("free {}", conn);
    49. semaphore.release();
    50. break;
    51. }
    52. }
    53. }
    54. }