1.使用

  1. public static void main(String[] args) throws InterruptedException {
  2. final Semaphore semaphore = new Semaphore(2, true);
  3. Thread tA = new Thread(() ->{
  4. try {
  5. semaphore.acquire();
  6. System.out.println("线程A获取通行证成功");
  7. TimeUnit.SECONDS.sleep(10);
  8. } catch (InterruptedException e) {
  9. }finally {
  10. semaphore.release();
  11. }
  12. });
  13. tA.start();
  14. //确保线程A已经执行
  15. TimeUnit.MILLISECONDS.sleep(200);
  16. Thread tB = new Thread(() ->{
  17. try {
  18. semaphore.acquire(2);
  19. System.out.println("线程B获取通行证成功");
  20. } catch (InterruptedException e) {
  21. }finally {
  22. semaphore.release(2);
  23. }
  24. });
  25. tB.start();
  26. //确保线程B已经执行
  27. TimeUnit.MILLISECONDS.sleep(200);
  28. Thread tC = new Thread(() ->{
  29. try {
  30. semaphore.acquire();
  31. System.out.println("线程C获取通行证成功");
  32. } catch (InterruptedException e) {
  33. }finally {
  34. semaphore.release();
  35. }
  36. });
  37. tC.start();
  38. }
  1. package facetest.javase.juc;
  2. import java.util.concurrent.Semaphore;
  3. /**
  4. * @Created by 勺子
  5. * @Description pool
  6. * @Date 2022/4/6 10:42
  7. */
  8. public class Pool {
  9. /** 可同时访问资源的最大线程数*/
  10. private static final int MAX_AVAILABLE = 100;
  11. /** 信号量 表示:可获取的对象通行证*/
  12. private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
  13. /** 共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是链接池*/
  14. protected Object[] items = new Object[MAX_AVAILABLE];
  15. /** 共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false*/
  16. protected boolean[] used = new boolean[MAX_AVAILABLE];
  17. /**
  18. * 获取一个空闲对象
  19. * 如果当前池中无空闲对象,则等待..直到有空闲对象为止
  20. */
  21. public Object getItem() throws InterruptedException {
  22. available.acquire();
  23. return getNextAvailableItem();
  24. }
  25. /**
  26. * 归还对象到池中
  27. */
  28. public void putItem(Object x) {
  29. if (markAsUnused(x))
  30. available.release();
  31. }
  32. /**
  33. * 获取池内一个空闲对象,获取成功则返回Object,失败返回Null
  34. * 成功后将对应的 used[i] = true
  35. */
  36. private synchronized Object getNextAvailableItem() {
  37. for (int i = 0; i < MAX_AVAILABLE; ++i) {
  38. if (!used[i]) {
  39. used[i] = true;
  40. return items[i];
  41. }
  42. }
  43. return null;
  44. }
  45. /**
  46. * 归还对象到池中,归还成功返回true
  47. * 归还失败:
  48. * 1.池中不存在该对象引用,返回false
  49. * 2.池中存在该对象引用,但该对象目前状态为空闲状态,也返回false
  50. */
  51. private synchronized boolean markAsUnused(Object item) {
  52. for (int i = 0; i < MAX_AVAILABLE; ++i) {
  53. if (item == items[i]) {
  54. if (used[i]) {
  55. used[i] = false;
  56. return true;
  57. } else
  58. return false;
  59. }
  60. }
  61. return false;
  62. }
  63. }

2.源码

Semaphore内部有一个抽象的静态内部类sync,跟CountDownLatch一样,sync继承了AQS,因此Semaphore也有公平的方式,默认是非公平方式
无论公平锁还是非公平锁都是将传递的通行证赋值给AQS.state值

  1. //默认是非公平锁
  2. public Semaphore(int permits) {
  3. sync = new NonfairSync(permits);
  4. }
  5. //可以传递参数指定为公平锁
  6. public Semaphore(int permits, boolean fair) {
  7. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  8. }
  9. NonfairSync(int permits) {
  10. super(permits);
  11. }
  12. FairSync(int permits) {
  13. super(permits);
  14. }

