一、应用
场景一:多线程等待单线程资源
static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,5,100,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),(ThreadFactory) Thread::new);public static void main(String[] args) throws InterruptedException {multipleAwait();//singleAwait();}//多线程等待单个线程资源private static void multipleAwait() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 3; i++) {executor.execute(() -> {try {System.out.println(Thread.currentThread().getName() + "等待主线程资源");countDownLatch.await();TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + "执行完成");} catch (InterruptedException e) {e.printStackTrace();}});}executor.shutdown();TimeUnit.SECONDS.sleep(3);countDownLatch.countDown();System.out.println("主线程执行完成");}
场景二:单线程等待多线程执行完成
//多个线程分别执行任务,全部执行完成,进行汇总。private void singleAwait() throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(3,5,100,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),(ThreadFactory) Thread::new);CountDownLatch countDownLatch = new CountDownLatch(4);//多线程等待for (int i = 0; i <= 3; i++) {executor.execute(() -> {try {TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + "执行完成");} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}});}executor.shutdown();System.out.println("等待其他任务任务执行");countDownLatch.await();System.out.println("任务执行完成");}执行结果:等待其他任务任务执行Thread-1执行完成Thread-2执行完成Thread-0执行完成Thread-1执行完成任务执行完成
二、原理分析
await方法
尝试获取共享锁
/*** 模拟。调用获取共享锁方法。* 如果state不为0.则认为未获取到共享锁。则当前线程进入阻塞队列中。**/public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}//底层调用AQS方法.中断则抛出异常public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}//countdownlatch实现。如果不等于0,则返回-1.标识获取共享锁失败protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
doAcquireSharedInterruptibly方法。获取锁失败逻辑
/*** 具体过程,如果state!=0,则线程阻塞;等待state=0的那个线程从阻塞队列中唤醒** 获取锁失败处理逻辑。* 1.创建共享节点,插入同步等待队列中(CLH)* 2.如果,当前节点是头结点(或者节点重新被唤醒),则再次尝试获取锁。* 2.1.获取锁成功,则修改等待列表头结点* 2.2.并尝试唤醒队列中其他节点。(具体过程,如果state!=0,则线程阻塞;等待state=0的那个线程从阻塞队列中唤醒)* 3.不是头结点,* 3.1.第一次循环:修改waitstatus=-1* 3.2.第二次循环:阻塞当前线程。**/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
await(long timeout, TimeUnit unit)方法
�尝试获取共享锁。当前线程,阻塞timeout时间后,自动唤醒
/*** 模拟。调用获取共享锁方法。* 如果state不为0.则认为未获取到共享锁。则【当前线程】进入阻塞队列中,并阻塞指定时间(timeout)。**/public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}// 如果获取锁成功(即state==0),直接返回// 不成功,则执行 doAcquireSharedNanospublic final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}//countdownlatch实现。如果不等于0,则返回-1.标识获取共享锁失败protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
doAcquireSharedNanos(arg, nanosTimeout)
//使用LockSupport.parkNanos进行定时阻塞private boolean doAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return true;}}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
countDown方法
释放资源,与共享锁区别。state进行减1操作
public void countDown() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
�doReleaseShared
唤醒阻塞队列中阻塞节点
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;//唤醒节点,需要将[前一个节点]waitstate,-1--->0if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
