一、应用

  1. static Semaphore semaphore = new Semaphore(3);
  2. public static void main(String[] args) {
  3. ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
  4. for (int i = 0; i < 20;i++){
  5. executor.execute(SemaphoreDemo::exec);
  6. }
  7. }
  8. private static void exec() {
  9. try {
  10. semaphore.acquire();
  11. TimeUnit.SECONDS.sleep(5);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }finally {
  15. semaphore.release();
  16. }
  17. System.out.println("当前执行线程:" + Thread.currentThread().getName());
  18. }

二、源码解析

acquire方法

  1. /**
  2. * 从这个信号量获取一个许可,
  3. * (如果有)立即返回,将可用许可证数量减一
  4. * (如果无)阻塞当前线程直到有可用,或者线程被中断。
  5. *
  6. * 如果没有可用的许可证,则当前线程将出于线程调度目的被禁用,并处于休眠状态,
  7. * 结束休眠两种情况之一:
  8. * 1.其他一些线程归还许可;
  9. * 2.其他线程会中断当前线程。
  10. * 如果当前线程: 在进入该方法时设置其中断状态;或 在等待许可证时被打断,
  11. * 然后抛出InterruptedException,并清除当前线程的中断状态。
  12. * 抛出: InterruptedException–如果当前线程被中断
  13. **/
  14. public void acquire() throws InterruptedException {
  15. sync.acquireSharedInterruptibly(1);
  16. }
  17. public final void acquireSharedInterruptibly(int arg)
  18. throws InterruptedException {
  19. //如果设置中断标识,则排除异常;【并清理中断标识】
  20. if (Thread.interrupted())
  21. throw new InterruptedException();
  22. //小于0;表示无可用许可
  23. if (tryAcquireShared(arg) < 0)//条件中使用cas修改许可数量
  24. //阻塞当前线程
  25. doAcquireSharedInterruptibly(arg);
  26. }

doAcquireSharedInterruptibly方法,获取锁失败逻辑

  1. /**
  2. * 具体过程,如果当前state 减去 需要的许可 <0,则线程阻塞;等待有其他线程释放资源,state恢复,获取到资源的线程,唤醒阻塞队列阻塞的线程
  3. *
  4. * 获取锁失败处理逻辑。
  5. * 1.创建共享节点,插入同步等待队列中(CLH)
  6. * 2.如果,当前节点是头结点(或者节点重新被唤醒),则再次尝试获取锁。
  7. * 2.1.获取锁成功,则修改等待列表头结点
  8. * 2.2.并尝试唤醒队列中其他节点。(存在获取阻塞队列线程的逻辑)
  9. * 3.不是头结点,
  10. * 3.1.第一次循环:修改waitstatus=-1
  11. * 3.2.第二次循环:阻塞当前线程。
  12. **/
  13. private void doAcquireSharedInterruptibly(int arg)
  14. throws InterruptedException {
  15. //创建共享节点,并插入等待队列尾部
  16. final Node node = addWaiter(Node.SHARED);
  17. boolean failed = true;
  18. try {
  19. for (;;) {
  20. //获取当前新建节点前一个节点。如果前一个节点是head,当前新节点是队列中第一个元素
  21. final Node p = node.predecessor();
  22. if (p == head) {
  23. //如果是队列中第一个元素,则再次尝试获取许可(有可能是从阻塞状态被唤醒后,再次执行到此处)
  24. int r = tryAcquireShared(arg);
  25. if (r >= 0) {
  26. //获得许可成功后,
  27. //1.将最新节点退化为头结点。
  28. //2.如果并唤醒队列中r个共享节点
  29. setHeadAndPropagate(node, r);
  30. p.next = null; // help GC
  31. failed = false;
  32. return;
  33. }
  34. }
  35. /**
  36. * 第一个循环,通过shouldParkAfterFailedAcquire,修改waitstatus=-1
  37. * 第二个循环,对节点中线程进行休眠。
  38. * 如果休眠过程中,被设置中断标识。重新唤醒后,则抛出异常
  39. **/
  40. if (shouldParkAfterFailedAcquire(p, node) &&
  41. parkAndCheckInterrupt())
  42. throw new InterruptedException();
  43. }
  44. } finally {
  45. //发生异常则清理队列中异常节点
  46. if (failed)
  47. cancelAcquire(node);
  48. }
  49. }