1. acquire()

  1. public void acquire() throws InterruptedException {
  2. //调用AQS的方法
  3. sync.acquireSharedInterruptibly(1);
  4. }

2.AQS.acquireSharedInterruptibly

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. //如果线程已经被中断,直接抛出异常
  4. if (Thread.interrupted())
  5. throw new InterruptedException();
  6. //调用FairSync.tryAcquireShared方法
  7. //小于0的情况有两种:
  8. //1 非占用锁的线程调用了acquire方法 阻塞队列有等待者线程
  9. //2 当前剩下的通行证数不够当前线程本次获取
  10. //什么时候大于0 ? 获取通行证成功,返回剩余的通行证数
  11. if (tryAcquireShared(arg) < 0)
  12. //条件成立:走阻塞线程逻辑
  13. doAcquireSharedInterruptibly(arg);
  14. }

3.FairSync.tryAcquireShared

尝试获取锁,获取成功返回剩余的通行证数;获取失败返回小于0的数

  1. //尝试获取通行证,获取成功返回 >=0的值;获取失败 返回<0的值
  2. protected int tryAcquireShared(int acquires) {
  3. for (;;) {
  4. //判断当前AQS阻塞队列是否有 等待者线程,如果有直接返回 -1 ,表示当前acquire操作的线程需要进入队列等待...
  5. if (hasQueuedPredecessors())
  6. return -1;
  7. //执行到这里,哪几种情况?
  8. //1.调用acquire时,AQS阻塞队列内没有其他等待者
  9. //2.当前节点 在阻塞队列中是head.next节点
  10. //获取state,state这里表示 通行证
  11. int available = getState();
  12. //remaining 表示当前线程 获取通行证之后,semaphore还剩余数量
  13. int remaining = available - acquires;
  14. //条件一:remaining < 0成立,说明线程获取通行证失败
  15. //条件二:前置条件,remaining大于等于0,cas更新state成功,说明线程获取通行证成功,cas失败则自旋
  16. if (remaining < 0 ||
  17. compareAndSetState(available, remaining))
  18. return remaining;
  19. }
  20. }

4.AQS.doAcquireSharedInterruptibly

