Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖 AQS 的状态 State,是在生产当中比较常用的一个工具类。Semaphore 的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。 
Semaphore 的简单例子
public class SemaphoreRunner {//主线程开启10个线程public static void main(String[] args) {//设置每次最多只能有5个线程同时执行Semaphore semaphore = new Semaphore(5);for (int i=0;i<10;i++){new Thread(new Task(semaphore,"yangguo+"+i)).start();}}static class Task extends Thread{Semaphore semaphore;public Task(Semaphore semaphore,String tname){super(tname);this.semaphore = semaphore;}public void run() {try {//尝试获取通行许可semaphore.acquire();System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());Thread.sleep(5000);//释放资源semaphore.release();//超时获取资源,获取不成功则采取降级策略/*if(semaphore.tryAcquire(500,TimeUnit.MILLISECONDS)){System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());Thread.sleep(5000);semaphore.release();//释放公共资源}else{fallback();}*/} catch (InterruptedException e) {e.printStackTrace();}}public void fallback(){System.out.println("降级");}}}
上面代码中,main线程开启了10个线程,但是每次只能有5个线程同时执行,剩余的线程等待前面线程执行完毕后才能够执行,这就起到了限流的作用。
semaphore 的常用方法
public void acquire() throws InterruptedExceptionpublic boolean tryAcquire()public void release()public int availablePermits()public final int getQueueLength()public final boolean hasQueuedThreads()protected void reducePermits(int reduction)protected Collection<Thread> getQueuedThreads()
- acquire() 表示阻塞并获取许可
- tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
- release() 表示释放许可
- int availablePermits():返回此信号量中当前可用的许可证数。
- int getQueueLength():返回正在等待获取许可证的线程数。
- boolean hasQueuedThreads():是否有线程正在等待获取许可证。
- void reducePermit(int reduction):减少 reduction 个许可证
- Collection getQueuedThreads():返回所有等待获取许可证的线程集合
构造方法
//以该方法创建默认是非公平的public Semaphore(int permits) {sync = new NonfairSync(permits);}--------------------------public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
permits:表示每次许可的线程数量(即AQS中state的值)
fair:表示公平性,如果是公平方式,下次被唤醒的线程会是CLH队列中的第一个线程
FairSync(int permits) {super(permits);}-------------------Sync(int permits) {setState(permits);}-----------------------//设置AQS中的stateprotected final void setState(int newState) {state = newState;}
获取通行许可——aquire()
//支持中断方式获取,遇到中断抛异常public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}//非中断方式获取,遇到中断不抛异常public void acquireUninterruptibly() {sync.acquireShared(1);}
下面我们以支持中断的方式来看看里面的实现源码
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
获取资源的公平实现与非公平实现
公平方式
protected int tryAcquireShared(int acquires) {for (;;) {//如果是公平的方式获取,首先还要判断当前CLH队列里面有没有节点,有则需要排队if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;//如果state减完当前线程需要的资源数小于0,直接返回//否则返回减完剩余的state的值if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}
调用 tryAcquireShared(),以公平的方式获取,首先还要判断当前CLH队列里面有没有节点在阻塞,有则需要排队。如果没有则将当前 state 减去线程所需要的资源数,如果小于0(资源不够)则直接返回,如果大于0则通过CAS修改 state 的值。
非公平方式
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}------------------------------final int nonfairTryAcquireShared(int acquires) {for (;;) {//非公平的方式不需要检查CLH队列中是否有元素,可以直接争夺资源int available = getState();int remaining = available - acquires;//如果state减完当前线程需要的资源数小于0,直接返回//否则返回减完剩余的state的值if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}
非公平方式与公平方式最大的不同就是,非公平方式不需要检查CLH队列中是否有节点正在排队,可以直接去争夺资源。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {//创建节点加入CLH队列,并且把该节点设置为SHARED共享模式final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {//队头节点被唤醒后可以去争夺资int r = tryAcquireShared(arg);//r>0,代表还有资源可以获取,则通过广播的方式唤醒下一个节点去争夺资源if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//将节点加入CLH队列,并将线程进行阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
传播唤醒后续节点
在共享模式下,当前节点获取资源成功后,如果还有资源剩余,则会以广播的方式唤醒后一个节点去争夺资源。如果还有剩余资源,则唤醒的线程获取资源成功后,继续调用 setHeadAndPropagate() 方法,一直广播下去。
private void setHeadAndPropagate(Node node, int propagate) {Node h = head;//将出队的节点置为head加点(哨兵节点)setHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {Node s = node.next;//如果下一个节点是共享模式的节点if (s == null || s.isShared())//将下一个共享节点线程唤醒,继续争夺资源doReleaseShared();}}

释放资源——release()
public void release() {sync.releaseShared(1);}
释放资源,把获取的资源还回去,并且唤醒CLH队列中的节点
public final boolean releaseShared(int arg) {//释放资源if (tryReleaseShared(arg)) {//唤醒CLH队列中的节点doReleaseShared();return true;}return false;}------------------------------protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();//将state加回去int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//CAS修改加回去的stateif (compareAndSetState(current, next))return true;}}
成功修改 state 的值后,唤醒 CLH 队列中的阻塞线程。
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {//将head节点状态由SIGNAL改为0//如果修改失败,则说明有别的线程在释放资源,原来的head节点已被其他线程唤醒//此时需要重新获取到新的head节点if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;//唤醒线程unparkSuccessor(h);}else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head)break;}}
