14.1 状态依赖性的管理

  1. acquire lock on object state
  2. while (precondition does not hold){
  3. release lock
  4. wait until precondition might hold
  5. optionally fail if interrupted or timeout expires
  6. reacquire lock
  7. }
  8. perform action
  9. release lock

程序清单 14-1 可阻塞的状态依赖操作的结构

14.1.1 实例:将前提条件的失败传递给调用者 - GrumpyBoundedBuffer

14.1.2 实例:通过轮询与休眠来实现简单的阻塞

14.1.3 条件队列 - BoundedBuffer

“条件队列”这个名字来源于:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。传统队列的元素是-一个个数据,而与之不同的是,条件队列中的元素是一个个正在等待相关条件的线程。
Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当被挂起的线程醒来时,它将在返回之前重新获取锁。从直观上来理解,调用wait意味着“我要去休息了,但当发生特定的事情时唤醒我”,而调用通知方法就意味着“特定的事情发生了”。

14.2 使用条件队列

14.2.1 条件谓语

将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档。
条件谓词是使某个操作成为状态依赖操作的前提条件。在有界缓存中,只有当缓存不为空时,take 方法才能执行,否则必须等待。对take方法来说,它的条件谓词就是“缓存不为空”,take方法在执行之前必须首先测试该条件谓词。同样,put 方法的条件谓词是“缓存不满”。条件谓词是由类中各个状态变量构成的表达式。
每一次wait调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的wait时,调用者必须已经持有与条件队列相关的锁,开且这个锁必须保护着构成条件谓语的状态变量。

14.2.2 过早唤醒

内置条件队列可以与多个条件谓词-起使用。当一个线程由于调用notifyAll而醒来时,并不意味该线程正在等待的条件谓词已经变成真了。(这就像烤面包机和咖啡机共用一个铃声,当响铃后,你必须查看是哪个设备发出的铃声。)另外,wait 方法还可以“假装”返回,而不是由于某个线程调用了notify。
基于所有这些原因,每当线程从wait 中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败)。由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在-一个循环中调用wait,并在每次迭代中都测试条件谓词。程序清单14-7给出了条件等待的标准形式。

  1. void stateDependentMethod() throws InterruptedException{
  2. //必须通过一个锁来保护条件谓词
  3. synchroni zed (1ock) {
  4. while (!conditionPredicate() )
  5. lock. wait() ;
  6. //现在对象处于合适的状态
  7. }
  8. }

程序清单14-7 状态依赖方法的标准形式
当使用条件等待时(例如Object,wait或Condition,await):
●通常都有一个条件谓词一一包括一些对象状态的测试,线程在执行前必须首先通过这些测试。
●在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试。
●在一个循环中调用wait。
●确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量。
●当调用wait、 notify或notifyAll等方法时,一定要持有与条件队列相关的锁。
●在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

14.2.3 丢失的信号

第10章曾经讨论过活跃性故障,例如死锁和活锁。另一种形式的活跃性故障是丢失的信号。
丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。现在,线程将等待一个已经发过的事件。
像上述程序清单中警示之类的编码错误(例如,没有在调用wait之前检测条件谓词)就会导致信号的丢失。如果按照程序清单14-7 的方式来设计条件等待,那么就不会发生信号丢失的问题。

14.2.4 通知 - BoundedBuffer

在条件队列API中有两个发出通知的方法,即notify和notifyAll。无论调用哪-一个, 都必须持有与条件队列对象相关联的锁。在调用notify时,JVM会从这个条件队列上等待的多个线程中选择一个来唤醒,而调用notifyAll则会唤醒所有在这个条件队列上等待的线程。
这里的条件队列用于两个不同的条件谓词:“非空”和“非满”。假设线程A在条件队列上等待条件谓词PA,同时线程B在同一个条件队列上等待条件谓词PB。现在,假设PB变成真,并且线程C执行一个notify : JVM将从它拥有的众多线程中选择-一个并唤醒。如果选择了线程A,那么它被唤醒,并且看到PA尚未变成真,因此将继续等待。同时,线程B本可以开始执行,却没有被唤醒。这并不是严格意义上的“丢失信号”,而更像一种“被劫持的”信号,但导致的问题是相同的:线程正在等待一-个已经 (或者本应该)发生过的信号。 :::tips 应优先使用notifyAll而不是notify ::: 只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:

  • 所有等待线程的类型都相同: 只有一个条件谓语与条件队列相关,并且每个线程在从wait返回后将执行相同的操作
  • 单进单出:在条件变量的每次通知,最多只唤醒一个线程来执行。

BoundedBuffer满足“单进单出”的条件,但不满足“所有等待线程的类型都相同”的条件,因此正在等待的线程可能在等待“非满”,也可能在等待“非空”。例如第5章的TestHarness中使用的“开始阀门”闭锁(单个事件释放一组线程)并不满足“单进单出”的需求,因为这个“开始阀门”将使得多个线程开始执行。
单次通知和条件通知都属于优化措施。通常,在使用这些优化措施时,应该遵循“首选使程序正确地执行,然后才使其运行得更快”这个原则。如果不正确地使用这些优化措施,那么很容易在程序中引入奇怪的活跃性故障。

