CountDownLatch

CountDownLatch是一个同步功能的辅助类,其效果是给定一个计数(count),当使用这个CountDownLatch类的线程判断计数不为0时,则处于wait阻塞状态,如果为0则继续执行。
实现等待与继续运行的效果分别需要使用await()和countDown()方法来进行。调用await()方法时判断当前计数count是否为0,不为0则阻塞等待;其他线程调用countDown()方法将当前计数减1,当减到0是等到线程运行。

CountDownLatch也是使用AQS实现,AQS state值就是当前计数count

  1. public class CountDownLatch {
  2. // 同步控制的计数锁。使用AQS表示计数。
  3. private static final class Sync extends AbstractQueuedSynchronizer {
  4. private static final long serialVersionUID = 4982264981922014374L;
  5. Sync(int count) {
  6. setState(count); // state表示当前计数
  7. }
  8. int getCount() {
  9. return getState();
  10. }
  11. protected int tryAcquireShared(int acquires) {
  12. return (getState() == 0) ? 1 : -1;
  13. }
  14. protected boolean tryReleaseShared(int releases) {
  15. // Decrement count; signal when transition to zero
  16. for (;;) {
  17. int c = getState();
  18. if (c == 0)
  19. return false;
  20. int nextc = c-1;
  21. if (compareAndSetState(c, nextc))
  22. return nextc == 0;
  23. }
  24. }
  25. }
  26. private final Sync sync;
  27. /**
  28. * 用给定的计数构造一个countdownlock。
  29. */
  30. public CountDownLatch(int count) {
  31. if (count < 0) throw new IllegalArgumentException("count < 0");
  32. this.sync = new Sync(count);
  33. }
  34. /**
  35. * 导致当前线程等待,直到锁存数被计算为零,除非线程被中断。
  36. */
  37. public void await() throws InterruptedException {
  38. sync.acquireSharedInterruptibly(1);
  39. }
  40. /**
  41. * 导致当前线程等待,直到存锁被计算为零,除非线程被中断,或者指定的等待时间流逝。
  42. * 如果当前计数为零,那么该方法将立即返回true。
  43. */
  44. public boolean await(long timeout, TimeUnit unit)
  45. throws InterruptedException {
  46. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  47. }
  48. /**
  49. *如果计数达到零,释放所有等待线程的计数。
  50. *如果电流计数大于零,那么它就会递减。
  51. *如果新计数为零,那么所有等待线程都重新启用以实现线程调度。
  52. *如果当前计数等于0,那么什么都没有发生。
  53. */
  54. public void countDown() {
  55. sync.releaseShared(1);
  56. }
  57. /**
  58. * 获取当前计数
  59. */
  60. public long getCount() {
  61. return sync.getCount();
  62. }
  63. }
  1. public class Code01_CountDownLatchDemo {
  2. private CountDownLatch countDownLatch = new CountDownLatch(1);
  3. public void workStart() {
  4. System.out.println("test begin");
  5. try {
  6. countDownLatch.await();
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println("test end");
  11. }
  12. public void workDown() {
  13. System.out.println("work over");
  14. countDownLatch.countDown();
  15. }
  16. public static void main(String[] args) {
  17. Code01_CountDownLatchDemo t = new Code01_CountDownLatchDemo();
  18. new Thread(t::workStart).start();
  19. try {
  20. TimeUnit.SECONDS.sleep(2);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. new Thread(t::workDown).start();
  25. }
  26. }

CyclicBarrier

CyclicBarrier 不仅具有CountdownLatch全部功能,还可以实现阶段性同步功能,其使用意义在于可以循环地实现线程要一起做任务的目标,而CountDownLatch仅仅支持一次线程与同步点阻塞。
CyclicBarrier允许一组线程互相等待,直到某个公共屏障点(common barrier point),这些线程必须实时互相等待,可使用CyclicBarrier实现。

CyclicBarrier 底层是使用ReentrantLock + Condition 实现,ReenTrantLcok底层是AQS实现。

  1. /** ReentrantLock */
  2. private final ReentrantLock lock = new ReentrantLock();
  3. /** Condition 等待队列 */
  4. private final Condition trip = lock.newCondition();
  5. /** 一组参与者的数量(需要等待的数量) */
  6. private final int parties;
  7. /* 参与者到齐后的回调 */
  8. private final Runnable barrierCommand;
  9. /** 当前这一组*/
  10. private Generation generation = new Generation();
  11. /** 还需要等待的数量 */
  12. private int count;
  13. public CyclicBarrier(int parties, Runnable barrierAction) {
  14. if (parties <= 0) throw new IllegalArgumentException();
  15. this.parties = parties;
  16. this.count = parties;
  17. this.barrierCommand = barrierAction;
  18. }
  19. public CyclicBarrier(int parties) {
  20. this(parties, null);
  21. }

await方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}


public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
BrokenBarrierException,
TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

ClyclicBarrier#await 方法都是调用了 dowait方法,dowait方法实现:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    // 实例的ReentrantLock 
    final ReentrantLock lock = this.lock;

    // 获取锁
    lock.lock();
    try {
        // 当前的这一组
        final Generation g = generation;

        // 当前这一组Barrier是否broken(前面有线程被interttupt)
        if (g.broken)
            // 抛出异常
            throw new BrokenBarrierException();
        // 当前线程是否被中断
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 编号为cout-1
        int index = --count;

        // 为最后一个需要等待的线程
        if (index == 0) {  // tripped         
            boolean ranAction = false;
            try {
                // 等齐的后执行的线程
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 执行结束开启下一代(下一组等待)
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await(); // 添加到condition等待队列
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    // 如果线程被interrupt,则开启新的一代,当前代broken==true,抛出异常
                    breakBarrier();  
                    throw ie;
                } else {
                    // 当前代被被破坏了
                    Thread.currentThread().interrupt();
                }
            }

            // 如果当前组被破坏
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果不是当前这一代,返回index
            if (g != generation)
                return index;
            // 如果等待时间到了
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();  // 唤醒全部等待线程,开启新的一代,重新await 
}

reset

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation  // 开始新的一代
    } finally {
        lock.unlock();
    }
}
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();  // 唤醒等待队列全部线程
    // set up next generation
    count = parties;
    generation = new Generation();  // 重新一代
}

