一、应用
场景一:多线程等待单线程资源
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
(ThreadFactory) Thread::new);
public static void main(String[] args) throws InterruptedException {
multipleAwait();
//singleAwait();
}
//多线程等待单个线程资源
private static void multipleAwait() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 3; i++) {
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + "等待主线程资源");
countDownLatch.await();
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
TimeUnit.SECONDS.sleep(3);
countDownLatch.countDown();
System.out.println("主线程执行完成");
}
场景二:单线程等待多线程执行完成
//多个线程分别执行任务,全部执行完成,进行汇总。
private void singleAwait() throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
(ThreadFactory) Thread::new);
CountDownLatch countDownLatch = new CountDownLatch(4);
//多线程等待
for (int i = 0; i <= 3; i++) {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
executor.shutdown();
System.out.println("等待其他任务任务执行");
countDownLatch.await();
System.out.println("任务执行完成");
}
执行结果:
等待其他任务任务执行
Thread-1执行完成
Thread-2执行完成
Thread-0执行完成
Thread-1执行完成
任务执行完成
二、原理分析
await方法
尝试获取共享锁
/**
* 模拟。调用获取共享锁方法。
* 如果state不为0.则认为未获取到共享锁。则当前线程进入阻塞队列中。
**/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//底层调用AQS方法.中断则抛出异常
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//countdownlatch实现。如果不等于0,则返回-1.标识获取共享锁失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly方法。获取锁失败逻辑
/**
* 具体过程,如果state!=0,则线程阻塞;等待state=0的那个线程从阻塞队列中唤醒
*
* 获取锁失败处理逻辑。
* 1.创建共享节点,插入同步等待队列中(CLH)
* 2.如果,当前节点是头结点(或者节点重新被唤醒),则再次尝试获取锁。
* 2.1.获取锁成功,则修改等待列表头结点
* 2.2.并尝试唤醒队列中其他节点。(具体过程,如果state!=0,则线程阻塞;等待state=0的那个线程从阻塞队列中唤醒)
* 3.不是头结点,
* 3.1.第一次循环:修改waitstatus=-1
* 3.2.第二次循环:阻塞当前线程。
**/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
await(long timeout, TimeUnit unit)方法
�尝试获取共享锁。当前线程,阻塞timeout时间后,自动唤醒
/**
* 模拟。调用获取共享锁方法。
* 如果state不为0.则认为未获取到共享锁。则【当前线程】进入阻塞队列中,并阻塞指定时间(timeout)。
**/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 如果获取锁成功(即state==0),直接返回
// 不成功,则执行 doAcquireSharedNanos
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
//countdownlatch实现。如果不等于0,则返回-1.标识获取共享锁失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedNanos(arg, nanosTimeout)
//使用LockSupport.parkNanos进行定时阻塞
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
countDown方法
释放资源,与共享锁区别。state进行减1操作
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
�doReleaseShared
唤醒阻塞队列中阻塞节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//唤醒节点,需要将[前一个节点]waitstate,-1--->0
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}