Semaphore

Semaphore也就是信号量,提供了资源数量的并发访问控制

image.png

  1. public class Main {
  2. public static void main(String[] args) {
  3. Semaphore semaphore = new Semaphore(1);
  4. for (int i = 0; i < 5; i++) {
  5. new MyThread("学生-" + (i + 1), semaphore).start();
  6. }
  7. }
  8. }
  9. class MyThread extends Thread {
  10. private final Semaphore semaphore;
  11. private final Random random = new Random();
  12. public MyThread(String name, Semaphore semaphore) {
  13. super(name);
  14. this.semaphore = semaphore;
  15. }
  16. @Override
  17. public void run() {
  18. try {
  19. // 获取信标:抢座
  20. semaphore.acquire();
  21. // 抢到之后开始写作业
  22. System.out.println(Thread.currentThread().getName() + " - 抢到了座位,开始写作业");
  23. Thread.sleep(random.nextInt(1000));
  24. System.out.println(Thread.currentThread().getName() + " - 作业写完,腾出座位");
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. // 释放信标:腾出座位
  29. semaphore.release();
  30. }
  31. }

image.png

CountDownLatch

假设一个主线程要等待5个 Worker 线程执行完才能退出,可以使用CountDownLatch来实现:

image.png
image.png

  1. public class CountDownLatchClient {
  2. public static void main(String[] args) throws InterruptedException {
  3. CountDownLatch latch = new CountDownLatch(5);
  4. for (int i = 0; i < 5; i++) {
  5. new MyThread("线程" + (i + 1), latch).start();
  6. }
  7. // main线程等待
  8. latch.await();
  9. System.out.println("main线程执行结束");
  10. }
  11. }
  12. class MyThread extends Thread {
  13. private final CountDownLatch latch;
  14. private final Random random = new Random();
  15. public MyThread(String name, CountDownLatch latch) {
  16. super(name);
  17. this.latch = latch;
  18. }
  19. @Override
  20. public void run() {
  21. try {
  22. Thread.sleep(random.nextInt(2000));
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. System.out.println(Thread.currentThread().getName() + " - 执行完毕");
  27. // latch计数减一
  28. latch.countDown();
  29. }
  30. }
  31. "C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" "-javaagent:E:\software\dev_tool\idea\exec\IntelliJ IDEA 2021.1.2\lib\idea_rt.jar=55649:E:\software\dev_tool\idea\exec\IntelliJ IDEA 2021.1.2\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\dev_need\workspace\concurrent-java\basic-demo\target\classes;C:\Users\wpp25\.m2\repository\junit\junit\4.12\junit-4.12.jar;C:\Users\wpp25\.m2\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar" com.wpp.concurrent.util.countdownlatchdemo.CountDownLatchClient
  32. 线程3 - 执行完毕
  33. 线程2 - 执行完毕
  34. 线程1 - 执行完毕
  35. 线程4 - 执行完毕
  36. 线程5 - 执行完毕
  37. main线程执行结束

image.png

await()实现分析

await()调用的是AQS 的模板方法,这个方法在前面已经介绍过。 CountDownLatch.Sync重新实现了tryAccuqireShared方法:

  • CountDownLatch

    1. public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    2. // AQS的模板方法
    3. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    4. }
  • AbstractQueuedSynchronizer

    1. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. return tryAcquireShared(arg) >= 0 ||
    5. doAcquireSharedNanos(arg, nanosTimeout);
    6. }
  • CountDownLatch

    1. protected int tryAcquireShared(int acquires) {
    2. return (getState() == 0) ? 1 : -1;
    3. }

    从tryAcquireShared(…)方法的实现来看,只要state != 0,调用await()方法的线程便会被放入AQS
    的阻塞队列,进入阻塞状态。

    countDown()实现分析

  • CountDownLatch

    1. public void countDown() {
    2. // AQS的模板方法
    3. sync.releaseShared(1);
    4. }
  • AbstractQueuedSynchronizer

    1. public final boolean releaseShared(int arg) {
    2. if (tryReleaseShared(arg)) {
    3. doReleaseShared();
    4. return true;
    5. }
    6. return false;
    7. }
  • CountDownLatch

    1. protected boolean tryReleaseShared(int releases) {
    2. // Decrement count; signal when transition to zero
    3. for (;;) {
    4. int c = getState();
    5. if (c == 0)
    6. return false;
    7. int nextc = c-1;
    8. if (compareAndSetState(c, nextc))
    9. return nextc == 0;
    10. }
    11. }

    countDown()调用的AQS的模板方法releaseShared(),里面的tryReleaseShared(…)由
    CountDownLatch.Sync实现。从上面的代码可以看出,只有state=0,tryReleaseShared(…)才会返回
    true,然后执行doReleaseShared(…),一次性唤醒队列中所有阻塞的线程。

总结:由于是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过
countDown()一直减state,减到0后一次性唤醒所有线程。如下图所示,假设初始总数为MN个线程
await(),M个线程countDown(),减到0之后,N个线程被唤醒
image.png

CyclicBarrier

该类用于协调多个线程同步执行操作的场合。