14.2.5 实例:阀门类 - ThreadGate

14.2.6 子类的安全问题

14.2.7 封装条件队列

14.2.8 入口协议与出口协议

14.3 显式的Condition对象 - ConditionBoundedBuffer

https://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/Condition.html#method_summary
第13章曾介绍过,在某些情况下,当内置锁过于灵活时,可以使用显式锁。正如Lock是一种广义的内置锁,Condition (如上链接)也是一种广义的内置条件队列。
特别注意,在Conditon对象中,与wait、noify和noltifyAll方法对应的分别是awaitesignal和signaLAl18但是、CohditionfObject进行了扩展,因而它也包含wait和notify方法现与定要确保使用正确的版本—await和signal。
在使用显式的Condition和内置条件队列之间进行选择时,与在ReentrantLock和synchronized之间进行选择是一样的:如果需要一些高级功能,例如使用公平的队列操作或者在每个锁上对应多个等待线程集,那么应该优先使用Condition而不是内置条件队列。(如果需要ReentrantLock的高级功能,并且已经使用了它,那么就已经做出了选择。)

14.4 Synchronizer 剖析 - SemaphoreOnLock

事实上,它们在实现时都使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS), 这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。不仅ReentrantLock和Semaphore是基于AQS构建的,还包括CountDownLatch. ReentrantReadWriteLock、 SynchronousQueue 和FutureTask.
在Java 6中将基于AQS的SynchronousQueue替换为一个(可伸缩性更高的)非阻塞的版本。

14.5 AbstractQueuedSynchronizer(AQS) - OneShotLatch

关于AQS,书中14.5 节,当前看着有点吃力,作为面试重点,应多回头看看这一小节。
先把看得懂的地方小总结一下
AQS中包括两个操作:

  • 获取操作

    获取操作是一种依赖状态的操作,并且通常会阻塞。但获取也会包含多种获取:

    • 使用锁或信号量的获取,就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。
    • 在使用CountDownLatch时,获取”操作意味着“等待并直到闭锁到达结束状态”,
    • 在使用FutureTask时,则意味着“等待并直到任务已经完成”。
  • 释放操作

    “释放”并不是一个可阻塞的操作,当执行“释放”操作时,所有在请求时被阻塞的线程都会开始执行。
    如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState, setState以及compareAndSetState等protected 类型方法来进行操作。这个整数可以用于表示任意状态。例如,ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始、正在运行、已完成以及已取消)。在同步器类中还可以自行管理一些额外的状态变量,例如,ReentrantLock 保存了锁的当前所有者的信息,这样就能区分某个获取操作是重入的还是竞争的。
    程序清单14-13 给出了AQS中的获取操作与释放操作的形式。根据同步器的不同,获取操作可以是一种独占操作(例如ReentrantLock),也可以是-一个非独占操作(例如Semaphore和CountDownLatch)。一个获取操作包括两部分。首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是由同步器的语义决定的。例如,对于锁来说,如果它没有被某个线程持有,那么就能被成功地获取,而对于闭锁来说,如果它处于结束状态,那么也能被成功地获取。

    1. boolean acquire{) throws InterruptedException{
    2. while ( 当前状态不允许获取操作) {
    3. 1f ( 需要阻塞获取请求) {
    4. 如果当前线程不在队列中,则将其插入队列.
    5. 阻塞当前线程
    6. }
    7. else
    8. 返回失败
    9. }
    10. 可能更新同步器的状态
    11. 如果线程位于队列中,则将其移出队列
    12. 返回成功
    13. }
    14. void release () {
    15. 更新同步器的状态
    16. if(新的状态允许某个被阻塞的线程获取成功)
    17. 解除队列中一个或多个线程的阻塞状态
    18. }

    程序清单14-13 AQS中获取操作和释放操作的标准形式

    14.6 java.util.concurrent同步器类中的AQS

    java.util.concurrent中的许多可阻塞类,例如ReentrantLock、Semaphore、 ReentrantReadWriteLock、CountDownLatch、 SynchronousQueue 和FutureTask等,都是基于AQS构建的。

14.6.1 ReentrantLock

可见:
公平版本的tryAcquire - java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire
非公平版本的tryAcquire - java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire
ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、 tryRelease 和isHeldExclusively,程序清单14-15给出了非公平版本的tryAcquire。ReentrantLock 将同步状态用于保存锁获取操作的次数,并且还维护一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量目。在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁:在tryAcquire中将使用这个域来区分获取操作是重人的还是竞争的。

  1. protected boolean tryAcquire (int ignored) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if(c == 0){
  5. if (compareAndsetState (01)){
  6. owner = current;
  7. return true;
  8. }
  9. } else if (current == owner) {
  10. .setstate(c+1);
  11. return true;
  12. }
  13. return false;
  14. }

