8.1 CountDownLatch
CountDownLatch计数器
特点:内部计数器递减。
功能:主线程等待所有子线程执行完毕进行汇总。
8.1.1 案例
任务分解,第三个任务需要等待第一个任务和第二个任务执行计算后进行汇总。
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** @author KHighness* @since 2021-05-07*/@Slf4j(topic = "CountDownLatch")public class CountDownLatchDemo {private static int total = 0;private static final CountDownLatch countDownLatch = new CountDownLatch(2);private static final ExecutorService executorService = Executors.newFixedThreadPool(3);private static void sleep(int timeout, TimeUnit unit) {try {unit.sleep(timeout);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {executorService.submit(() -> {log.debug("state = {}", countDownLatch.getCount());total += 1;sleep(1, TimeUnit.SECONDS);log.debug("{} run over", Thread.currentThread().getName());countDownLatch.countDown();log.debug("state = {}", countDownLatch.getCount());});executorService.submit(() -> {log.debug("state = {}", countDownLatch.getCount());total += 2;sleep(1, TimeUnit.SECONDS);log.debug("{} run over", Thread.currentThread().getName());countDownLatch.countDown();log.debug("state = {}", countDownLatch.getCount());});executorService.submit(() -> {log.debug("state = {}", countDownLatch.getCount());try {countDownLatch.await();log.debug("result: total = {}", total);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{} run over", Thread.currentThread().getName());});executorService.shutdown();sleep(3, TimeUnit.SECONDS);executorService.shutdownNow();}}
运行结果:
2021-05-08 12:14:58.374 [pool-1-thread-1] DEBUG CountDownLatch - state = 22021-05-08 12:14:58.374 [pool-1-thread-2] DEBUG CountDownLatch - state = 22021-05-08 12:14:58.374 [pool-1-thread-3] DEBUG CountDownLatch - state = 22021-05-08 12:14:59.379 [pool-1-thread-2] DEBUG CountDownLatch - pool-1-thread-2 run over2021-05-08 12:14:59.379 [pool-1-thread-1] DEBUG CountDownLatch - pool-1-thread-1 run over2021-05-08 12:14:59.380 [pool-1-thread-2] DEBUG CountDownLatch - state = 12021-05-08 12:14:59.380 [pool-1-thread-1] DEBUG CountDownLatch - state = 02021-05-08 12:14:59.380 [pool-1-thread-3] DEBUG CountDownLatch - result: total = 32021-05-08 12:14:59.380 [pool-1-thread-3] DEBUG CountDownLatch - pool-1-thread-3 run over
8.1.2 原理
类图

(1)构造方法
入参:count,会将计数器值count赋给了AQS的状态变量state。
// CountDownLatchpublic CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}// SyncSync(int count) {setState(count);}
(2)void await()方法
当线程调用CountDownLatch的await方法后,当前线程就会阻塞,直到:
- 所有线程都调用了
CountDownLatch对象的countDown方法后,即计数器值为0时 - 其他线程调用了当前线程的
interrupt方法中断了当前线程,当前线程抛出InterruptedException异常
// CountDownLatchpublic void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}// AQS// 获取共享资源时可被中断public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 如果线程被中断即抛出异常if (Thread.interrupted())throw new InterruptedException();// 查看当前计数器值是否为0,为0则直接返回,否则进入AQS的队列等待if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}// Sync// 实现的AQS接口protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
(3)boolean await(long timeout, TimeUnit unit)方法
当线程调用了CountDownLatch的该方法后,当前线程会被阻塞,直到:
- 所有线程都调用了
CountDownLatch对象的countDown方法后,即计数器值为0时 - 设置的
timeout时间到了,因为超时返回false - 其他线程调用了当前线程的
interrupt方法中断了当前线程,当前线程抛出InterruptedException异常
// CountDownLatchpublic boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}
(4)void countDown()方法
线程调用该方法后,计数器的值递减,递减后如果计数器的值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做。
如果state原始值为n,有(n+1)个线程调用了countDown方法,那么第(n+1)个线程调用无效。
// CountDownLatchpublic void countDown() {sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) {// 调用Sync的实现,成功则唤醒阻塞的线程if (tryReleaseShared(arg)) {// AQS释放资源doReleaseShared();return true;}return false;}// Syncprotected boolean tryReleaseShared(int releases) {// 循环CAS使计数器(状态值state)减1并更新,直到成功for (;;) {int c = getState();// 防止state变成负数if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
(5)long getCount()方法
获取当前计数器的值,即AQS的state值,一般用于测试。
// CountDownLatchpublic long getCount() {return sync.getCount();}// AQSint getCount() {return getState();}
8.1.3 小结
CountDownLatch相比于使用线程的join方法来实现线程间同步,前者更具有灵活性和方便性,因为在ExecutorService线程池中无法直接调用其他线程的join方法。
CountDownLatch使用AQS的状态变量state来存放计数器的值。首先在初始化设置计数器值(AQS状态值),多个线程调用countDown方法实际是原子性递减AQS的状态值。当线程调用await方法后当前线程会被放入AQS的阻塞队列等待,待计数器为0再返回。其他线程调用countDown方法让计数器值递减1,当计数器值变成0时,当前线程还要调用AQS的doReleaseSShared方法来激活由于调用await方法而被阻塞的线程。
8.2 CyclicBarrier
特点:内部计数器递减。
功能:让一组线程全部达到一个状态后再全部同时执行。
[^ 1]: 回环是因为当所有线程执行完毕,并重置CylicBarrier的状态以便重用。
[^ 2]:线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。
8.2.1 案例
一个任务需要三步完成,需要执行多个任务。
@Slf4j(topic = "CyclicBarrier")public class CyclicBarrierDemo {private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {log.debug("==========================");});private static final ExecutorService executorService = Executors.newFixedThreadPool(10);private static void sleep(int timeout, TimeUnit unit) {try {unit.sleep(timeout);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {for (int i = 0; i < 3; i++) {executorService.submit(() -> {try {log.debug("{} first step", Thread.currentThread().getName());sleep(1, TimeUnit.SECONDS);cyclicBarrier.await();log.debug("{} second step", Thread.currentThread().getName());sleep(1, TimeUnit.SECONDS);cyclicBarrier.await();log.debug("{} third step", Thread.currentThread().getName());} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});}executorService.shutdown();}}
运行结果:
2021-05-08 16:28:58.549 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 first step2021-05-08 16:28:58.549 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 first step2021-05-08 16:28:58.549 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 first step2021-05-08 16:28:59.558 [pool-1-thread-2] DEBUG CyclicBarrier - ==========================2021-05-08 16:28:59.559 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 second step2021-05-08 16:28:59.559 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 second step2021-05-08 16:28:59.559 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 second step2021-05-08 16:29:00.570 [pool-1-thread-2] DEBUG CyclicBarrier - ==========================2021-05-08 16:29:00.570 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 third step2021-05-08 16:29:00.570 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 third step2021-05-08 16:29:00.570 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 third step
8.2.2 原理
类图

CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。
属性:
lock:独占锁。trip:条件变量。barrierCommand:到达屏障点执行的任务。parties:线程计数器,这里表示多少线程调用await后,所有线程才会冲破屏障继续向下允许。count:执行记录器,一开始等于parties,每当有线程调用await就递减1,当count为0时就表示所有线程都到了屏障点。
parties始终用来记录总的线程个数,当count计数器值变为0后,会将parties的值赋给count,进而进行服用。
内部类Generation仅有一个属性broken,用来记录当前屏障是否被打破。是在锁内使用变量,所以并没有声明为volatile。
(1)构造方法
入参:parties(必选)、barrierAction(可选)
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}
(2)int await()方法
当前线程调用了CyclicBarrier的该方法时会被阻塞,直到:
parties个线程都调用了await方法,线程到达屏障点- 其他线程调用了当前线程的
interrupt方法中断了当前线程,则当前线程抛出InterruptedExcetion异常 - 与当前屏障点关联的
Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常
内部调用dowait方法,第一个参数为false说明不设置超时时间
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}
(3)boolean await(long timeout, TimeUnit unit)方法
当先线程调用了CyclicBarrier的该方法时会被阻塞,直到:
parties个线程都调用了await方法,也就是线程都到了屏障点,这时候返回true- 设置的
timeout时间到了,因为超时返回false - 其他线程调用当前线程的
interrupt方法中断了当前线程,则当前线程会抛出InterruptedException - 与当前屏障点关联的
Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常
内部调用dowait方法,第一个参数说明设置超时,第二个参数是超时时间
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {return dowait(true, unit.toNanos(timeout));}
(3)int dowait(boolean timed, long nanos)方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {...// (1)如果index=0则说明所有线程都到了屏障点,此时执行初始化时传递的任务int index = --count;if (index == 0) { // trippedboolean ranAction = false;try {// (2) 执行任务final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// (3)激活其他因调用await方法而被阻塞的线程,并且重置CyclicBarriernextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// (4)如果index!=0for (;;) {try {// (5)未设置超时时间if (!timed)trip.await();// (6)设置了超时时间else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {...}...}} finally {lock.unlock();}}private void nextGeneration() {// (7)唤醒条件队列中的阻塞线程trip.signalAll();// (8)重置CyclicBarriercount = parties;generation = new Generation();}
8.2.3 小结
CycleBarrier与CountDownLatch的不同之处在于,前者是可以复用的,并且前者特别适合分段任务有序执行的场景。
内部通过ReentrantLock独占锁实现计数器原子性更新,并使用条件变量队列来实现线程同步。
8.3 Semaphore
Semaphore信号量
特点:内部计数器递增。
功能:限制能同时访问共享资源的线程上限。
8.3.1 案例
主线程等待两个子任务执行完毕。
@Slf4j(topic = "CountDownLatch")public class CountDownLatchDemo {private static final CountDownLatch countDownLatch = new CountDownLatch(2);private static final ExecutorService executorService = Executors.newFixedThreadPool(2);private static void sleep(int timeout, TimeUnit unit) {try {unit.sleep(timeout);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {executorService.submit(() -> {sleep(1, TimeUnit.SECONDS);countDownLatch.countDown();});executorService.submit(() -> {sleep(1, TimeUnit.SECONDS);countDownLatch.countDown();});log.debug("wait all child thread over");countDownLatch.await();log.debug("all child thread over");}}
8.3.2 原理
类图

Semaphore还是使用AQS实现的,Sync只是对AQS的一个修饰,并且Sync有两个实现类,用来指定获取信号量时是否采用公平策略。
(1)构造方法
入参:permits(必选)、fair(可选)
Semphore默认采用非公平策略,如果需要使用公平策略需要使用双参构造方法。初始化信号量个数permits被赋给了AQS的状态变量state。
// Semophorepublic Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}// SyncSync(int permits) {setState(permits);}
(2)void acquire()方法
当前线程调用该方法的目的是希望获取一个信号量资源。
如果当前信号量个数大于0,则当前信号量的计数会减1,然后直接返回。
public void acquire() throws InterruptedException {// 传递参数为1,说明要获取一个信号量资源sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// (1)如果线程被中断,则抛出中断异常if (Thread.interrupted())throw new InterruptedException();// (2)否则调用Sync子类方法尝试获取,这里根据构造方法确定NonfairSync还是FairSyncif (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
对于tryAcquireShared,分两种情况讨论:
- 非公平锁
先获取当前信号量,然后减去需要获取的值,得到剩余信号量个数,如果剩余信号量小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。
另外,由于NonfairSync是非公平获取的,也就是说先调用acquire方法获取信号量的线程不一定比后来者先获取到信号量。 ```java // NonfairSync protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
// Sync final int nonfairTryAcquireShared(int acquires) { for (;;) { // 获取当前信号量 int available = getState(); // 计算当前剩余量 int remaining = available - acquires; // 如果当前剩余值小于0或者CAS设置成功则返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
2. 公平锁 <br />AQS公平性的保证就靠`hasQueuedPredecessors`这个方法,如果当前线程节点的前驱结点是否也在等待获取该资源,是则放弃自己获取信号量的资格。```javaprotected int tryAcquireShared(int acquires) {for (;;) {// 先检查阻塞队列中是否有前驱结点if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
(3)void acquire(int permits)方法
该方法与acquire()方法不同,后者只需要获取一个信号量值,而前者则获取perimits个。
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}
(4)void acquireUninterruptibly()方法
该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly获取资源时,其他线程调用了当前线程的interrupt方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException异常而返回。
(5)void accquireUninterruptibly(int permits)方法
该方法与acquire(int permits)方法的不同之处在于,该方法对中断不响应。
public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}
(6)void release()方法
该方法的作用是把当前Semaphore对象的信号量增加1,如果当前有线程因为调用acquire方法被阻塞而被放入AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。
// Semaphorepublic void release() {// (1)arg=1sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) {// (2) 尝试释放资源锁if (tryReleaseShared(arg)) {// (3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程doReleaseShared();return true;}return false;}// Syncprotected final boolean tryReleaseShared(int releases) {for (;;) {// (4)获取当前信号量值int current = getState();// (5)当前信号量值+1int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// (6)通过CAS更新信号量的值if (compareAndSetState(current, next))return true;}}
(7)void release(int permits)方法
该方法与release()的不同之处在于,前者让信号量加permits,后者加1。
8.3.3 小结
Semaphore完全可以达到CountDownLatch的效果,但是Semaphore的计数器是不可以自动重置的,不过通过变相的改变acquire方法的参数还是可以实现CyclicBarrier的功能的。
8.4 经典题目
8.4.1 交错执行
/*** A,B,C三个线程有序交替执行** @author KHighness* @since 2021-05-07*/@Slf4j(topic = "Alternately")public class AlternatelyDemo {/*** 三个信号量分别控制A,B,C的打印*/private static final Semaphore[] s = { new Semaphore(1), new Semaphore(1), new Semaphore(1)};private static final int size = 3;private static final char[] arr = {'A', 'B', 'C'};private static final ExecutorService executorService = Executors.newFixedThreadPool(3);public static void main(String[] args) throws InterruptedException {s[1].acquire();s[2].acquire();for (int i = 0; i < size; i++) {final int finalI = i;executorService.submit(() -> {while (true) {try {s[finalI].acquire();TimeUnit.MILLISECONDS.sleep(100);log.debug("{} => [{}]", arr[finalI], System.nanoTime());} catch (InterruptedException e) {e.printStackTrace();} finally {s[(finalI + 1) % size].release();}}});}}}
8.4.2 生产者-消费者
/*** 生产者-消费者模型** @author KHighness* @since 2021-08-03*/@Slf4j(topic = "SemaphoreProducerConsumerModel")public class ProducerConsumerDemo {/** 数据队列 */private final static Queue<Character> QUEUE = new LinkedList<>();/** 最大容量 */private final static int QUEUE_MAX_SIZE = 5;/** 随机变量 */private final static ThreadLocalRandom RANDOM = ThreadLocalRandom.current();/** 表示可消费的资源数 */private final static Semaphore FULL = new Semaphore(0);/** 表示队列的剩余空间 */private final static Semaphore EMPTY = new Semaphore(QUEUE_MAX_SIZE);/** 产品队列访问互斥的信号量 */private final static Semaphore MUTEX = new Semaphore(1);/*** 生成随机数字/字母字符* ascii char* 48-57 0~9* 65-90 A~Z* 97-122 a~z* @return ascii码字符*/private static Character randomChar() {int choice = RANDOM.nextInt(3);if (choice == 0)return (char) (RANDOM.nextInt(10) + 48);else if (choice == 1)return (char) (RANDOM.nextInt(26) + 65);elsereturn (char) (RANDOM.nextInt(26) + 97);}/*** 生产者*/private static class Producer extends Thread {public Producer() {super("生产者");}@SneakyThrows@Overridepublic void run() {while (true) {Character c = randomChar(); // 生产数据TimeUnit.SECONDS.sleep(1); // 模拟延时EMPTY.acquire(); // 请求空间MUTEX.acquire(); // 请求访问log.debug("生产 => [{}]", c); // 输出日志QUEUE.add(c); // 放入队列MUTEX.release(); // 释放队列FULL.release(); // 数据增加}}}/*** 消费者*/private static class Consumer extends Thread {public Consumer() {super("消费者");}@SneakyThrows@Overridepublic void run() {while (true) {FULL.acquire(); // 请求数据MUTEX.acquire(); // 请求访问TimeUnit.SECONDS.sleep(1); // 模拟延时Character c = QUEUE.poll(); // 消费数据log.debug("消费 => [{}]", c); // 输出日志MUTEX.release(); // 释放队列EMPTY.release(); // 空间增加}}}public static void main(String[] args) {new Producer().start();new Consumer().start();}}
8.4.3 读写者
/*** 读写者问题* 有读者和写者两组并发进程,共享一个文件,当两个或以上的读进程同时访问共享数据时* 不会产生副作用,但若某个写进程和其他进程(读进程或写进程)同时访问共享数据时则* 可能导致数据不一致的错误。** <p>要求:* <li>(1) 允许多个读者可以同时对文件执行读操作* <li>(2) 只允许一个写者往文件中写信息* <li>(3) 任一写者在完成写操作之前不允许其他读者或写者工作* <li>(4) 写者执行写操作前,应让已有的读者和写者全部退出** @author KHighness* @since 2021-08-03*/@Slf4j(topic = "ReaderWriterDemo")public class ReaderWriterDemo {/** 用于读写者互斥访问文件 */private final static Semaphore FILE_MUTEX = new Semaphore(1);/** 当前读者数量 */private static int READER_COUNT = 0;/** 对READER_COUNT操作的互斥变量 */private final static Semaphore COUNT_MUTEX = new Semaphore(1);/*** 写者*/private static class Writer extends Thread {public Writer() {super("写者");}@SneakyThrows@Overridepublic void run() {while (true) {FILE_MUTEX.acquire(); // 请求访问文件log.debug("写入文件"); // 输出日志TimeUnit.SECONDS.sleep(1);FILE_MUTEX.release(); // 释放共享文件}}}private static class Reader extends Thread {public Reader() {super("读者");}@SneakyThrows@Overridepublic void run() {while (true) {COUNT_MUTEX.acquire(); // 请求访问COUNTif (READER_COUNT == 0) // 第一个读进程FILE_MUTEX.acquire(); // 阻止写进程写READER_COUNT++; // 读者计数器递增COUNT_MUTEX.release(); // 恢复访问COUNTlog.debug("阅读文件"); // 输出日志TimeUnit.SECONDS.sleep(1);COUNT_MUTEX.acquire(); // 请求访问COUNTREADER_COUNT--; // 读者计数器递减if (READER_COUNT == 0) // 最后一个读进程FILE_MUTEX.release(); // 允许写进程写COUNT_MUTEX.release(); // 恢复访问COUNT}}}public static void main(String[] args) {new Writer().start();new Reader().start();}}
8.4.4 哲学家就餐
/*** 哲学家就餐问题* 当一名哲学家左右两边的筷子都可用时,才允许拿起筷子。** @author KHighness* @since 2021-08-03*/@Slf4j(topic = "PhilosopherDinnerDemo")public class PhilosopherDinnerDemo {/** 筷子 */private final static Semaphore[] CHOPSTICKS = {new Semaphore(1), new Semaphore(1),new Semaphore(1), new Semaphore(1), new Semaphore(1)};/** 互斥取筷子 */private final static Semaphore MUTEX = new Semaphore(1);/*** 哲学家*/private static class Philosopher extends Thread {private final int index;public Philosopher(int index) {super("哲学家-" + index);this.index = index;}@SneakyThrows@Overridepublic void run() {while (true) {MUTEX.acquire();CHOPSTICKS[index].acquire();CHOPSTICKS[(index + 1) % 5].acquire();MUTEX.release();log.debug("进餐");CHOPSTICKS[index].release();CHOPSTICKS[(index + 1) % 5].release();log.debug("思考");}}}public static void main(String[] args) {Philosopher[] philosophers = new Philosopher[5];for (int i = 0; i < philosophers.length; i++) {philosophers[i] = new Philosopher(i);philosophers[i].start();}}}
8.4.5 和尚和水
/*** 和尚与水问题* 寺庙有小和尚和老和尚若干,有一水缸,由小和尚提水入缸供老和尚引用。* 水缸可容10桶水,井每次只能容一个桶取水。水桶总数为3个,每次入缸仅取1桶水。** @author KHighness* @since 2021-08-03*/@Slf4j(topic = "MonkAndWaterDemo")public class MonkAndWaterDemo {/** 用于互斥地访问水井*/private static final Semaphore WELL = new Semaphore(1);/** 用于互斥地访问水缸 */private static final Semaphore VAT = new Semaphore(1);/** 用于表示水缸中剩余空间所能容纳的水的桶数 */private static final Semaphore EMPTY = new Semaphore(10);/** 表示水缸中水的桶数 */private static final Semaphore FULL = new Semaphore(0);/** 表示有多少个水桶可用 */private static final Semaphore BUCKET = new Semaphore(3);/*** 老和尚*/private static class OldMonk extends Thread {public OldMonk() {super("老和尚");}@SneakyThrows@Overridepublic void run() {while (true) {FULL.acquire(); // 请求水缸的水BUCKET.acquire(); // 请求水桶资源VAT.acquire(); // 请求水缸资源log.debug("从水缸中打一桶水");TimeUnit.SECONDS.sleep(1);VAT.release(); // 释放水缸资源EMPTY.release(); // 消耗水资源log.debug("喝水水");TimeUnit.SECONDS.sleep(1);BUCKET.release(); // 释放水桶资源}}}/*** 小和尚*/private static class YoungMonk extends Thread {public YoungMonk() {super("小和尚");}@SneakyThrows@Overridepublic void run() {while (true) {EMPTY.acquire(); // 请求水缸空间BUCKET.acquire(); // 请求桶资源WELL.acquire(); // 请求水井资源log.debug("从水井中打一桶水");TimeUnit.SECONDS.sleep(1);WELL.release(); // 释放水井资源VAT.acquire(); // 请求水缸资源log.debug("倒水水");TimeUnit.SECONDS.sleep(1);VAT.release(); // 释放水缸资源FULL.release(); // 增加水缸的水BUCKET.release(); // 释放水桶资源}}}public static void main(String[] args) {new OldMonk().start();new YoungMonk().start();}}
