8.1 CountDownLatch

CountDownLatch计数器

特点:内部计数器递减。

功能:主线程等待所有子线程执行完毕进行汇总。

8.1.1 案例

任务分解,第三个任务需要等待第一个任务和第二个任务执行计算后进行汇总。

  1. import lombok.extern.slf4j.Slf4j;
  2. import java.util.concurrent.*;
  3. /**
  4. * @author KHighness
  5. * @since 2021-05-07
  6. */
  7. @Slf4j(topic = "CountDownLatch")
  8. public class CountDownLatchDemo {
  9. private static int total = 0;
  10. private static final CountDownLatch countDownLatch = new CountDownLatch(2);
  11. private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
  12. private static void sleep(int timeout, TimeUnit unit) {
  13. try {
  14. unit.sleep(timeout);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. public static void main(String[] args) {
  20. executorService.submit(() -> {
  21. log.debug("state = {}", countDownLatch.getCount());
  22. total += 1;
  23. sleep(1, TimeUnit.SECONDS);
  24. log.debug("{} run over", Thread.currentThread().getName());
  25. countDownLatch.countDown();
  26. log.debug("state = {}", countDownLatch.getCount());
  27. });
  28. executorService.submit(() -> {
  29. log.debug("state = {}", countDownLatch.getCount());
  30. total += 2;
  31. sleep(1, TimeUnit.SECONDS);
  32. log.debug("{} run over", Thread.currentThread().getName());
  33. countDownLatch.countDown();
  34. log.debug("state = {}", countDownLatch.getCount());
  35. });
  36. executorService.submit(() -> {
  37. log.debug("state = {}", countDownLatch.getCount());
  38. try {
  39. countDownLatch.await();
  40. log.debug("result: total = {}", total);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. log.debug("{} run over", Thread.currentThread().getName());
  45. });
  46. executorService.shutdown();
  47. sleep(3, TimeUnit.SECONDS);
  48. executorService.shutdownNow();
  49. }
  50. }

运行结果:

  1. 2021-05-08 12:14:58.374 [pool-1-thread-1] DEBUG CountDownLatch - state = 2
  2. 2021-05-08 12:14:58.374 [pool-1-thread-2] DEBUG CountDownLatch - state = 2
  3. 2021-05-08 12:14:58.374 [pool-1-thread-3] DEBUG CountDownLatch - state = 2
  4. 2021-05-08 12:14:59.379 [pool-1-thread-2] DEBUG CountDownLatch - pool-1-thread-2 run over
  5. 2021-05-08 12:14:59.379 [pool-1-thread-1] DEBUG CountDownLatch - pool-1-thread-1 run over
  6. 2021-05-08 12:14:59.380 [pool-1-thread-2] DEBUG CountDownLatch - state = 1
  7. 2021-05-08 12:14:59.380 [pool-1-thread-1] DEBUG CountDownLatch - state = 0
  8. 2021-05-08 12:14:59.380 [pool-1-thread-3] DEBUG CountDownLatch - result: total = 3
  9. 2021-05-08 12:14:59.380 [pool-1-thread-3] DEBUG CountDownLatch - pool-1-thread-3 run over

8.1.2 原理

类图

image-20210508154726370.png

(1)构造方法

入参:count,会将计数器值count赋给了AQS的状态变量state

  1. // CountDownLatch
  2. public CountDownLatch(int count) {
  3. if (count < 0) throw new IllegalArgumentException("count < 0");
  4. this.sync = new Sync(count);
  5. }
  6. // Sync
  7. Sync(int count) {
  8. setState(count);
  9. }

(2)void await()方法

当线程调用CountDownLatchawait方法后,当前线程就会阻塞,直到:

  • 所有线程都调用了CountDownLatch对象的countDown方法后,即计数器值为0时
  • 其他线程调用了当前线程的interrupt方法中断了当前线程,当前线程抛出InterruptedException异常
  1. // CountDownLatch
  2. public void await() throws InterruptedException {
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. // AQS
  6. // 获取共享资源时可被中断
  7. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  8. // 如果线程被中断即抛出异常
  9. if (Thread.interrupted())
  10. throw new InterruptedException();
  11. // 查看当前计数器值是否为0,为0则直接返回,否则进入AQS的队列等待
  12. if (tryAcquireShared(arg) < 0)
  13. doAcquireSharedInterruptibly(arg);
  14. }
  15. // Sync
  16. // 实现的AQS接口
  17. protected int tryAcquireShared(int acquires) {
  18. return (getState() == 0) ? 1 : -1;
  19. }

(3)boolean await(long timeout, TimeUnit unit)方法

当线程调用了CountDownLatch的该方法后,当前线程会被阻塞,直到:

  • 所有线程都调用了CountDownLatch对象的countDown方法后,即计数器值为0时
  • 设置的timeout时间到了,因为超时返回false
  • 其他线程调用了当前线程的interrupt方法中断了当前线程,当前线程抛出InterruptedException异常
  1. // CountDownLatch
  2. public boolean await(long timeout, TimeUnit unit)
  3. throws InterruptedException {
  4. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  5. }

(4)void countDown()方法

线程调用该方法后,计数器的值递减,递减后如果计数器的值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做。

如果state原始值为n,有(n+1)个线程调用了countDown方法,那么第(n+1)个线程调用无效。

  1. // CountDownLatch
  2. public void countDown() {
  3. sync.releaseShared(1);
  4. }
  5. // AQS
  6. public final boolean releaseShared(int arg) {
  7. // 调用Sync的实现,成功则唤醒阻塞的线程
  8. if (tryReleaseShared(arg)) {
  9. // AQS释放资源
  10. doReleaseShared();
  11. return true;
  12. }
  13. return false;
  14. }
  15. // Sync
  16. protected boolean tryReleaseShared(int releases) {
  17. // 循环CAS使计数器(状态值state)减1并更新,直到成功
  18. for (;;) {
  19. int c = getState();
  20. // 防止state变成负数
  21. if (c == 0)
  22. return false;
  23. int nextc = c-1;
  24. if (compareAndSetState(c, nextc))
  25. return nextc == 0;
  26. }
  27. }

(5)long getCount()方法

获取当前计数器的值,即AQS的state值,一般用于测试。

  1. // CountDownLatch
  2. public long getCount() {
  3. return sync.getCount();
  4. }
  5. // AQS
  6. int getCount() {
  7. return getState();
  8. }

8.1.3 小结

CountDownLatch相比于使用线程的join方法来实现线程间同步,前者更具有灵活性和方便性,因为在ExecutorService线程池中无法直接调用其他线程的join方法。

CountDownLatch使用AQS的状态变量state来存放计数器的值。首先在初始化设置计数器值(AQS状态值),多个线程调用countDown方法实际是原子性递减AQS的状态值。当线程调用await方法后当前线程会被放入AQS的阻塞队列等待,待计数器为0再返回。其他线程调用countDown方法让计数器值递减1,当计数器值变成0时,当前线程还要调用AQS的doReleaseSShared方法来激活由于调用await方法而被阻塞的线程。

8.2 CyclicBarrier

CyclicBarrier回环[1]屏障[2]

特点:内部计数器递减。

功能:让一组线程全部达到一个状态后再全部同时执行。

[^ 1]: 回环是因为当所有线程执行完毕,并重置CylicBarrier的状态以便重用。

[^ 2]:线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。

8.2.1 案例

一个任务需要三步完成,需要执行多个任务。

  1. @Slf4j(topic = "CyclicBarrier")
  2. public class CyclicBarrierDemo {
  3. private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {log.debug("==========================");});
  4. private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
  5. private static void sleep(int timeout, TimeUnit unit) {
  6. try {
  7. unit.sleep(timeout);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. public static void main(String[] args) {
  13. for (int i = 0; i < 3; i++) {
  14. executorService.submit(() -> {
  15. try {
  16. log.debug("{} first step", Thread.currentThread().getName());
  17. sleep(1, TimeUnit.SECONDS);
  18. cyclicBarrier.await();
  19. log.debug("{} second step", Thread.currentThread().getName());
  20. sleep(1, TimeUnit.SECONDS);
  21. cyclicBarrier.await();
  22. log.debug("{} third step", Thread.currentThread().getName());
  23. } catch (InterruptedException | BrokenBarrierException e) {
  24. e.printStackTrace();
  25. }
  26. });
  27. }
  28. executorService.shutdown();
  29. }
  30. }

运行结果:

  1. 2021-05-08 16:28:58.549 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 first step
  2. 2021-05-08 16:28:58.549 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 first step
  3. 2021-05-08 16:28:58.549 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 first step
  4. 2021-05-08 16:28:59.558 [pool-1-thread-2] DEBUG CyclicBarrier - ==========================
  5. 2021-05-08 16:28:59.559 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 second step
  6. 2021-05-08 16:28:59.559 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 second step
  7. 2021-05-08 16:28:59.559 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 second step
  8. 2021-05-08 16:29:00.570 [pool-1-thread-2] DEBUG CyclicBarrier - ==========================
  9. 2021-05-08 16:29:00.570 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 third step
  10. 2021-05-08 16:29:00.570 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 third step
  11. 2021-05-08 16:29:00.570 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 third step

8.2.2 原理

类图

image-20210508154851790.png

CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。

属性:

  • lock:独占锁。
  • trip:条件变量。
  • barrierCommand:到达屏障点执行的任务。
  • parties:线程计数器,这里表示多少线程调用await后,所有线程才会冲破屏障继续向下允许。
  • count:执行记录器,一开始等于parties,每当有线程调用await就递减1,当count为0时就表示所有线程都到了屏障点。

parties始终用来记录总的线程个数,当count计数器值变为0后,会将parties的值赋给count,进而进行服用。

内部类Generation仅有一个属性broken,用来记录当前屏障是否被打破。是在锁内使用变量,所以并没有声明为volatile

(1)构造方法

入参:parties(必选)、barrierAction(可选)

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. if (parties <= 0) throw new IllegalArgumentException();
  3. this.parties = parties;
  4. this.count = parties;
  5. this.barrierCommand = barrierAction;
  6. }

(2)int await()方法

当前线程调用了CyclicBarrier的该方法时会被阻塞,直到:

  • parties个线程都调用了await方法,线程到达屏障点
  • 其他线程调用了当前线程的interrupt方法中断了当前线程,则当前线程抛出InterruptedExcetion异常
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常

内部调用dowait方法,第一个参数为false说明不设置超时时间

  1. public int await() throws InterruptedException, BrokenBarrierException {
  2. try {
  3. return dowait(false, 0L);
  4. } catch (TimeoutException toe) {
  5. throw new Error(toe); // cannot happen
  6. }
  7. }

(3)boolean await(long timeout, TimeUnit unit)方法

当先线程调用了CyclicBarrier的该方法时会被阻塞,直到:

  • parties个线程都调用了await方法,也就是线程都到了屏障点,这时候返回true
  • 设置的timeout时间到了,因为超时返回false
  • 其他线程调用当前线程的interrupt方法中断了当前线程,则当前线程会抛出InterruptedException
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常

内部调用dowait方法,第一个参数说明设置超时,第二个参数是超时时间

  1. public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  2. return dowait(true, unit.toNanos(timeout));
  3. }

(3)int dowait(boolean timed, long nanos)方法

  1. private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. ...
  6. // (1)如果index=0则说明所有线程都到了屏障点,此时执行初始化时传递的任务
  7. int index = --count;
  8. if (index == 0) { // tripped
  9. boolean ranAction = false;
  10. try {
  11. // (2) 执行任务
  12. final Runnable command = barrierCommand;
  13. if (command != null)
  14. command.run();
  15. ranAction = true;
  16. // (3)激活其他因调用await方法而被阻塞的线程,并且重置CyclicBarrier
  17. nextGeneration();
  18. return 0;
  19. } finally {
  20. if (!ranAction)
  21. breakBarrier();
  22. }
  23. }
  24. // (4)如果index!=0
  25. for (;;) {
  26. try {
  27. // (5)未设置超时时间
  28. if (!timed)
  29. trip.await();
  30. // (6)设置了超时时间
  31. else if (nanos > 0L)
  32. nanos = trip.awaitNanos(nanos);
  33. } catch (InterruptedException ie) {
  34. ...
  35. }
  36. ...
  37. }
  38. } finally {
  39. lock.unlock();
  40. }
  41. }
  42. private void nextGeneration() {
  43. // (7)唤醒条件队列中的阻塞线程
  44. trip.signalAll();
  45. // (8)重置CyclicBarrier
  46. count = parties;
  47. generation = new Generation();
  48. }