Semaphore

Swmaphore主要是用于限制并发线程数量,是基于AQS实现。有公平实现和非公平实现。AQS state值表示可用的权限数量。

public Semaphore(int permits) {
    sync = new NonfairSync(permits);  // 调用非公平实现
}

// true  调用公平实现,否则调用非公平实现
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

公平实现、非公平实现都是调用Sync

FairSync(int permits) {
    super(permits);
}

NonfairSync(int permits) {
    super(permits);
}
Sync(int permits) {
    setState(permits);  // state值表示权限可用数量
}

acquire

等待获取许可,阻塞直到需要的数量许可可用或者线程被中断。acquire() 默认需要1个许可,acquire(int permits)可指定需要许可数量。底层调用AQS#acquireSharedInterruptibly

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)  // tryAcquireShared 由子类实现
        doAcquireSharedInterruptibly(arg); // 如果剩余数量小于0 ,调用AQS#doAcquireSharedInterruptibly
}

非公平Semaphore#tryAcquireShared

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires); // 调用Sync#nonfairTryAcquireShared
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 当前state量 
        int available = getState();
        // 剩余权限数量
        int remaining = available - acquires;
        if (remaining < 0 ||  // 剩余权限数量小于0
            compareAndSetState(available, remaining))  // CAS获取权限 
            return remaining;  // 返回剩余数量 
    }
}

公平Semaphore#tryAcquireShared

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors()) // ASQ#hasQueuedPredecessors AQS阻塞等待有一个头结点
            return -1;
        // 当前线程位于AQS等待队列头部
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

acquireUninterruptibly

acquireUninterruptibly,阻塞等待获取许可,直到获取到许可,与acquire相比,
acquireUninterruptibly不可被中断。

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)  // 子类实现tryAcquireShared
        doAcquireShared(arg);
}

tryAcquire