setHeadAndPropagate

  1. /**
  2. * 设置队列头,并检查后继队列是否在共享模式下等待,如果是,则在“传播>0”或“传播状态”已设置的情况下进行传播
  3. *
  4. **/
  5. private void setHeadAndPropagate(Node node, int propagate) {
  6. Node h = head; // Record old head for check below
  7. setHead(node);
  8. /*
  9. * propagate > 0表示存在许可。可以执行唤醒逻辑
  10. */
  11. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  12. (h = head) == null || h.waitStatus < 0) {
  13. Node s = node.next;
  14. if (s == null || s.isShared())
  15. //唤醒逻辑
  16. doReleaseShared();
  17. }
  18. }

doReleaseShared

  1. //唤醒逻辑
  2. private void doReleaseShared() {
  3. /*
  4. *
  5. */
  6. for (;;) {
  7. Node h = head;
  8. if (h != null && h != tail) {
  9. int ws = h.waitStatus;
  10. if (ws == Node.SIGNAL) {
  11. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  12. continue; // loop to recheck cases
  13. unparkSuccessor(h);
  14. }
  15. else if (ws == 0 &&
  16. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  17. continue; // loop on failed CAS
  18. }
  19. if (h == head) // loop if head changed
  20. break;
  21. }
  22. }

acquireUninterruptibly方法

  1. /**
  2. * 从这个信号量获取一个许可,
  3. * (如果有)立即返回,将可用许可证数量减一
  4. * (如果无)阻塞当前线程直到有可用,或者线程被中断。
  5. * 如果没有可用的许可证,则当前线程出于线程调度目的将被禁用,并处于休眠状态,直到其他线程调用此信号量的释放方法,
  6. *
  7. * 如果当前线程在等待许可证时【被中断】,那么它将【继续等待】,获取许可后,将重新设置中断状态
  8. **/
  9. public void acquireUninterruptibly() {
  10. sync.acquireShared(1);
  11. }
  12. public final void acquireShared(int arg) {
  13. //小于0;表示无可用许可
  14. if (tryAcquireShared(arg) < 0)//条件中使用cas修改许可数量
  15. doAcquireShared(arg);
  16. }

doAcquireShared方法

  1. //与上面doAcquireSharedInterruptibly方法逻辑相同,
  2. // 区别:如果在休眠过程中,设置了中断标识。【重新唤醒后,不会抛异常】,会重新设置中断标识
  3. private void doAcquireShared(int arg) {
  4. final Node node = addWaiter(Node.SHARED);
  5. boolean failed = true;
  6. try {
  7. boolean interrupted = false;
  8. for (;;) {
  9. final Node p = node.predecessor();
  10. if (p == head) {
  11. int r = tryAcquireShared(arg);
  12. if (r >= 0) {
  13. setHeadAndPropagate(node, r);
  14. p.next = null; // help GC
  15. if (interrupted)
  16. selfInterrupt();
  17. failed = false;
  18. return;
  19. }
  20. }
  21. if (shouldParkAfterFailedAcquire(p, node) &&
  22. parkAndCheckInterrupt())
  23. interrupted = true;
  24. }
  25. } finally {
  26. if (failed)
  27. cancelAcquire(node);
  28. }
  29. }

tryAcquire()方法

  1. /**
  2. * 获取许可证(如果有)并立即返回true,将可用许可证的数量减一。
  3. * 如果没有可用的许可证,则此方法将立即返回值false。
  4. **/
  5. public boolean tryAcquire() {
  6. return sync.nonfairTryAcquireShared(1) >= 0;
  7. }
  8. //仅有cas操作
  9. final int nonfairTryAcquireShared(int acquires) {
  10. for (;;) {
  11. int available = getState();
  12. int remaining = available - acquires;
  13. if (remaining < 0 ||
  14. compareAndSetState(available, remaining))
  15. return remaining;
  16. }
  17. }

tryAcquire(long timeout, TimeUnit unit)

  1. public boolean tryAcquire(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  4. }
  5. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
  6. throws InterruptedException {
  7. //设置中断,则抛出异常
  8. if (Thread.interrupted())
  9. throw new InterruptedException();
  10. //如果有可用许可,则立即返回true。
  11. //否则阻塞指定时间timeout.则阻塞期间如果获取则返回true。否则返回false
  12. return tryAcquireShared(arg) >= 0 ||
  13. doAcquireSharedNanos(arg, nanosTimeout);
  14. }

doAcquireSharedNanos方法

  1. private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
  2. throws InterruptedException {
  3. if (nanosTimeout <= 0L)
  4. return false;
  5. //获取截止时间纳秒数
  6. final long deadline = System.nanoTime() + nanosTimeout;
  7. //插入等待队列
  8. final Node node = addWaiter(Node.SHARED);
  9. boolean failed = true;
  10. try {
  11. for (;;) {
  12. final Node p = node.predecessor();
  13. if (p == head) {
  14. int r = tryAcquireShared(arg);
  15. if (r >= 0) {
  16. setHeadAndPropagate(node, r);
  17. p.next = null; // help GC
  18. failed = false;
  19. return true;
  20. }
  21. }
  22. nanosTimeout = deadline - System.nanoTime();
  23. //到达截止时间,直接返回false
  24. if (nanosTimeout <= 0L)
  25. return false;
  26. //修改当前节点前一个元素waitstatus=-1
  27. //如果距离截止时间大于1000纳秒,则进行阻塞
  28. if (shouldParkAfterFailedAcquire(p, node) &&
  29. nanosTimeout > spinForTimeoutThreshold)
  30. //阻塞线程
  31. LockSupport.parkNanos(this, nanosTimeout);
  32. //休眠期间设置中断标识。唤醒后,则抛出异常
  33. if (Thread.interrupted())
  34. throw new InterruptedException();
  35. }
  36. } finally {
  37. if (failed)
  38. cancelAcquire(node);
  39. }
  40. }

release方法

  1. public void release() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. if (tryReleaseShared(arg)) {
  6. doReleaseShared();
  7. return true;
  8. }
  9. return false;
  10. }
  11. protected final boolean tryReleaseShared(int releases) {
  12. for (;;) {
  13. int current = getState();
  14. int next = current + releases;
  15. if (next < current) // overflow
  16. throw new Error("Maximum permit count exceeded");
  17. if (compareAndSetState(current, next))
  18. return true;
  19. }
  20. }