8.2.3 小结

CycleBarrierCountDownLatch的不同之处在于,前者是可以复用的,并且前者特别适合分段任务有序执行的场景。

内部通过ReentrantLock独占锁实现计数器原子性更新,并使用条件变量队列来实现线程同步。

8.3 Semaphore

Semaphore信号量

特点:内部计数器递增。

功能:限制能同时访问共享资源的线程上限。

8.3.1 案例

主线程等待两个子任务执行完毕。

  1. @Slf4j(topic = "CountDownLatch")
  2. public class CountDownLatchDemo {
  3. private static final CountDownLatch countDownLatch = new CountDownLatch(2);
  4. private static final ExecutorService executorService = Executors.newFixedThreadPool(2);
  5. private static void sleep(int timeout, TimeUnit unit) {
  6. try {
  7. unit.sleep(timeout);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. public static void main(String[] args) throws InterruptedException {
  13. executorService.submit(() -> {
  14. sleep(1, TimeUnit.SECONDS);
  15. countDownLatch.countDown();
  16. });
  17. executorService.submit(() -> {
  18. sleep(1, TimeUnit.SECONDS);
  19. countDownLatch.countDown();
  20. });
  21. log.debug("wait all child thread over");
  22. countDownLatch.await();
  23. log.debug("all child thread over");
  24. }
  25. }

8.3.2 原理

类图

image-20210508204931688.png

Semaphore还是使用AQS实现的,Sync只是对AQS的一个修饰,并且Sync有两个实现类,用来指定获取信号量时是否采用公平策略。

(1)构造方法

入参:permits(必选)、fair(可选)

Semphore默认采用非公平策略,如果需要使用公平策略需要使用双参构造方法。初始化信号量个数permits被赋给了AQS的状态变量state

  1. // Semophore
  2. public Semaphore(int permits, boolean fair) {
  3. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  4. }
  5. // Sync
  6. Sync(int permits) {
  7. setState(permits);
  8. }

(2)void acquire()方法

当前线程调用该方法的目的是希望获取一个信号量资源。

如果当前信号量个数大于0,则当前信号量的计数会减1,然后直接返回。

  1. public void acquire() throws InterruptedException {
  2. // 传递参数为1,说明要获取一个信号量资源
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  6. // (1)如果线程被中断,则抛出中断异常
  7. if (Thread.interrupted())
  8. throw new InterruptedException();
  9. // (2)否则调用Sync子类方法尝试获取,这里根据构造方法确定NonfairSync还是FairSync
  10. if (tryAcquireShared(arg) < 0)
  11. doAcquireSharedInterruptibly(arg);
  12. }

对于tryAcquireShared,分两种情况讨论:

  1. 非公平锁
    先获取当前信号量,然后减去需要获取的值,得到剩余信号量个数,如果剩余信号量小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。
    另外,由于NonfairSync是非公平获取的,也就是说先调用acquire方法获取信号量的线程不一定比后来者先获取到信号量。 ```java // NonfairSync protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }

// Sync final int nonfairTryAcquireShared(int acquires) { for (;;) { // 获取当前信号量 int available = getState(); // 计算当前剩余量 int remaining = available - acquires; // 如果当前剩余值小于0或者CAS设置成功则返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

  1. 2. 公平锁 <br />AQS公平性的保证就靠`hasQueuedPredecessors`这个方法,如果当前线程节点的前驱结点是否也在等待获取该资源,是则放弃自己获取信号量的资格。
  2. ```java
  3. protected int tryAcquireShared(int acquires) {
  4. for (;;) {
  5. // 先检查阻塞队列中是否有前驱结点
  6. if (hasQueuedPredecessors())
  7. return -1;
  8. int available = getState();
  9. int remaining = available - acquires;
  10. if (remaining < 0 ||
  11. compareAndSetState(available, remaining))
  12. return remaining;
  13. }
  14. }

(3)void acquire(int permits)方法

该方法与acquire()方法不同,后者只需要获取一个信号量值,而前者则获取perimits个。

  1. public void acquire(int permits) throws InterruptedException {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.acquireSharedInterruptibly(permits);
  4. }

(4)void acquireUninterruptibly()方法

该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly获取资源时,其他线程调用了当前线程的interrupt方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException异常而返回。

(5)void accquireUninterruptibly(int permits)方法

该方法与acquire(int permits)方法的不同之处在于,该方法对中断不响应。

  1. public void acquireUninterruptibly(int permits) {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.acquireShared(permits);
  4. }

(6)void release()方法

该方法的作用是把当前Semaphore对象的信号量增加1,如果当前有线程因为调用acquire方法被阻塞而被放入AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。

  1. // Semaphore
  2. public void release() {
  3. // (1)arg=1
  4. sync.releaseShared(1);
  5. }
  6. // AQS
  7. public final boolean releaseShared(int arg) {
  8. // (2) 尝试释放资源锁
  9. if (tryReleaseShared(arg)) {
  10. // (3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程
  11. doReleaseShared();
  12. return true;
  13. }
  14. return false;
  15. }
  16. // Sync
  17. protected final boolean tryReleaseShared(int releases) {
  18. for (;;) {
  19. // (4)获取当前信号量值
  20. int current = getState();
  21. // (5)当前信号量值+1
  22. int next = current + releases;
  23. if (next < current) // overflow
  24. throw new Error("Maximum permit count exceeded");
  25. // (6)通过CAS更新信号量的值
  26. if (compareAndSetState(current, next))
  27. return true;
  28. }
  29. }

(7)void release(int permits)方法

该方法与release()的不同之处在于,前者让信号量加permits,后者加1

8.3.3 小结

Semaphore完全可以达到CountDownLatch的效果,但是Semaphore的计数器是不可以自动重置的,不过通过变相的改变acquire方法的参数还是可以实现CyclicBarrier的功能的。

8.4 经典题目

8.4.1 交错执行

  1. /**
  2. * A,B,C三个线程有序交替执行
  3. *
  4. * @author KHighness
  5. * @since 2021-05-07
  6. */
  7. @Slf4j(topic = "Alternately")
  8. public class AlternatelyDemo {
  9. /**
  10. * 三个信号量分别控制A,B,C的打印
  11. */
  12. private static final Semaphore[] s = { new Semaphore(1), new Semaphore(1), new Semaphore(1)};
  13. private static final int size = 3;
  14. private static final char[] arr = {'A', 'B', 'C'};
  15. private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
  16. public static void main(String[] args) throws InterruptedException {
  17. s[1].acquire();
  18. s[2].acquire();
  19. for (int i = 0; i < size; i++) {
  20. final int finalI = i;
  21. executorService.submit(() -> {
  22. while (true) {
  23. try {
  24. s[finalI].acquire();
  25. TimeUnit.MILLISECONDS.sleep(100);
  26. log.debug("{} => [{}]", arr[finalI], System.nanoTime());
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. } finally {
  30. s[(finalI + 1) % size].release();
  31. }
  32. }
  33. });
  34. }
  35. }
  36. }

8.4.2 生产者-消费者

  1. /**
  2. * 生产者-消费者模型
  3. *
  4. * @author KHighness
  5. * @since 2021-08-03
  6. */
  7. @Slf4j(topic = "SemaphoreProducerConsumerModel")
  8. public class ProducerConsumerDemo {
  9. /** 数据队列 */
  10. private final static Queue<Character> QUEUE = new LinkedList<>();
  11. /** 最大容量 */
  12. private final static int QUEUE_MAX_SIZE = 5;
  13. /** 随机变量 */
  14. private final static ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
  15. /** 表示可消费的资源数 */
  16. private final static Semaphore FULL = new Semaphore(0);
  17. /** 表示队列的剩余空间 */
  18. private final static Semaphore EMPTY = new Semaphore(QUEUE_MAX_SIZE);
  19. /** 产品队列访问互斥的信号量 */
  20. private final static Semaphore MUTEX = new Semaphore(1);
  21. /**
  22. * 生成随机数字/字母字符
  23. * ascii char
  24. * 48-57 0~9
  25. * 65-90 A~Z
  26. * 97-122 a~z
  27. * @return ascii码字符
  28. */
  29. private static Character randomChar() {
  30. int choice = RANDOM.nextInt(3);
  31. if (choice == 0)
  32. return (char) (RANDOM.nextInt(10) + 48);
  33. else if (choice == 1)
  34. return (char) (RANDOM.nextInt(26) + 65);
  35. else
  36. return (char) (RANDOM.nextInt(26) + 97);
  37. }
  38. /**
  39. * 生产者
  40. */
  41. private static class Producer extends Thread {
  42. public Producer() {
  43. super("生产者");
  44. }
  45. @SneakyThrows
  46. @Override
  47. public void run() {
  48. while (true) {
  49. Character c = randomChar(); // 生产数据
  50. TimeUnit.SECONDS.sleep(1); // 模拟延时
  51. EMPTY.acquire(); // 请求空间
  52. MUTEX.acquire(); // 请求访问
  53. log.debug("生产 => [{}]", c); // 输出日志
  54. QUEUE.add(c); // 放入队列
  55. MUTEX.release(); // 释放队列
  56. FULL.release(); // 数据增加
  57. }
  58. }
  59. }
  60. /**
  61. * 消费者
  62. */
  63. private static class Consumer extends Thread {
  64. public Consumer() {
  65. super("消费者");
  66. }
  67. @SneakyThrows
  68. @Override
  69. public void run() {
  70. while (true) {
  71. FULL.acquire(); // 请求数据
  72. MUTEX.acquire(); // 请求访问
  73. TimeUnit.SECONDS.sleep(1); // 模拟延时
  74. Character c = QUEUE.poll(); // 消费数据
  75. log.debug("消费 => [{}]", c); // 输出日志
  76. MUTEX.release(); // 释放队列
  77. EMPTY.release(); // 空间增加
  78. }
  79. }
  80. }
  81. public static void main(String[] args) {
  82. new Producer().start();
  83. new Consumer().start();
  84. }
  85. }

8.4.3 读写者

  1. /**
  2. * 读写者问题
  3. * 有读者和写者两组并发进程,共享一个文件,当两个或以上的读进程同时访问共享数据时
  4. * 不会产生副作用,但若某个写进程和其他进程(读进程或写进程)同时访问共享数据时则
  5. * 可能导致数据不一致的错误。
  6. *
  7. * <p>要求:
  8. * <li>(1) 允许多个读者可以同时对文件执行读操作
  9. * <li>(2) 只允许一个写者往文件中写信息
  10. * <li>(3) 任一写者在完成写操作之前不允许其他读者或写者工作
  11. * <li>(4) 写者执行写操作前,应让已有的读者和写者全部退出
  12. *
  13. * @author KHighness
  14. * @since 2021-08-03
  15. */
  16. @Slf4j(topic = "ReaderWriterDemo")
  17. public class ReaderWriterDemo {
  18. /** 用于读写者互斥访问文件 */
  19. private final static Semaphore FILE_MUTEX = new Semaphore(1);
  20. /** 当前读者数量 */
  21. private static int READER_COUNT = 0;
  22. /** 对READER_COUNT操作的互斥变量 */
  23. private final static Semaphore COUNT_MUTEX = new Semaphore(1);
  24. /**
  25. * 写者
  26. */
  27. private static class Writer extends Thread {
  28. public Writer() {
  29. super("写者");
  30. }
  31. @SneakyThrows
  32. @Override
  33. public void run() {
  34. while (true) {
  35. FILE_MUTEX.acquire(); // 请求访问文件
  36. log.debug("写入文件"); // 输出日志
  37. TimeUnit.SECONDS.sleep(1);
  38. FILE_MUTEX.release(); // 释放共享文件
  39. }
  40. }
  41. }
  42. private static class Reader extends Thread {
  43. public Reader() {
  44. super("读者");
  45. }
  46. @SneakyThrows
  47. @Override
  48. public void run() {
  49. while (true) {
  50. COUNT_MUTEX.acquire(); // 请求访问COUNT
  51. if (READER_COUNT == 0) // 第一个读进程
  52. FILE_MUTEX.acquire(); // 阻止写进程写
  53. READER_COUNT++; // 读者计数器递增
  54. COUNT_MUTEX.release(); // 恢复访问COUNT
  55. log.debug("阅读文件"); // 输出日志
  56. TimeUnit.SECONDS.sleep(1);
  57. COUNT_MUTEX.acquire(); // 请求访问COUNT
  58. READER_COUNT--; // 读者计数器递减
  59. if (READER_COUNT == 0) // 最后一个读进程
  60. FILE_MUTEX.release(); // 允许写进程写
  61. COUNT_MUTEX.release(); // 恢复访问COUNT
  62. }
  63. }
  64. }
  65. public static void main(String[] args) {
  66. new Writer().start();
  67. new Reader().start();
  68. }
  69. }

8.4.4 哲学家就餐

  1. /**
  2. * 哲学家就餐问题
  3. * 当一名哲学家左右两边的筷子都可用时,才允许拿起筷子。
  4. *
  5. * @author KHighness
  6. * @since 2021-08-03
  7. */
  8. @Slf4j(topic = "PhilosopherDinnerDemo")
  9. public class PhilosopherDinnerDemo {
  10. /** 筷子 */
  11. private final static Semaphore[] CHOPSTICKS = {new Semaphore(1), new Semaphore(1),
  12. new Semaphore(1), new Semaphore(1), new Semaphore(1)};
  13. /** 互斥取筷子 */
  14. private final static Semaphore MUTEX = new Semaphore(1);
  15. /**
  16. * 哲学家
  17. */
  18. private static class Philosopher extends Thread {
  19. private final int index;
  20. public Philosopher(int index) {
  21. super("哲学家-" + index);
  22. this.index = index;
  23. }
  24. @SneakyThrows
  25. @Override
  26. public void run() {
  27. while (true) {
  28. MUTEX.acquire();
  29. CHOPSTICKS[index].acquire();
  30. CHOPSTICKS[(index + 1) % 5].acquire();
  31. MUTEX.release();
  32. log.debug("进餐");
  33. CHOPSTICKS[index].release();
  34. CHOPSTICKS[(index + 1) % 5].release();
  35. log.debug("思考");
  36. }
  37. }
  38. }
  39. public static void main(String[] args) {
  40. Philosopher[] philosophers = new Philosopher[5];
  41. for (int i = 0; i < philosophers.length; i++) {
  42. philosophers[i] = new Philosopher(i);
  43. philosophers[i].start();
  44. }
  45. }
  46. }

8.4.5 和尚和水

  1. /**
  2. * 和尚与水问题
  3. * 寺庙有小和尚和老和尚若干,有一水缸,由小和尚提水入缸供老和尚引用。
  4. * 水缸可容10桶水,井每次只能容一个桶取水。水桶总数为3个,每次入缸仅取1桶水。
  5. *
  6. * @author KHighness
  7. * @since 2021-08-03
  8. */
  9. @Slf4j(topic = "MonkAndWaterDemo")
  10. public class MonkAndWaterDemo {
  11. /** 用于互斥地访问水井*/
  12. private static final Semaphore WELL = new Semaphore(1);
  13. /** 用于互斥地访问水缸 */
  14. private static final Semaphore VAT = new Semaphore(1);
  15. /** 用于表示水缸中剩余空间所能容纳的水的桶数 */
  16. private static final Semaphore EMPTY = new Semaphore(10);
  17. /** 表示水缸中水的桶数 */
  18. private static final Semaphore FULL = new Semaphore(0);
  19. /** 表示有多少个水桶可用 */
  20. private static final Semaphore BUCKET = new Semaphore(3);
  21. /**
  22. * 老和尚
  23. */
  24. private static class OldMonk extends Thread {
  25. public OldMonk() {
  26. super("老和尚");
  27. }
  28. @SneakyThrows
  29. @Override
  30. public void run() {
  31. while (true) {
  32. FULL.acquire(); // 请求水缸的水
  33. BUCKET.acquire(); // 请求水桶资源
  34. VAT.acquire(); // 请求水缸资源
  35. log.debug("从水缸中打一桶水");
  36. TimeUnit.SECONDS.sleep(1);
  37. VAT.release(); // 释放水缸资源
  38. EMPTY.release(); // 消耗水资源
  39. log.debug("喝水水");
  40. TimeUnit.SECONDS.sleep(1);
  41. BUCKET.release(); // 释放水桶资源
  42. }
  43. }
  44. }
  45. /**
  46. * 小和尚
  47. */
  48. private static class YoungMonk extends Thread {
  49. public YoungMonk() {
  50. super("小和尚");
  51. }
  52. @SneakyThrows
  53. @Override
  54. public void run() {
  55. while (true) {
  56. EMPTY.acquire(); // 请求水缸空间
  57. BUCKET.acquire(); // 请求桶资源
  58. WELL.acquire(); // 请求水井资源
  59. log.debug("从水井中打一桶水");
  60. TimeUnit.SECONDS.sleep(1);
  61. WELL.release(); // 释放水井资源
  62. VAT.acquire(); // 请求水缸资源
  63. log.debug("倒水水");
  64. TimeUnit.SECONDS.sleep(1);
  65. VAT.release(); // 释放水缸资源
  66. FULL.release(); // 增加水缸的水
  67. BUCKET.release(); // 释放水桶资源
  68. }
  69. }
  70. }
  71. public static void main(String[] args) {
  72. new OldMonk().start();
  73. new YoungMonk().start();
  74. }
  75. }