Semaphore介绍

信号量:操作系统中PV操作的原语在java的实现,AbstractQueuedSynchronizer实现
大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现
大小为n(n>0)的信号量可以实现限流的功能,只能有n个线程同时获取信号量
1648215732(1).png
PV操作是操作系统一种实现进程互斥与同步的有效方法
PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思
PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性

  1. S1
  2. ②若S1后仍大于或等于0,则进程继续执行;
  3. ③若S1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
  4. V操作的主要动作是:
  5. S1
  6. ②若相加后结果大于0,则进程继续执行;
  7. ③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。

Semaphore 常用方法

构造器

  1. /**
  2. * Creates a {@code Semaphore} with the given number of
  3. * permits and nonfair fairness setting.
  4. *
  5. * @param permits the initial number of permits available.
  6. * This value may be negative, in which case releases
  7. * must occur before any acquires will be granted.
  8. */
  9. public Semaphore(int permits) {
  10. sync = new NonfairSync(permits);
  11. }
  12. //fair 是否公平
  13. //permits 许可的资源数
  14. public Semaphore(int permits, boolean fair) {
  15. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  16. }
  1. public void acquire() throws InterruptedException
  2. public boolean tryAcquire()
  3. public void release()
  4. public int availablePermits()
  5. public final int getQueueLength()
  6. public final boolean hasQueuedThreads()
  7. protected void reducePermits(int reduction)
  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

    Semaphore源码分析

    关注点:
    1. Semaphore的加锁解锁(共享锁)逻辑实现
    2. 线程竞争锁失败入队阻塞逻辑和获取锁的线程释放锁唤醒阻塞线程竞争锁的逻辑实现
    3:获取资源失败的入队
    4:释放资源之后的取队列head 和唤醒后一个节点的线程 ```java public final void acquireSharedInterruptibly(int arg)
    1. throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. if (tryAcquireShared(arg) < 0)//取获取资源
    5. doAcquireSharedInterruptibly(arg);//获取不到入队操作
    }

/**

  1. * Acquires in shared interruptible mode.
  2. * @param arg the acquire argument
  3. */
  4. private void doAcquireSharedInterruptibly(int arg)
  5. throws InterruptedException {
  6. final Node node = addWaiter(Node.SHARED);//获取node,没有就入队
  7. boolean failed = true;
  8. try {
  9. for (;;) {
  10. final Node p = node.predecessor();
  11. if (p == head) {
  12. int r = tryAcquireShared(arg);
  13. if (r >= 0) {
  14. setHeadAndPropagate(node, r);
  15. p.next = null; // help GC
  16. failed = false;
  17. return;
  18. }
  19. }
  20. //获取资源失败取阻塞
  21. if (shouldParkAfterFailedAcquire(p, node) &&//设置节点的waitStatus = -1
  22. parkAndCheckInterrupt())// park线程
  23. throw new InterruptedException();
  24. }
  25. } finally {
  26. if (failed)
  27. cancelAcquire(node);
  28. }
  29. }
  1. ```java
  2. public final boolean releaseShared(int arg) {
  3. if (tryReleaseShared(arg)) {//释放资源, 资源数修改
  4. doReleaseShared();//唤醒下一个线程
  5. return true;
  6. }
  7. return false;
  8. }
  9. //AQS实现的
  10. private void doReleaseShared() {
  11. /*
  12. * Ensure that a release propagates, even if there are other
  13. * in-progress acquires/releases. This proceeds in the usual
  14. * way of trying to unparkSuccessor of head if it needs
  15. * signal. But if it does not, status is set to PROPAGATE to
  16. * ensure that upon release, propagation continues.
  17. * Additionally, we must loop in case a new node is added
  18. * while we are doing this. Also, unlike other uses of
  19. * unparkSuccessor, we need to know if CAS to reset status
  20. * fails, if so rechecking.
  21. */
  22. for (;;) {
  23. Node h = head;
  24. if (h != null && h != tail) {
  25. int ws = h.waitStatus;
  26. if (ws == Node.SIGNAL) {
  27. //先改状态 为0
  28. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  29. continue; // loop to recheck cases
  30. unparkSuccessor(h);//唤醒下一个节点
  31. }
  32. else if (ws == 0 &&
  33. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  34. continue; // loop on failed CAS
  35. }
  36. if (h == head) // loop if head changed
  37. break;
  38. }
  39. }
  40. private void unparkSuccessor(Node node) {
  41. int ws = node.waitStatus;
  42. if (ws < 0)
  43. compareAndSetWaitStatus(node, ws, 0);
  44. Node s = node.next;
  45. if (s == null || s.waitStatus > 0) {
  46. s = null;
  47. for (Node t = tail; t != null && t != node; t = t.prev)
  48. if (t.waitStatus <= 0)
  49. s = t;
  50. }
  51. if (s != null)
  52. LockSupport.unpark(s.thread);//直接唤醒下一个线程
  53. }
  54. public final void acquireShared(int arg) {
  55. if (tryAcquireShared(arg) < 0)
  56. doAcquireShared(arg);
  57. }
  58. private void doAcquireShared(int arg) {
  59. final Node node = addWaiter(Node.SHARED);
  60. boolean failed = true;
  61. try {
  62. boolean interrupted = false;
  63. for (;;) {
  64. final Node p = node.predecessor();
  65. if (p == head) {
  66. int r = tryAcquireShared(arg);
  67. if (r >= 0) {
  68. setHeadAndPropagate(node, r);//继续唤醒下个节点
  69. p.next = null; // help GC
  70. if (interrupted)
  71. selfInterrupt();
  72. failed = false;
  73. return;
  74. }
  75. }
  76. if (shouldParkAfterFailedAcquire(p, node) &&
  77. parkAndCheckInterrupt())
  78. interrupted = true;
  79. }
  80. } finally {
  81. if (failed)
  82. cancelAcquire(node);
  83. }
  84. }
  85. private void setHeadAndPropagate(Node node, int propagate) {
  86. Node h = head; // Record old head for check below
  87. setHead(node);
  88. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  89. (h = head) == null || h.waitStatus < 0) {
  90. Node s = node.next;
  91. if (s == null || s.isShared())
  92. doReleaseShared();//继续唤醒线程
  93. }
  94. }
  95. private void doReleaseShared() {
  96. for (;;) {
  97. Node h = head;
  98. if (h != null && h != tail) {
  99. int ws = h.waitStatus;
  100. if (ws == Node.SIGNAL) {
  101. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  102. continue; // loop to recheck cases
  103. unparkSuccessor(h);
  104. }
  105. else if (ws == 0 &&
  106. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  107. continue; // loop on failed CAS
  108. }
  109. if (h == head) // loop if head changed
  110. break;
  111. }
  112. }

30198.png

其他链接:
https://www.processon.com/view/link/61950f6e5653bb30803c5bd2