一、应用
static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
for (int i = 0; i < 20;i++){
executor.execute(SemaphoreDemo::exec);
}
}
private static void exec() {
try {
semaphore.acquire();
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
System.out.println("当前执行线程:" + Thread.currentThread().getName());
}
二、源码解析
acquire方法
/**
* 从这个信号量获取一个许可,
* (如果有)立即返回,将可用许可证数量减一
* (如果无)阻塞当前线程直到有可用,或者线程被中断。
*
* 如果没有可用的许可证,则当前线程将出于线程调度目的被禁用,并处于休眠状态,
* 结束休眠两种情况之一:
* 1.其他一些线程归还许可;
* 2.其他线程会中断当前线程。
* 如果当前线程: 在进入该方法时设置其中断状态;或 在等待许可证时被打断,
* 然后抛出InterruptedException,并清除当前线程的中断状态。
* 抛出: InterruptedException–如果当前线程被中断
**/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果设置中断标识,则排除异常;【并清理中断标识】
if (Thread.interrupted())
throw new InterruptedException();
//小于0;表示无可用许可
if (tryAcquireShared(arg) < 0)//条件中使用cas修改许可数量
//阻塞当前线程
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly方法,获取锁失败逻辑
/**
* 具体过程,如果当前state 减去 需要的许可 <0,则线程阻塞;等待有其他线程释放资源,state恢复,获取到资源的线程,唤醒阻塞队列阻塞的线程
*
* 获取锁失败处理逻辑。
* 1.创建共享节点,插入同步等待队列中(CLH)
* 2.如果,当前节点是头结点(或者节点重新被唤醒),则再次尝试获取锁。
* 2.1.获取锁成功,则修改等待列表头结点
* 2.2.并尝试唤醒队列中其他节点。(存在获取阻塞队列线程的逻辑)
* 3.不是头结点,
* 3.1.第一次循环:修改waitstatus=-1
* 3.2.第二次循环:阻塞当前线程。
**/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//创建共享节点,并插入等待队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取当前新建节点前一个节点。如果前一个节点是head,当前新节点是队列中第一个元素
final Node p = node.predecessor();
if (p == head) {
//如果是队列中第一个元素,则再次尝试获取许可(有可能是从阻塞状态被唤醒后,再次执行到此处)
int r = tryAcquireShared(arg);
if (r >= 0) {
//获得许可成功后,
//1.将最新节点退化为头结点。
//2.如果并唤醒队列中r个共享节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
/**
* 第一个循环,通过shouldParkAfterFailedAcquire,修改waitstatus=-1
* 第二个循环,对节点中线程进行休眠。
* 如果休眠过程中,被设置中断标识。重新唤醒后,则抛出异常
**/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//发生异常则清理队列中异常节点
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
/**
* 设置队列头,并检查后继队列是否在共享模式下等待,如果是,则在“传播>0”或“传播状态”已设置的情况下进行传播
*
**/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* propagate > 0表示存在许可。可以执行唤醒逻辑
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//唤醒逻辑
doReleaseShared();
}
}
doReleaseShared
//唤醒逻辑
private void doReleaseShared() {
/*
*
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
acquireUninterruptibly方法
/**
* 从这个信号量获取一个许可,
* (如果有)立即返回,将可用许可证数量减一
* (如果无)阻塞当前线程直到有可用,或者线程被中断。
* 如果没有可用的许可证,则当前线程出于线程调度目的将被禁用,并处于休眠状态,直到其他线程调用此信号量的释放方法,
*
* 如果当前线程在等待许可证时【被中断】,那么它将【继续等待】,获取许可后,将重新设置中断状态
**/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
//小于0;表示无可用许可
if (tryAcquireShared(arg) < 0)//条件中使用cas修改许可数量
doAcquireShared(arg);
}
doAcquireShared方法
//与上面doAcquireSharedInterruptibly方法逻辑相同,
// 区别:如果在休眠过程中,设置了中断标识。【重新唤醒后,不会抛异常】,会重新设置中断标识
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquire()方法
/**
* 获取许可证(如果有)并立即返回true,将可用许可证的数量减一。
* 如果没有可用的许可证,则此方法将立即返回值false。
**/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
//仅有cas操作
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
tryAcquire(long timeout, TimeUnit unit)
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
//设置中断,则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//如果有可用许可,则立即返回true。
//否则阻塞指定时间timeout.则阻塞期间如果获取则返回true。否则返回false
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
doAcquireSharedNanos方法
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//获取截止时间纳秒数
final long deadline = System.nanoTime() + nanosTimeout;
//插入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
//到达截止时间,直接返回false
if (nanosTimeout <= 0L)
return false;
//修改当前节点前一个元素waitstatus=-1
//如果距离截止时间大于1000纳秒,则进行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//阻塞线程
LockSupport.parkNanos(this, nanosTimeout);
//休眠期间设置中断标识。唤醒后,则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release方法
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared()方法
/**
* 唤醒1个节点
* 场景一:头结点waitstatus=-1,修改为0,则执行唤醒操作,结束
* 场景二:头结点waitstatus=0,则修改为-3,
**/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {//队列不为空
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//头结点,waitstatus=-1
//完成将-1修改为0,则进行唤醒动作;否则自旋。(可能被其他线程修改为0,或者就是cas操作失败)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果被其他线程线程修改为0,则把0修改为-3
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//完成-1-->0;或者0-->-3操作后,则跳出循环
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//从队尾查找在队列中排在最前面需要唤醒节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