程序清单14-15 基于非公平的 ReentrantLock 实现 tryAcquire
当一个线程尝试获取锁时,tryAcquire 将首先检查锁的状态。如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此tryAcquire使用compareAndSetState来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过。(请参见15.3节中对compareAndSet的描述)。如果锁状态表明它已经被持有,并且如果当前线程是锁的拥有者,那么获取计数会递增,如果当前线程不是锁的拥有者,那么获取操作将失败。

14.6.2 Semaphore 与 CountDownLatch

Semaphore将AQS的同步状态用于保存当前可用许可的数量。tryAcquireShared方法(请参见程序清单14-16)首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么tryAcquireShared 会通过compareAndSetState以原子方式来降低许可的计数。如果这个操作成功(这意味着许可的计数自从上一次读取后就没有被修改过),那么将返回一个值表示获取操作成功。在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。

  1. protected int tryAcquireShared(int acquires) {
  2. while (true) {
  3. int available = getState();
  4. int remaining = available - acquires;
  5. if (remaining < 0
  6. || compareAndSetState (available, remaining) )
  7. return remaining ;
  8. }
  9. }
  10. protected boolean tryReleaseShared(int releases) {
  11. while (true) {
  12. int p = getState();
  13. if (compareAndSetState(p, p + releases))
  14. return true;
  15. }
  16. }

程序清单14-16 基于非公平的 Semaphore 中的tryAcquireShared 与 tryReleaseShared
当没有足够的许可,或者当tryAcquireShared可以通过原子方式来更新许可的计数以响应获取操作时,while 循环将终止。虽然对compareAndSetState的调用可能由于与另一个线程发生竞争而失败(请参见15.3节),并使其重新尝试,但在经过了一定次数的重试操作以后,在这两个结束条件中有一一个会变为真。同样,tryReleaseShared将增加许可计数,这可能会解除等待中线程的阻塞状态,并且不断地重试直到更新操作成功。tryReleaseShared 的返回值表示在这次释放操作中解除了其他线程的阻塞。
CountDownLatch使用AQS的方式与Semaphore很相似:在同步状态中保存的是当前的计数值。countDown 方法调用release, 从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。await调用acquire,当计数器为零时,acquire将立即返回,否则将阻塞。
虽然书中没有提到 CountDownLatch 的代码,不过自己看看吧,应该不是问题

  1. java.util.concurrent.CountDownLatch.Sync.tryReleaseShared
  2. // 该方法的参数acquires并没有用到
  3. protected int tryAcquireShared(int acquires) {
  4. // 当state == 0时,即开锁
  5. return (getState() == 0) ? 1 : -1;
  6. }
  7. // 该方法的参数releases并没有用到
  8. protected boolean tryReleaseShared(int releases) {
  9. // Decrement count; signal when transition to zero
  10. for (;;) {
  11. int c = getState();
  12. // 检查state的状态
  13. if (c == 0)
  14. return false;
  15. int nextc = c-1;
  16. if (compareAndSetState(c, nextc))
  17. return nextc == 0;
  18. }
  19. }
  20. public boolean await(long timeout, TimeUnit unit)
  21. throws InterruptedException {
  22. // await将会调用acquire计数器减一
  23. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  24. }

就和书中说的一样,CountDownLatch的同步状态中保存的是当前的计数值。
看tryAcquireShared,当state == 0时,即开锁。
看tryReleaseShared,调用后,将会自动减一。 int nextc = c-1;
* 关于以上两个方法的参数,其实可有用可无用,看锁的实现策略。比如Semaphore,他需要这个值,而CountDownLatch不需要。

14.6.3 FutureTask - 这个是真不懂了=- -,先留着 java.util.concurrent.FutureTask.get()

初看上去,FutureTask甚至不像一个同步器,但Future.get的语义非常类似于闭锁的语义一如果发生了某个事件(由FutureTask表示的任务执行完成或被取消),那么线程就可以 恢复执行,否则这些线程将停留在队列中并直到该事件发生。在FutureTask中,AQS同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。FutureTask 还维护- -些额外的状态变量,用来保存计算结果或者抛出的异常。此外,它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。

14.6.4 ReentrantReadWriteLock - 还是懵逼

ReadWriteLock接口表示存在两个锁:一个读取锁和一个写入锁,但在基于AQS实现的ReentrantReadWriteLock中,单个AQS子类将同时管理读取加锁和写入加锁。Reentrant-ReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16 位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。
AQS在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写人线程之前的所有线程都将获得这个锁。
这种机制并不允许选择读取线程优先或写人线程优先等策略,在某些读写锁实现中也采用了这种方式。因此,要么AQS的等待队列不能是一个FIFO队列,要么使用两个队列。然而,在实际中很少需要这么严格的排序策略。如果非公平版本的ReentrantReadWriteLock无法提供足够的活跃性,那么公平版本的ReentrantReadWriteLock通常会提供令人满意的排序保证,井且能确保读取线程和写入线程不会发生饥饿问题。

其他

另外,关于AQS,敖丙的文章也有提到一点 - 在比较下面的,其次这篇文章提到的ABA问题,比较并交换等,本书的后面会介绍。
https://zhuanlan.zhihu.com/p/111266704