使用场景:10个工程师一起来公司应聘,招聘方式分为笔试和面试。
首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。
把10个人看作10个线程,10个线程之间的同步过程如下图所示:
image.png
在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2
个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。

  1. public class CyclicBarrierClient {
  2. public static void main(String[] args) {
  3. // CyclicBarrier barrier = new CyclicBarrier(5);
  4. CyclicBarrier barrier = new CyclicBarrier(5, () -> {
  5. System.out.println("该阶段结束");
  6. });
  7. for (int i = 0; i < 5; i++) {
  8. new MyThread("线程-" + (i + 1), barrier).start();
  9. }
  10. }
  11. }
  12. class MyThread extends Thread {
  13. private final CyclicBarrier barrier;
  14. private final Random random = new Random();
  15. public MyThread(String name, CyclicBarrier barrier) {
  16. super(name);
  17. this.barrier = barrier;
  18. }
  19. @Override
  20. public void run() {
  21. try {
  22. System.out.println(Thread.currentThread().getName() + " - 向公司出发");
  23. Thread.sleep(random.nextInt(5000));
  24. System.out.println(Thread.currentThread().getName() + " - 已经到达公司");
  25. // 等待其他线程该阶段结束
  26. barrier.await();
  27. System.out.println(Thread.currentThread().getName() + " - 开始笔试");
  28. Thread.sleep(random.nextInt(5000));
  29. System.out.println(Thread.currentThread().getName() + " - 笔试结束");
  30. // 等待其他线程该阶段结束
  31. barrier.await();
  32. System.out.println(Thread.currentThread().getName() + " - 开始面试");
  33. Thread.sleep(random.nextInt(5000));
  34. System.out.println(Thread.currentThread().getName() + " - 面试结束");
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. } catch (BrokenBarrierException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }

Exchanger

Exchanger用于线程之间交换数据

  1. public class ExchangeClient {
  2. private static final Random random = new Random();
  3. public static void main(String[] args) {
  4. // 建一个多线程共用的exchange对象
  5. // 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自己的数据作为参数
  6. // 传递进去,返回值是另外一个线程调用exchange传进去的参数
  7. Exchanger<String> exchanger = new Exchanger<>();
  8. new Thread("线程1") {
  9. @Override
  10. public void run() {
  11. while (true) {
  12. try {
  13. // 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调用exchange为止。
  14. String otherData = exchanger.exchange("交换数据1");
  15. System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
  16. Thread.sleep(random.nextInt(2000));
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. }.start();
  23. new Thread("线程2") {
  24. @Override
  25. public void run() {
  26. while (true) {
  27. try {
  28. String otherData = exchanger.exchange("交换数据2");
  29. System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
  30. Thread.sleep(random.nextInt(2000));
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. }.start();
  37. new Thread("线程3") {
  38. @Override
  39. public void run() {
  40. while (true) {
  41. try {
  42. String otherData = exchanger.exchange("交换数据3");
  43. System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
  44. Thread.sleep(random.nextInt(2000));
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }
  50. }.start();
  51. }
  52. }

Phaser

从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。

  1. public class FunOfCountDownLatch {
  2. public static void main(String[] args) {
  3. Phaser phaser = new Phaser(5);
  4. for (int i = 0; i < 5; i++) {
  5. new Thread("线程-" + (i + 1)) {
  6. private final Random random = new Random();
  7. @Override
  8. public void run() {
  9. System.out.println(getName() + " - 开始运行");
  10. try {
  11. Thread.sleep(random.nextInt(1000));
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. System.out.println(getName() + " - 运行结束");
  16. phaser.arrive();
  17. }
  18. }.start();
  19. }
  20. System.out.println("线程启动完毕");
  21. System.out.println(phaser.getPhase());
  22. // phaser.awaitAdvance(phaser.getPhase());
  23. phaser.awaitAdvance(0);
  24. System.out.println("线程运行结束");
  25. }
  26. }
  1. public class FunOfCyclicBarrier {
  2. public static void main(String[] args) {
  3. Phaser phaser = new Phaser(5);
  4. for (int i = 0; i < 5; i++) {
  5. new MyThread("线程-" + (i + 1), phaser).start();
  6. }
  7. phaser.awaitAdvance(0);
  8. System.out.println("运行结束");
  9. }
  10. }
  11. class MyThread extends Thread {
  12. private final Phaser phaser;
  13. private final Random random = new Random();
  14. public MyThread(String name, Phaser phaser) {
  15. super(name);
  16. this.phaser = phaser;
  17. }
  18. @Override
  19. public void run() {
  20. System.out.println(getName() + " - 开始向公司出发");
  21. slowly();
  22. System.out.println(getName() + " - 已经到达公司");
  23. // 到达同步点,等待其他线程
  24. phaser.arriveAndAwaitAdvance();
  25. System.out.println(getName() + " - 开始笔试");
  26. slowly();
  27. System.out.println(getName() + " - 笔试结束");
  28. // 到达同步点,等待其他线程
  29. phaser.arriveAndAwaitAdvance();
  30. System.out.println(getName() + " - 开始面试");
  31. slowly();
  32. System.out.println(getName() + " - 面试结束");
  33. }
  34. private void slowly() {
  35. try {
  36. Thread.sleep(random.nextInt(1000));
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }