一、应用
static Semaphore semaphore = new Semaphore(3);public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));for (int i = 0; i < 20;i++){executor.execute(SemaphoreDemo::exec);}}private static void exec() {try {semaphore.acquire();TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}finally {semaphore.release();}System.out.println("当前执行线程:" + Thread.currentThread().getName());}
二、源码解析
acquire方法
/*** 从这个信号量获取一个许可,* (如果有)立即返回,将可用许可证数量减一* (如果无)阻塞当前线程直到有可用,或者线程被中断。** 如果没有可用的许可证,则当前线程将出于线程调度目的被禁用,并处于休眠状态,* 结束休眠两种情况之一:* 1.其他一些线程归还许可;* 2.其他线程会中断当前线程。* 如果当前线程: 在进入该方法时设置其中断状态;或 在等待许可证时被打断,* 然后抛出InterruptedException,并清除当前线程的中断状态。* 抛出: InterruptedException–如果当前线程被中断**/public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//如果设置中断标识,则排除异常;【并清理中断标识】if (Thread.interrupted())throw new InterruptedException();//小于0;表示无可用许可if (tryAcquireShared(arg) < 0)//条件中使用cas修改许可数量//阻塞当前线程doAcquireSharedInterruptibly(arg);}
doAcquireSharedInterruptibly方法,获取锁失败逻辑
/*** 具体过程,如果当前state 减去 需要的许可 <0,则线程阻塞;等待有其他线程释放资源,state恢复,获取到资源的线程,唤醒阻塞队列阻塞的线程** 获取锁失败处理逻辑。* 1.创建共享节点,插入同步等待队列中(CLH)* 2.如果,当前节点是头结点(或者节点重新被唤醒),则再次尝试获取锁。* 2.1.获取锁成功,则修改等待列表头结点* 2.2.并尝试唤醒队列中其他节点。(存在获取阻塞队列线程的逻辑)* 3.不是头结点,* 3.1.第一次循环:修改waitstatus=-1* 3.2.第二次循环:阻塞当前线程。**/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//创建共享节点,并插入等待队列尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获取当前新建节点前一个节点。如果前一个节点是head,当前新节点是队列中第一个元素final Node p = node.predecessor();if (p == head) {//如果是队列中第一个元素,则再次尝试获取许可(有可能是从阻塞状态被唤醒后,再次执行到此处)int r = tryAcquireShared(arg);if (r >= 0) {//获得许可成功后,//1.将最新节点退化为头结点。//2.如果并唤醒队列中r个共享节点setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}/*** 第一个循环,通过shouldParkAfterFailedAcquire,修改waitstatus=-1* 第二个循环,对节点中线程进行休眠。* 如果休眠过程中,被设置中断标识。重新唤醒后,则抛出异常**/if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {//发生异常则清理队列中异常节点if (failed)cancelAcquire(node);}}
setHeadAndPropagate
/*** 设置队列头,并检查后继队列是否在共享模式下等待,如果是,则在“传播>0”或“传播状态”已设置的情况下进行传播***/private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** propagate > 0表示存在许可。可以执行唤醒逻辑*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())//唤醒逻辑doReleaseShared();}}
doReleaseShared
//唤醒逻辑private void doReleaseShared() {/***/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
acquireUninterruptibly方法
/*** 从这个信号量获取一个许可,* (如果有)立即返回,将可用许可证数量减一* (如果无)阻塞当前线程直到有可用,或者线程被中断。* 如果没有可用的许可证,则当前线程出于线程调度目的将被禁用,并处于休眠状态,直到其他线程调用此信号量的释放方法,** 如果当前线程在等待许可证时【被中断】,那么它将【继续等待】,获取许可后,将重新设置中断状态**/public void acquireUninterruptibly() {sync.acquireShared(1);}public final void acquireShared(int arg) {//小于0;表示无可用许可if (tryAcquireShared(arg) < 0)//条件中使用cas修改许可数量doAcquireShared(arg);}
doAcquireShared方法
//与上面doAcquireSharedInterruptibly方法逻辑相同,// 区别:如果在休眠过程中,设置了中断标识。【重新唤醒后,不会抛异常】,会重新设置中断标识private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
tryAcquire()方法
/*** 获取许可证(如果有)并立即返回true,将可用许可证的数量减一。* 如果没有可用的许可证,则此方法将立即返回值false。**/public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}//仅有cas操作final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
tryAcquire(long timeout, TimeUnit unit)
public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {//设置中断,则抛出异常if (Thread.interrupted())throw new InterruptedException();//如果有可用许可,则立即返回true。//否则阻塞指定时间timeout.则阻塞期间如果获取则返回true。否则返回falsereturn tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}
doAcquireSharedNanos方法
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;//获取截止时间纳秒数final long deadline = System.nanoTime() + nanosTimeout;//插入等待队列final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return true;}}nanosTimeout = deadline - System.nanoTime();//到达截止时间,直接返回falseif (nanosTimeout <= 0L)return false;//修改当前节点前一个元素waitstatus=-1//如果距离截止时间大于1000纳秒,则进行阻塞if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)//阻塞线程LockSupport.parkNanos(this, nanosTimeout);//休眠期间设置中断标识。唤醒后,则抛出异常if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
release方法
public void release() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
doReleaseShared()方法
/*** 唤醒1个节点* 场景一:头结点waitstatus=-1,修改为0,则执行唤醒操作,结束* 场景二:头结点waitstatus=0,则修改为-3,**/private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {//队列不为空int ws = h.waitStatus;if (ws == Node.SIGNAL) {//头结点,waitstatus=-1//完成将-1修改为0,则进行唤醒动作;否则自旋。(可能被其他线程修改为0,或者就是cas操作失败)if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}//如果被其他线程线程修改为0,则把0修改为-3else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}//完成-1-->0;或者0-->-3操作后,则跳出循环if (h == head) // loop if head changedbreak;}}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;//从队尾查找在队列中排在最前面需要唤醒节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
�
