Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖 AQS 的状态 State,是在生产当中比较常用的一个工具类。Semaphore 的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。
微信截图_20211124145715.png

Semaphore 的简单例子

  1. public class SemaphoreRunner {
  2. //主线程开启10个线程
  3. public static void main(String[] args) {
  4. //设置每次最多只能有5个线程同时执行
  5. Semaphore semaphore = new Semaphore(5);
  6. for (int i=0;i<10;i++){
  7. new Thread(new Task(semaphore,"yangguo+"+i)).start();
  8. }
  9. }
  10. static class Task extends Thread{
  11. Semaphore semaphore;
  12. public Task(Semaphore semaphore,String tname){
  13. super(tname);
  14. this.semaphore = semaphore;
  15. }
  16. public void run() {
  17. try {
  18. //尝试获取通行许可
  19. semaphore.acquire();
  20. System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
  21. Thread.sleep(5000);
  22. //释放资源
  23. semaphore.release();
  24. //超时获取资源,获取不成功则采取降级策略
  25. /*if(semaphore.tryAcquire(500,TimeUnit.MILLISECONDS)){
  26. System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
  27. Thread.sleep(5000);
  28. semaphore.release();//释放公共资源
  29. }else{
  30. fallback();
  31. }*/
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. public void fallback(){
  37. System.out.println("降级");
  38. }
  39. }
  40. }

上面代码中,main线程开启了10个线程,但是每次只能有5个线程同时执行,剩余的线程等待前面线程执行完毕后才能够执行,这就起到了限流的作用。

semaphore 的常用方法

  1. public void acquire() throws InterruptedException
  2. public boolean tryAcquire()
  3. public void release()
  4. public int availablePermits()
  5. public final int getQueueLength()
  6. public final boolean hasQueuedThreads()
  7. protected void reducePermits(int reduction)
  8. protected Collection<Thread> getQueuedThreads()
  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

构造方法

  1. //以该方法创建默认是非公平的
  2. public Semaphore(int permits) {
  3. sync = new NonfairSync(permits);
  4. }
  5. --------------------------
  6. public Semaphore(int permits, boolean fair) {
  7. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  8. }

permits:表示每次许可的线程数量(即AQS中state的值)
fair:表示公平性,如果是公平方式,下次被唤醒的线程会是CLH队列中的第一个线程

  1. FairSync(int permits) {
  2. super(permits);
  3. }
  4. -------------------
  5. Sync(int permits) {
  6. setState(permits);
  7. }
  8. -----------------------
  9. //设置AQS中的state
  10. protected final void setState(int newState) {
  11. state = newState;
  12. }

获取通行许可——aquire()

  1. //支持中断方式获取,遇到中断抛异常
  2. public void acquire() throws InterruptedException {
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. //非中断方式获取,遇到中断不抛异常
  6. public void acquireUninterruptibly() {
  7. sync.acquireShared(1);
  8. }

下面我们以支持中断的方式来看看里面的实现源码

  1. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. if (tryAcquireShared(arg) < 0)
  5. doAcquireSharedInterruptibly(arg);
  6. }

获取资源的公平实现与非公平实现


公平方式

  1. protected int tryAcquireShared(int acquires) {
  2. for (;;) {
  3. //如果是公平的方式获取,首先还要判断当前CLH队列里面有没有节点,有则需要排队
  4. if (hasQueuedPredecessors())
  5. return -1;
  6. int available = getState();
  7. int remaining = available - acquires;
  8. //如果state减完当前线程需要的资源数小于0,直接返回
  9. //否则返回减完剩余的state的值
  10. if (remaining < 0 || compareAndSetState(available, remaining))
  11. return remaining;
  12. }
  13. }

调用 tryAcquireShared(),以公平的方式获取,首先还要判断当前CLH队列里面有没有节点在阻塞,有则需要排队。如果没有则将当前 state 减去线程所需要的资源数,如果小于0(资源不够)则直接返回,如果大于0则通过CAS修改 state 的值。

非公平方式

  1. protected int tryAcquireShared(int acquires) {
  2. return nonfairTryAcquireShared(acquires);
  3. }
  4. ------------------------------
  5. final int nonfairTryAcquireShared(int acquires) {
  6. for (;;) {
  7. //非公平的方式不需要检查CLH队列中是否有元素,可以直接争夺资源
  8. int available = getState();
  9. int remaining = available - acquires;
  10. //如果state减完当前线程需要的资源数小于0,直接返回
  11. //否则返回减完剩余的state的值
  12. if (remaining < 0 || compareAndSetState(available, remaining))
  13. return remaining;
  14. }
  15. }

非公平方式与公平方式最大的不同就是,非公平方式不需要检查CLH队列中是否有节点正在排队,可以直接去争夺资源。

  1. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  2. //创建节点加入CLH队列,并且把该节点设置为SHARED共享模式
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean failed = true;
  5. try {
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. //队头节点被唤醒后可以去争夺资
  10. int r = tryAcquireShared(arg);
  11. //r>0,代表还有资源可以获取,则通过广播的方式唤醒下一个节点去争夺资源
  12. if (r >= 0) {
  13. setHeadAndPropagate(node, r);
  14. p.next = null; // help GC
  15. failed = false;
  16. return;
  17. }
  18. }
  19. //将节点加入CLH队列,并将线程进行阻塞
  20. if (shouldParkAfterFailedAcquire(p, node) &&
  21. parkAndCheckInterrupt())
  22. throw new InterruptedException();
  23. }
  24. } finally {
  25. if (failed)
  26. cancelAcquire(node);
  27. }
  28. }

传播唤醒后续节点

共享模式下,当前节点获取资源成功后,如果还有资源剩余,则会以广播的方式唤醒后一个节点去争夺资源。如果还有剩余资源,则唤醒的线程获取资源成功后,继续调用 setHeadAndPropagate() 方法,一直广播下去。

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head;
  3. //将出队的节点置为head加点(哨兵节点)
  4. setHead(node);
  5. if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
  6. Node s = node.next;
  7. //如果下一个节点是共享模式的节点
  8. if (s == null || s.isShared())
  9. //将下一个共享节点线程唤醒,继续争夺资源
  10. doReleaseShared();
  11. }
  12. }

微信截图_20211124154512.png

释放资源——release()

  1. public void release() {
  2. sync.releaseShared(1);
  3. }

释放资源,把获取的资源还回去,并且唤醒CLH队列中的节点

  1. public final boolean releaseShared(int arg) {
  2. //释放资源
  3. if (tryReleaseShared(arg)) {
  4. //唤醒CLH队列中的节点
  5. doReleaseShared();
  6. return true;
  7. }
  8. return false;
  9. }
  10. ------------------------------
  11. protected final boolean tryReleaseShared(int releases) {
  12. for (;;) {
  13. int current = getState();
  14. //将state加回去
  15. int next = current + releases;
  16. if (next < current) // overflow
  17. throw new Error("Maximum permit count exceeded");
  18. //CAS修改加回去的state
  19. if (compareAndSetState(current, next))
  20. return true;
  21. }
  22. }

成功修改 state 的值后,唤醒 CLH 队列中的阻塞线程。

  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. //将head节点状态由SIGNAL改为0
  8. //如果修改失败,则说明有别的线程在释放资源,原来的head节点已被其他线程唤醒
  9. //此时需要重新获取到新的head节点
  10. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  11. continue;
  12. //唤醒线程
  13. unparkSuccessor(h);
  14. }
  15. else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  16. continue;
  17. }
  18. if (h == head)
  19. break;
  20. }
  21. }