CountDownLatch(线程计数器)
countDown()计数器递减
public void countDown() {sync.releaseShared(1);}
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
唤醒共享锁队列的线程
private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;//如果节点状态为 SIGNAL 表示可以被唤醒if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
await()阻塞等待 CountDownLatch变成0在释放执行后面逻辑
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//state 不为 0 的时候 进行阻塞if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {//根据判断结果获取锁int r = tryAcquireShared(arg);//这里不会走 因为 外面判断了 r<0if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&//阻塞等待parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
Semaphore(信号量)
假设初始时 是5 每次调用 release()方法 都针对 state进行递减 因此当 state令牌==5的时候 意味着所有的令牌都被使用完了 后续调用的线程都会以共享类型加入到 CLH队列中 而当 state<5时 说明有其他线程释放了令牌 可以从clh队列中唤醒头部的线程
acquire()获取令牌
release()释放令牌
tryAcquire()
尝试获取指定数量的令牌 此过程是非阻塞的 如果令牌数不够 则返回false否则返回true
drainPermits()
hasQueuedThreads()
判断当前 Semaphore实例是否存在正在等待令牌的线程
CyclicBarrier
基本使用
public static void main(String[] args) {int parties =4;CyclicBarrier cyclicBarrier = new CyclicBarrier(parties,() ->{System.out.println("所有线程执行完开始执行");});for (int i = 0; i <parties ; i++) {new ImportDataTask(cyclicBarrier).start();}}static class ImportDataTask extends Thread {private CyclicBarrier cyclicBarrier;public ImportDataTask(CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}@SneakyThrows@Overridepublic void run() {Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+"线程执行完毕");cyclicBarrier.await();}}}
原理
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock(); //获取锁try {final Generation g = generation;if (g.broken) //确认当前的 genration的barrier是否失效throw new BrokenBarrierException();//线程是否中断if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}int index = --count;//如果到达了屏障 需要去释放被阻塞的线程if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)//如果 CylicBarrier回调不为空 直接触发回调command.run();ranAction = true;//进入下一个屏障周期nextGeneration();return 0;} finally {//回调任务失败if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed out//自旋 等待到达屏障点或线程中断 等待超时等for (;;) {try {//是否有超时时间 如果没有直接阻塞if (!timed)trip.await();else if (nanos > 0L)//采用超时等待机制nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}
