Semaphore
Semaphore也就是信号量,提供了资源数量的并发访问控制

public class Main {public static void main(String[] args) {Semaphore semaphore = new Semaphore(1);for (int i = 0; i < 5; i++) {new MyThread("学生-" + (i + 1), semaphore).start();}}}class MyThread extends Thread {private final Semaphore semaphore;private final Random random = new Random();public MyThread(String name, Semaphore semaphore) {super(name);this.semaphore = semaphore;}@Overridepublic void run() {try {// 获取信标:抢座semaphore.acquire();// 抢到之后开始写作业System.out.println(Thread.currentThread().getName() + " - 抢到了座位,开始写作业");Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName() + " - 作业写完,腾出座位");} catch (InterruptedException e) {e.printStackTrace();}// 释放信标:腾出座位semaphore.release();}}
CountDownLatch
假设一个主线程要等待5个 Worker 线程执行完才能退出,可以使用CountDownLatch来实现:


public class CountDownLatchClient {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {new MyThread("线程" + (i + 1), latch).start();}// main线程等待latch.await();System.out.println("main线程执行结束");}}class MyThread extends Thread {private final CountDownLatch latch;private final Random random = new Random();public MyThread(String name, CountDownLatch latch) {super(name);this.latch = latch;}@Overridepublic void run() {try {Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " - 执行完毕");// latch计数减一latch.countDown();}}"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" "-javaagent:E:\software\dev_tool\idea\exec\IntelliJ IDEA 2021.1.2\lib\idea_rt.jar=55649:E:\software\dev_tool\idea\exec\IntelliJ IDEA 2021.1.2\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\dev_need\workspace\concurrent-java\basic-demo\target\classes;C:\Users\wpp25\.m2\repository\junit\junit\4.12\junit-4.12.jar;C:\Users\wpp25\.m2\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar" com.wpp.concurrent.util.countdownlatchdemo.CountDownLatchClient线程3 - 执行完毕线程2 - 执行完毕线程1 - 执行完毕线程4 - 执行完毕线程5 - 执行完毕main线程执行结束
await()实现分析
await()调用的是AQS 的模板方法,这个方法在前面已经介绍过。 CountDownLatch.Sync重新实现了tryAccuqireShared方法:
CountDownLatch
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {// AQS的模板方法return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}
AbstractQueuedSynchronizer
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}
CountDownLatch
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
从tryAcquireShared(…)方法的实现来看,只要state != 0,调用await()方法的线程便会被放入AQS
的阻塞队列,进入阻塞状态。countDown()实现分析
CountDownLatch
public void countDown() {// AQS的模板方法sync.releaseShared(1);}
AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
CountDownLatch
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
countDown()调用的AQS的模板方法releaseShared(),里面的tryReleaseShared(…)由
CountDownLatch.Sync实现。从上面的代码可以看出,只有state=0,tryReleaseShared(…)才会返回
true,然后执行doReleaseShared(…),一次性唤醒队列中所有阻塞的线程。
总结:由于是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过
countDown()一直减state,减到0后一次性唤醒所有线程。如下图所示,假设初始总数为M,N个线程
await(),M个线程countDown(),减到0之后,N个线程被唤醒
CyclicBarrier
该类用于协调多个线程同步执行操作的场合。
使用场景:10个工程师一起来公司应聘,招聘方式分为笔试和面试。
首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。
把10个人看作10个线程,10个线程之间的同步过程如下图所示:
在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2
个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。
public class CyclicBarrierClient {public static void main(String[] args) {// CyclicBarrier barrier = new CyclicBarrier(5);CyclicBarrier barrier = new CyclicBarrier(5, () -> {System.out.println("该阶段结束");});for (int i = 0; i < 5; i++) {new MyThread("线程-" + (i + 1), barrier).start();}}}class MyThread extends Thread {private final CyclicBarrier barrier;private final Random random = new Random();public MyThread(String name, CyclicBarrier barrier) {super(name);this.barrier = barrier;}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName() + " - 向公司出发");Thread.sleep(random.nextInt(5000));System.out.println(Thread.currentThread().getName() + " - 已经到达公司");// 等待其他线程该阶段结束barrier.await();System.out.println(Thread.currentThread().getName() + " - 开始笔试");Thread.sleep(random.nextInt(5000));System.out.println(Thread.currentThread().getName() + " - 笔试结束");// 等待其他线程该阶段结束barrier.await();System.out.println(Thread.currentThread().getName() + " - 开始面试");Thread.sleep(random.nextInt(5000));System.out.println(Thread.currentThread().getName() + " - 面试结束");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}
Exchanger
Exchanger用于线程之间交换数据
public class ExchangeClient {private static final Random random = new Random();public static void main(String[] args) {// 建一个多线程共用的exchange对象// 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自己的数据作为参数// 传递进去,返回值是另外一个线程调用exchange传进去的参数Exchanger<String> exchanger = new Exchanger<>();new Thread("线程1") {@Overridepublic void run() {while (true) {try {// 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调用exchange为止。String otherData = exchanger.exchange("交换数据1");System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();new Thread("线程2") {@Overridepublic void run() {while (true) {try {String otherData = exchanger.exchange("交换数据2");System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();new Thread("线程3") {@Overridepublic void run() {while (true) {try {String otherData = exchanger.exchange("交换数据3");System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();}}
Phaser
从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。
public class FunOfCountDownLatch {public static void main(String[] args) {Phaser phaser = new Phaser(5);for (int i = 0; i < 5; i++) {new Thread("线程-" + (i + 1)) {private final Random random = new Random();@Overridepublic void run() {System.out.println(getName() + " - 开始运行");try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(getName() + " - 运行结束");phaser.arrive();}}.start();}System.out.println("线程启动完毕");System.out.println(phaser.getPhase());// phaser.awaitAdvance(phaser.getPhase());phaser.awaitAdvance(0);System.out.println("线程运行结束");}}
public class FunOfCyclicBarrier {public static void main(String[] args) {Phaser phaser = new Phaser(5);for (int i = 0; i < 5; i++) {new MyThread("线程-" + (i + 1), phaser).start();}phaser.awaitAdvance(0);System.out.println("运行结束");}}class MyThread extends Thread {private final Phaser phaser;private final Random random = new Random();public MyThread(String name, Phaser phaser) {super(name);this.phaser = phaser;}@Overridepublic void run() {System.out.println(getName() + " - 开始向公司出发");slowly();System.out.println(getName() + " - 已经到达公司");// 到达同步点,等待其他线程phaser.arriveAndAwaitAdvance();System.out.println(getName() + " - 开始笔试");slowly();System.out.println(getName() + " - 笔试结束");// 到达同步点,等待其他线程phaser.arriveAndAwaitAdvance();System.out.println(getName() + " - 开始面试");slowly();System.out.println(getName() + " - 面试结束");}private void slowly() {try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}