阻塞线程的逻辑

  • 将线程封装为node几点入阻塞队列
  • 自旋给自己找个好爸爸,然后阻塞,被唤醒后判断是否是head.next节点,是的话设置为头节点,逐次唤醒后继节点

    1. //AQS的doAcquireSharedInterruptibly方法
    2. private void doAcquireSharedInterruptibly(int arg)
    3. throws InterruptedException {
    4. //将调用semaphore.acquire方法的线程封装为node 添加到AQS的阻塞队列中
    5. final Node node = addWaiter(Node.SHARED);
    6. boolean failed = true;
    7. try {
    8. for (;;) {
    9. //获取当前node的前驱节点
    10. final Node p = node.predecessor();
    11. //条件成立 说明当前node节点是head.next节点,有权利获取 共享锁
    12. if (p == head) {
    13. int r = tryAcquireShared(arg);
    14. //站在semaphore角度,r大于0 说明当前node获取到锁了
    15. if (r >= 0) {
    16. //设置为头节点并唤醒后继节点
    17. setHeadAndPropagate(node, r);
    18. p.next = null; // help GC
    19. failed = false;
    20. return;
    21. }
    22. }
    23. //给当前线程找到好爸爸,将好爸爸的状态设置为singal -1 ,返回true
    24. //parkAndCheckInterrupt 挂起node对应的线程
    25. if (shouldParkAfterFailedAcquire(p, node) &&
    26. parkAndCheckInterrupt())
    27. throw new InterruptedException();
    28. }
    29. } finally {
    30. //阻塞过程中被中断唤醒之后,会抛出中断异常,然后将node节点设置为取消状态,走清除节点逻辑
    31. if (failed)
    32. cancelAcquire(node);
    33. }
    34. }

    5.AQS.setHeadAndPropagate

    1. //AQS的方法 设置当前node为 head节点,并向后传播(依次唤醒!)
    2. private void setHeadAndPropagate(Node node, int propagate) {
    3. Node h = head;
    4. //将当前节点设置为head节点
    5. setHead(node);
    6. //propagate 是剩余的通行证数
    7. if (propagate > 0 || h == null || h.waitStatus < 0 ||
    8. (h = head) == null || h.waitStatus < 0) {
    9. //获取当前节点的后继节点
    10. Node s = node.next;
    11. //条件一:s == null 成立 当前node节点已经是tail节点了,条件一成立 doReleaseShared会处理这种情况
    12. //条件二:前置条件 s!=null 那么要求s节点必须是 共享模式
    13. if (s == null || s.isShared())
    14. //基本上所有情况都会执行到这里
    15. doReleaseShared();
    16. }
    17. }

    6.release

    每调用一次,恢复一个通行证

    1. public void release() {
    2. //调用AQS的方法
    3. sync.releaseShared(1);
    4. }

    7.AQS.releaseShared

    1. public final boolean releaseShared(int arg) {
    2. //条件成立:表示当前线程释放资源成功,释放资源成功后,去唤醒获取资源失败的线程...
    3. if (tryReleaseShared(arg)) {
    4. //唤醒获取资源失败的线程
    5. doReleaseShared();
    6. return true;
    7. }
    8. return false;
    9. }

    8.Sync.tryReleaseShared

    恢复锁资源通常成功

    1. protected final boolean tryReleaseShared(int releases) {
    2. //自旋,恢复当前通行证数
    3. for (;;) {
    4. int current = getState();
    5. int next = current + releases;
    6. if (next < current) // overflow
    7. throw new Error("Maximum permit count exceeded");
    8. //通常cas锁资源修改成功
    9. if (compareAndSetState(current, next))
    10. return true;
    11. }
    12. }

    9.AQS.doReleaseShared

    1. //AQS的doReleaseShared方法
    2. //有哪几种情况会调用当前方法?
    3. //1.semaphore.release释放锁资源成功 然后调用doReleaseShared方法去唤醒head.next对应的线程
    4. //2.被唤醒的线程在doAcquireSharedInterruptibly方法中调用setHeadAndPropagate方法,然后会调用doReleaseShared该方法
    5. private void doReleaseShared() {
    6. for (;;) {
    7. //获取当前AQS的head节点
    8. Node h = head;
    9. //条件一 h != null 成立 说明阻塞队列不为空
    10. //条件二 h != tail 成立说明当前阻塞队列不只有head一个节点
    11. //h ==tail 表示head和tail指向同一个node节点,什么时候会出现?
    12. //1.正常唤醒情况,唤醒最后一个node节点时候,head是等于tail的
    13. //2.第一个调用await的线程在准备addWaiter入队时给head节点擦屁股后还没有把自己放进阻塞队列时候 这时候与countdown方法发生并发了
    14. if (h != null && h != tail) {
    15. //执行到这里 说明当前head一定有后继节点
    16. int ws = h.waitStatus;
    17. if (ws == Node.SIGNAL) {
    18. //将当前node状态改为0
    19. //为什么用cas 多个线程唤醒head.next节点时候, 可能会失败
    20. //案例:t3线程在if (h == head) 返回false时 t3不会退出循环,会继续自旋 参与到唤醒下一个head.next逻辑
    21. //t3此时执行cas 成功..t4(head节点线程)在t3修改成功之前,也进入到这里代码块,t4会compareAndSetWaitStatus 修改失败,因为t3改过了
    22. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    23. continue;
    24. //唤醒head的后继节点
    25. unparkSuccessor(h);
    26. }
    27. else if (ws == 0 &&
    28. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    29. continue;
    30. }
    31. //条件成立
    32. //1. 说明刚被unaprk唤醒的后继节点还没有执行到setHeadAndPropagate方法里面的 设置当前后继node为head节点逻辑
    33. //2. h==null
    34. //3. h==tail head==tail指向一个node对象
    35. //条件不成立
    36. //被唤醒的节点 很积极 直接将自己设置为head节点, 此时 唤醒它的节点(前驱节点) 执行h==head不成立
    37. //此时head节点的前驱节点 不会跳出doReleaseShared方法,会继续唤醒 新head节点的后继节点..
    38. if (h == head) // loop if head changed
    39. break;
    40. }
    41. }

    Semaphore 案例分析.png信号量,获取通行证流程.png