8.1 CountDownLatch
CountDownLatch
计数器
特点:内部计数器递减。
功能:主线程等待所有子线程执行完毕进行汇总。
8.1.1 案例
任务分解,第三个任务需要等待第一个任务和第二个任务执行计算后进行汇总。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* @author KHighness
* @since 2021-05-07
*/
@Slf4j(topic = "CountDownLatch")
public class CountDownLatchDemo {
private static int total = 0;
private static final CountDownLatch countDownLatch = new CountDownLatch(2);
private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
private static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
executorService.submit(() -> {
log.debug("state = {}", countDownLatch.getCount());
total += 1;
sleep(1, TimeUnit.SECONDS);
log.debug("{} run over", Thread.currentThread().getName());
countDownLatch.countDown();
log.debug("state = {}", countDownLatch.getCount());
});
executorService.submit(() -> {
log.debug("state = {}", countDownLatch.getCount());
total += 2;
sleep(1, TimeUnit.SECONDS);
log.debug("{} run over", Thread.currentThread().getName());
countDownLatch.countDown();
log.debug("state = {}", countDownLatch.getCount());
});
executorService.submit(() -> {
log.debug("state = {}", countDownLatch.getCount());
try {
countDownLatch.await();
log.debug("result: total = {}", total);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{} run over", Thread.currentThread().getName());
});
executorService.shutdown();
sleep(3, TimeUnit.SECONDS);
executorService.shutdownNow();
}
}
运行结果:
2021-05-08 12:14:58.374 [pool-1-thread-1] DEBUG CountDownLatch - state = 2
2021-05-08 12:14:58.374 [pool-1-thread-2] DEBUG CountDownLatch - state = 2
2021-05-08 12:14:58.374 [pool-1-thread-3] DEBUG CountDownLatch - state = 2
2021-05-08 12:14:59.379 [pool-1-thread-2] DEBUG CountDownLatch - pool-1-thread-2 run over
2021-05-08 12:14:59.379 [pool-1-thread-1] DEBUG CountDownLatch - pool-1-thread-1 run over
2021-05-08 12:14:59.380 [pool-1-thread-2] DEBUG CountDownLatch - state = 1
2021-05-08 12:14:59.380 [pool-1-thread-1] DEBUG CountDownLatch - state = 0
2021-05-08 12:14:59.380 [pool-1-thread-3] DEBUG CountDownLatch - result: total = 3
2021-05-08 12:14:59.380 [pool-1-thread-3] DEBUG CountDownLatch - pool-1-thread-3 run over
8.1.2 原理
类图
(1)构造方法
入参:count
,会将计数器值count
赋给了AQS的状态变量state
。
// CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// Sync
Sync(int count) {
setState(count);
}
(2)void await()
方法
当线程调用CountDownLatch
的await
方法后,当前线程就会阻塞,直到:
- 所有线程都调用了
CountDownLatch
对象的countDown
方法后,即计数器值为0时 - 其他线程调用了当前线程的
interrupt
方法中断了当前线程,当前线程抛出InterruptedException
异常
// CountDownLatch
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS
// 获取共享资源时可被中断
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 如果线程被中断即抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 查看当前计数器值是否为0,为0则直接返回,否则进入AQS的队列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// Sync
// 实现的AQS接口
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
(3)boolean await(long timeout, TimeUnit unit)
方法
当线程调用了CountDownLatch
的该方法后,当前线程会被阻塞,直到:
- 所有线程都调用了
CountDownLatch
对象的countDown
方法后,即计数器值为0时 - 设置的
timeout
时间到了,因为超时返回false
- 其他线程调用了当前线程的
interrupt
方法中断了当前线程,当前线程抛出InterruptedException
异常
// CountDownLatch
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
(4)void countDown()
方法
线程调用该方法后,计数器的值递减,递减后如果计数器的值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做。
如果state原始值为n,有(n+1)个线程调用了countDown
方法,那么第(n+1)个线程调用无效。
// CountDownLatch
public void countDown() {
sync.releaseShared(1);
}
// AQS
public final boolean releaseShared(int arg) {
// 调用Sync的实现,成功则唤醒阻塞的线程
if (tryReleaseShared(arg)) {
// AQS释放资源
doReleaseShared();
return true;
}
return false;
}
// Sync
protected boolean tryReleaseShared(int releases) {
// 循环CAS使计数器(状态值state)减1并更新,直到成功
for (;;) {
int c = getState();
// 防止state变成负数
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
(5)long getCount()
方法
获取当前计数器的值,即AQS的state
值,一般用于测试。
// CountDownLatch
public long getCount() {
return sync.getCount();
}
// AQS
int getCount() {
return getState();
}
8.1.3 小结
CountDownLatch
相比于使用线程的join
方法来实现线程间同步,前者更具有灵活性和方便性,因为在ExecutorService
线程池中无法直接调用其他线程的join
方法。
CountDownLatch
使用AQS的状态变量state
来存放计数器的值。首先在初始化设置计数器值(AQS状态值),多个线程调用countDown
方法实际是原子性递减AQS的状态值。当线程调用await
方法后当前线程会被放入AQS的阻塞队列等待,待计数器为0再返回。其他线程调用countDown
方法让计数器值递减1,当计数器值变成0时,当前线程还要调用AQS的doReleaseSShared
方法来激活由于调用await
方法而被阻塞的线程。
8.2 CyclicBarrier
特点:内部计数器递减。
功能:让一组线程全部达到一个状态后再全部同时执行。
[^ 1]: 回环是因为当所有线程执行完毕,并重置CylicBarrier
的状态以便重用。
[^ 2]:线程调用await
方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await
方法后,线程们就会冲破屏障,继续向下运行。
8.2.1 案例
一个任务需要三步完成,需要执行多个任务。
@Slf4j(topic = "CyclicBarrier")
public class CyclicBarrierDemo {
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {log.debug("==========================");});
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
private static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
try {
log.debug("{} first step", Thread.currentThread().getName());
sleep(1, TimeUnit.SECONDS);
cyclicBarrier.await();
log.debug("{} second step", Thread.currentThread().getName());
sleep(1, TimeUnit.SECONDS);
cyclicBarrier.await();
log.debug("{} third step", Thread.currentThread().getName());
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
运行结果:
2021-05-08 16:28:58.549 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 first step
2021-05-08 16:28:58.549 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 first step
2021-05-08 16:28:58.549 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 first step
2021-05-08 16:28:59.558 [pool-1-thread-2] DEBUG CyclicBarrier - ==========================
2021-05-08 16:28:59.559 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 second step
2021-05-08 16:28:59.559 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 second step
2021-05-08 16:28:59.559 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 second step
2021-05-08 16:29:00.570 [pool-1-thread-2] DEBUG CyclicBarrier - ==========================
2021-05-08 16:29:00.570 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 third step
2021-05-08 16:29:00.570 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 third step
2021-05-08 16:29:00.570 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 third step
8.2.2 原理
类图
CyclicBarrier
基于独占锁实现,本质底层还是基于AQS的。
属性:
lock
:独占锁。trip
:条件变量。barrierCommand
:到达屏障点执行的任务。parties
:线程计数器,这里表示多少线程调用await
后,所有线程才会冲破屏障继续向下允许。count
:执行记录器,一开始等于parties
,每当有线程调用await
就递减1,当count
为0时就表示所有线程都到了屏障点。
parties
始终用来记录总的线程个数,当count
计数器值变为0后,会将parties
的值赋给count
,进而进行服用。
内部类Generation
仅有一个属性broken
,用来记录当前屏障是否被打破。是在锁内使用变量,所以并没有声明为volatile
。
(1)构造方法
入参:parties
(必选)、barrierAction
(可选)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
(2)int await()
方法
当前线程调用了CyclicBarrier
的该方法时会被阻塞,直到:
parties
个线程都调用了await
方法,线程到达屏障点- 其他线程调用了当前线程的
interrupt
方法中断了当前线程,则当前线程抛出InterruptedExcetion
异常 - 与当前屏障点关联的
Generation
对象的broken
标志被设置为true
时,会抛出BrokenBarrierException
异常
内部调用dowait
方法,第一个参数为false说明不设置超时时间
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
(3)boolean await(long timeout, TimeUnit unit)
方法
当先线程调用了CyclicBarrier
的该方法时会被阻塞,直到:
parties
个线程都调用了await
方法,也就是线程都到了屏障点,这时候返回true
- 设置的
timeout
时间到了,因为超时返回false
- 其他线程调用当前线程的
interrupt
方法中断了当前线程,则当前线程会抛出InterruptedException
- 与当前屏障点关联的
Generation
对象的broken
标志被设置为true
时,会抛出BrokenBarrierException
异常
内部调用dowait
方法,第一个参数说明设置超时,第二个参数是超时时间
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
(3)int dowait(boolean timed, long nanos)
方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
...
// (1)如果index=0则说明所有线程都到了屏障点,此时执行初始化时传递的任务
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
// (2) 执行任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// (3)激活其他因调用await方法而被阻塞的线程,并且重置CyclicBarrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// (4)如果index!=0
for (;;) {
try {
// (5)未设置超时时间
if (!timed)
trip.await();
// (6)设置了超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
...
}
...
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// (7)唤醒条件队列中的阻塞线程
trip.signalAll();
// (8)重置CyclicBarrier
count = parties;
generation = new Generation();
}
8.2.3 小结
CycleBarrier
与CountDownLatch
的不同之处在于,前者是可以复用的,并且前者特别适合分段任务有序执行的场景。
内部通过ReentrantLock
独占锁实现计数器原子性更新,并使用条件变量队列来实现线程同步。
8.3 Semaphore
Semaphore
信号量
特点:内部计数器递增。
功能:限制能同时访问共享资源的线程上限。
8.3.1 案例
主线程等待两个子任务执行完毕。
@Slf4j(topic = "CountDownLatch")
public class CountDownLatchDemo {
private static final CountDownLatch countDownLatch = new CountDownLatch(2);
private static final ExecutorService executorService = Executors.newFixedThreadPool(2);
private static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
executorService.submit(() -> {
sleep(1, TimeUnit.SECONDS);
countDownLatch.countDown();
});
executorService.submit(() -> {
sleep(1, TimeUnit.SECONDS);
countDownLatch.countDown();
});
log.debug("wait all child thread over");
countDownLatch.await();
log.debug("all child thread over");
}
}
8.3.2 原理
类图
Semaphore
还是使用AQS实现的,Sync
只是对AQS的一个修饰,并且Sync
有两个实现类,用来指定获取信号量时是否采用公平策略。
(1)构造方法
入参:permits
(必选)、fair
(可选)
Semphore
默认采用非公平策略,如果需要使用公平策略需要使用双参构造方法。初始化信号量个数permits
被赋给了AQS的状态变量state
。
// Semophore
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// Sync
Sync(int permits) {
setState(permits);
}
(2)void acquire()
方法
当前线程调用该方法的目的是希望获取一个信号量资源。
如果当前信号量个数大于0,则当前信号量的计数会减1,然后直接返回。
public void acquire() throws InterruptedException {
// 传递参数为1,说明要获取一个信号量资源
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// (1)如果线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// (2)否则调用Sync子类方法尝试获取,这里根据构造方法确定NonfairSync还是FairSync
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
对于tryAcquireShared
,分两种情况讨论:
- 非公平锁
先获取当前信号量,然后减去需要获取的值,得到剩余信号量个数,如果剩余信号量小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。
另外,由于NonfairSync
是非公平获取的,也就是说先调用acquire
方法获取信号量的线程不一定比后来者先获取到信号量。 ```java // NonfairSync protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
// Sync final int nonfairTryAcquireShared(int acquires) { for (;;) { // 获取当前信号量 int available = getState(); // 计算当前剩余量 int remaining = available - acquires; // 如果当前剩余值小于0或者CAS设置成功则返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
2. 公平锁 <br />AQS公平性的保证就靠`hasQueuedPredecessors`这个方法,如果当前线程节点的前驱结点是否也在等待获取该资源,是则放弃自己获取信号量的资格。
```java
protected int tryAcquireShared(int acquires) {
for (;;) {
// 先检查阻塞队列中是否有前驱结点
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
(3)void acquire(int permits)
方法
该方法与acquire()
方法不同,后者只需要获取一个信号量值,而前者则获取perimits
个。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
(4)void acquireUninterruptibly()
方法
该方法与acquire()
类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly
获取资源时,其他线程调用了当前线程的interrupt
方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException
异常而返回。
(5)void accquireUninterruptibly(int permits)
方法
该方法与acquire(int permits)
方法的不同之处在于,该方法对中断不响应。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
(6)void release()
方法
该方法的作用是把当前Semaphore
对象的信号量增加1
,如果当前有线程因为调用acquire
方法被阻塞而被放入AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。
// Semaphore
public void release() {
// (1)arg=1
sync.releaseShared(1);
}
// AQS
public final boolean releaseShared(int arg) {
// (2) 尝试释放资源锁
if (tryReleaseShared(arg)) {
// (3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程
doReleaseShared();
return true;
}
return false;
}
// Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// (4)获取当前信号量值
int current = getState();
// (5)当前信号量值+1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// (6)通过CAS更新信号量的值
if (compareAndSetState(current, next))
return true;
}
}
(7)void release(int permits)
方法
该方法与release()
的不同之处在于,前者让信号量加permits
,后者加1
。
8.3.3 小结
Semaphore
完全可以达到CountDownLatch
的效果,但是Semaphore
的计数器是不可以自动重置的,不过通过变相的改变acquire
方法的参数还是可以实现CyclicBarrier
的功能的。
8.4 经典题目
8.4.1 交错执行
/**
* A,B,C三个线程有序交替执行
*
* @author KHighness
* @since 2021-05-07
*/
@Slf4j(topic = "Alternately")
public class AlternatelyDemo {
/**
* 三个信号量分别控制A,B,C的打印
*/
private static final Semaphore[] s = { new Semaphore(1), new Semaphore(1), new Semaphore(1)};
private static final int size = 3;
private static final char[] arr = {'A', 'B', 'C'};
private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String[] args) throws InterruptedException {
s[1].acquire();
s[2].acquire();
for (int i = 0; i < size; i++) {
final int finalI = i;
executorService.submit(() -> {
while (true) {
try {
s[finalI].acquire();
TimeUnit.MILLISECONDS.sleep(100);
log.debug("{} => [{}]", arr[finalI], System.nanoTime());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s[(finalI + 1) % size].release();
}
}
});
}
}
}
8.4.2 生产者-消费者
/**
* 生产者-消费者模型
*
* @author KHighness
* @since 2021-08-03
*/
@Slf4j(topic = "SemaphoreProducerConsumerModel")
public class ProducerConsumerDemo {
/** 数据队列 */
private final static Queue<Character> QUEUE = new LinkedList<>();
/** 最大容量 */
private final static int QUEUE_MAX_SIZE = 5;
/** 随机变量 */
private final static ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
/** 表示可消费的资源数 */
private final static Semaphore FULL = new Semaphore(0);
/** 表示队列的剩余空间 */
private final static Semaphore EMPTY = new Semaphore(QUEUE_MAX_SIZE);
/** 产品队列访问互斥的信号量 */
private final static Semaphore MUTEX = new Semaphore(1);
/**
* 生成随机数字/字母字符
* ascii char
* 48-57 0~9
* 65-90 A~Z
* 97-122 a~z
* @return ascii码字符
*/
private static Character randomChar() {
int choice = RANDOM.nextInt(3);
if (choice == 0)
return (char) (RANDOM.nextInt(10) + 48);
else if (choice == 1)
return (char) (RANDOM.nextInt(26) + 65);
else
return (char) (RANDOM.nextInt(26) + 97);
}
/**
* 生产者
*/
private static class Producer extends Thread {
public Producer() {
super("生产者");
}
@SneakyThrows
@Override
public void run() {
while (true) {
Character c = randomChar(); // 生产数据
TimeUnit.SECONDS.sleep(1); // 模拟延时
EMPTY.acquire(); // 请求空间
MUTEX.acquire(); // 请求访问
log.debug("生产 => [{}]", c); // 输出日志
QUEUE.add(c); // 放入队列
MUTEX.release(); // 释放队列
FULL.release(); // 数据增加
}
}
}
/**
* 消费者
*/
private static class Consumer extends Thread {
public Consumer() {
super("消费者");
}
@SneakyThrows
@Override
public void run() {
while (true) {
FULL.acquire(); // 请求数据
MUTEX.acquire(); // 请求访问
TimeUnit.SECONDS.sleep(1); // 模拟延时
Character c = QUEUE.poll(); // 消费数据
log.debug("消费 => [{}]", c); // 输出日志
MUTEX.release(); // 释放队列
EMPTY.release(); // 空间增加
}
}
}
public static void main(String[] args) {
new Producer().start();
new Consumer().start();
}
}
8.4.3 读写者
/**
* 读写者问题
* 有读者和写者两组并发进程,共享一个文件,当两个或以上的读进程同时访问共享数据时
* 不会产生副作用,但若某个写进程和其他进程(读进程或写进程)同时访问共享数据时则
* 可能导致数据不一致的错误。
*
* <p>要求:
* <li>(1) 允许多个读者可以同时对文件执行读操作
* <li>(2) 只允许一个写者往文件中写信息
* <li>(3) 任一写者在完成写操作之前不允许其他读者或写者工作
* <li>(4) 写者执行写操作前,应让已有的读者和写者全部退出
*
* @author KHighness
* @since 2021-08-03
*/
@Slf4j(topic = "ReaderWriterDemo")
public class ReaderWriterDemo {
/** 用于读写者互斥访问文件 */
private final static Semaphore FILE_MUTEX = new Semaphore(1);
/** 当前读者数量 */
private static int READER_COUNT = 0;
/** 对READER_COUNT操作的互斥变量 */
private final static Semaphore COUNT_MUTEX = new Semaphore(1);
/**
* 写者
*/
private static class Writer extends Thread {
public Writer() {
super("写者");
}
@SneakyThrows
@Override
public void run() {
while (true) {
FILE_MUTEX.acquire(); // 请求访问文件
log.debug("写入文件"); // 输出日志
TimeUnit.SECONDS.sleep(1);
FILE_MUTEX.release(); // 释放共享文件
}
}
}
private static class Reader extends Thread {
public Reader() {
super("读者");
}
@SneakyThrows
@Override
public void run() {
while (true) {
COUNT_MUTEX.acquire(); // 请求访问COUNT
if (READER_COUNT == 0) // 第一个读进程
FILE_MUTEX.acquire(); // 阻止写进程写
READER_COUNT++; // 读者计数器递增
COUNT_MUTEX.release(); // 恢复访问COUNT
log.debug("阅读文件"); // 输出日志
TimeUnit.SECONDS.sleep(1);
COUNT_MUTEX.acquire(); // 请求访问COUNT
READER_COUNT--; // 读者计数器递减
if (READER_COUNT == 0) // 最后一个读进程
FILE_MUTEX.release(); // 允许写进程写
COUNT_MUTEX.release(); // 恢复访问COUNT
}
}
}
public static void main(String[] args) {
new Writer().start();
new Reader().start();
}
}
8.4.4 哲学家就餐
/**
* 哲学家就餐问题
* 当一名哲学家左右两边的筷子都可用时,才允许拿起筷子。
*
* @author KHighness
* @since 2021-08-03
*/
@Slf4j(topic = "PhilosopherDinnerDemo")
public class PhilosopherDinnerDemo {
/** 筷子 */
private final static Semaphore[] CHOPSTICKS = {new Semaphore(1), new Semaphore(1),
new Semaphore(1), new Semaphore(1), new Semaphore(1)};
/** 互斥取筷子 */
private final static Semaphore MUTEX = new Semaphore(1);
/**
* 哲学家
*/
private static class Philosopher extends Thread {
private final int index;
public Philosopher(int index) {
super("哲学家-" + index);
this.index = index;
}
@SneakyThrows
@Override
public void run() {
while (true) {
MUTEX.acquire();
CHOPSTICKS[index].acquire();
CHOPSTICKS[(index + 1) % 5].acquire();
MUTEX.release();
log.debug("进餐");
CHOPSTICKS[index].release();
CHOPSTICKS[(index + 1) % 5].release();
log.debug("思考");
}
}
}
public static void main(String[] args) {
Philosopher[] philosophers = new Philosopher[5];
for (int i = 0; i < philosophers.length; i++) {
philosophers[i] = new Philosopher(i);
philosophers[i].start();
}
}
}
8.4.5 和尚和水
/**
* 和尚与水问题
* 寺庙有小和尚和老和尚若干,有一水缸,由小和尚提水入缸供老和尚引用。
* 水缸可容10桶水,井每次只能容一个桶取水。水桶总数为3个,每次入缸仅取1桶水。
*
* @author KHighness
* @since 2021-08-03
*/
@Slf4j(topic = "MonkAndWaterDemo")
public class MonkAndWaterDemo {
/** 用于互斥地访问水井*/
private static final Semaphore WELL = new Semaphore(1);
/** 用于互斥地访问水缸 */
private static final Semaphore VAT = new Semaphore(1);
/** 用于表示水缸中剩余空间所能容纳的水的桶数 */
private static final Semaphore EMPTY = new Semaphore(10);
/** 表示水缸中水的桶数 */
private static final Semaphore FULL = new Semaphore(0);
/** 表示有多少个水桶可用 */
private static final Semaphore BUCKET = new Semaphore(3);
/**
* 老和尚
*/
private static class OldMonk extends Thread {
public OldMonk() {
super("老和尚");
}
@SneakyThrows
@Override
public void run() {
while (true) {
FULL.acquire(); // 请求水缸的水
BUCKET.acquire(); // 请求水桶资源
VAT.acquire(); // 请求水缸资源
log.debug("从水缸中打一桶水");
TimeUnit.SECONDS.sleep(1);
VAT.release(); // 释放水缸资源
EMPTY.release(); // 消耗水资源
log.debug("喝水水");
TimeUnit.SECONDS.sleep(1);
BUCKET.release(); // 释放水桶资源
}
}
}
/**
* 小和尚
*/
private static class YoungMonk extends Thread {
public YoungMonk() {
super("小和尚");
}
@SneakyThrows
@Override
public void run() {
while (true) {
EMPTY.acquire(); // 请求水缸空间
BUCKET.acquire(); // 请求桶资源
WELL.acquire(); // 请求水井资源
log.debug("从水井中打一桶水");
TimeUnit.SECONDS.sleep(1);
WELL.release(); // 释放水井资源
VAT.acquire(); // 请求水缸资源
log.debug("倒水水");
TimeUnit.SECONDS.sleep(1);
VAT.release(); // 释放水缸资源
FULL.release(); // 增加水缸的水
BUCKET.release(); // 释放水桶资源
}
}
}
public static void main(String[] args) {
new OldMonk().start();
new YoungMonk().start();
}
}