doReleaseShared()方法

  1. /**
  2. * 唤醒1个节点
  3. * 场景一:头结点waitstatus=-1,修改为0,则执行唤醒操作,结束
  4. * 场景二:头结点waitstatus=0,则修改为-3,
  5. **/
  6. private void doReleaseShared() {
  7. for (;;) {
  8. Node h = head;
  9. if (h != null && h != tail) {//队列不为空
  10. int ws = h.waitStatus;
  11. if (ws == Node.SIGNAL) {//头结点,waitstatus=-1
  12. //完成将-1修改为0,则进行唤醒动作;否则自旋。(可能被其他线程修改为0,或者就是cas操作失败)
  13. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  14. continue; // loop to recheck cases
  15. unparkSuccessor(h);
  16. }
  17. //如果被其他线程线程修改为0,则把0修改为-3
  18. else if (ws == 0 &&
  19. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  20. continue; // loop on failed CAS
  21. }
  22. //完成-1-->0;或者0-->-3操作后,则跳出循环
  23. if (h == head) // loop if head changed
  24. break;
  25. }
  26. }
  27. private void unparkSuccessor(Node node) {
  28. int ws = node.waitStatus;
  29. if (ws < 0)
  30. compareAndSetWaitStatus(node, ws, 0);
  31. Node s = node.next;
  32. if (s == null || s.waitStatus > 0) {
  33. s = null;
  34. //从队尾查找在队列中排在最前面需要唤醒节点
  35. for (Node t = tail; t != null && t != node; t = t.prev)
  36. if (t.waitStatus <= 0)
  37. s = t;
  38. }
  39. if (s != null)
  40. LockSupport.unpark(s.thread);
  41. }