tryAcquire() 直接调用sync.nonfairTryAcquireShared(1),不会被阻塞。
tryAcquire(long timeout, TimeUnit unit),调用AQS#tryAcquireSharedNanos,阻塞,获取到许可或者等待时间后自动返回。

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||  
        doAcquireSharedNanos(arg, nanosTimeout);
}

release

释放许,底层是AQS#releaseShared是按

public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {  // tryRelease 由子类实现
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 当前state值
        int current = getState();
        // 归还权限后的state值
        int next = current + releases;
        // 超出权限数量最大限制
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))  // CAS操作归还权限
            return true;
    }
}

Exchanger

Exchanger 的功能是使2个线程之间传输数据。

public class Code02_ExchangerDemo01 {

    @SuppressWarnings("all")
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        Thread thead1 = new Thread(() -> {

            try {
                System.out.println(Thread.currentThread().getName() + "获取到线程2的值:  " + exchanger.exchange("线程1的值:ONE"));
                System.out.println("线程1结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程1");

        Thread thead2 = new Thread(() -> {

            try {
                System.out.println(Thread.currentThread().getName() + "获取到线程1的值:  " + exchanger.exchange("线程2的值:TWO"));
                System.out.println("线程2结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }, "线程2");

        thead1.start();
        thead2.start();
    }

}

Phaser

一个可重用的同步屏障,在功能上类似于CyclicBarrier和CountDownLatch,但支持更灵活的使用。

Registration: 与CountDonwnLatch、CyclicBarrier不同,Phaser注册的同步参与者数量(parties)可以随时间变化(可使用register、bulkRegister、构造函数初始化参与者数量(parties的值),可以使用arriveAndDeregister注销)。 Synchronization:与CyclicBarrier一样,Phaser也可以被反复等待。方法arriveAndAwaitAdvance的作用类似于CyclicBarrier.await。Phaser每一代都有一个phase number,从0开始,所有参与者到达时加1,达到Integer.MAX_VALUE时又回到0。 Arrival:方法arrive和arriveAndDeregister记录到达者。这两个方法非阻塞,只返回到达的phaser 的phase number。当给定阶段的最后一方到达时,将执行一个可选操作,并推进该阶段。 Waiting:方法awaitAdvance需要一个参数来指示到达阶段号,并在相位器前进到(或已经处于)不同阶段时返回,该方法不可被中断。 Termination:Phaser可以进入终止状态,可以通过方法isTerminated方法检查状态。当进入终止状态,所有同步方法立即返回。当调用onAdvance返回true时触发终止。 Tiering:相位器可以分层以减少争用。具有大量参与方的相位器可能会经历大量的同步争用成本,因此可以将其设置为一组子相位器共享一个公共父节点。这可能会大大增加吞吐量,尽管它会导致更大的操作开销。 Monitoring:虽然同步方法只能由已注册的参与者调用,但是相位器的当前状态可以由任何调用方监视。在任何给定时刻,getRegisteredParties返回总共参与者的数量,其中getArrivedParties已经到达当前阶段(getPhase)的参与者。当剩余的(getUnarrivedParties)方到达时,阶段向前推进。

/**
state值的
0-15位:表示unarrived
16-31位:表示wait
32-62位:表示terminated
63位:符号位
*/
private volatile long state;
  • arriveAndAwaitAnvance 到达屏障处等待
  • arriveAndDeregister 到达屏障出取消参与,parties数减1
  • getPhase 获取已经到达第几个屏障
  • onAdvance 通过新的屏障是被调用
  • getRegisteredParties 获得注册的parties数量
  • register 动态地添加一个parties的值
  • bulkRegister 批量增加partise的值
  • getArrivedParties 获取已经到达的parties数
  • getUnarrivedParties 获取没有到达的parties数
  • arrive 是到达的parties的值加1,并且不在屏障处等待,继续执行
  • awaitAdvance(int phase) 在第几个屏障处等待(如果getPhase方法返回的值等于传入的参数,则在屏障处等待,否则继续运行
  • awaitAdvanceInterruptibly(int) 是可被中断的,而arriveAndAwaitAnvance 是不可被中断的
  • forceTermination 是phaser屏障功能失效
  • isTerminated 判断phaser对象是否是销毁状态