构建自定义的同步工具
类库中包含了大量依赖于状态的类——这些类拥有基于状态的先验条件——FutureTask、Semaphore、BlockingQueue。创建状态依赖类最简单的方法通常是将它构建于已有的状态依赖库之上。如果类库没有提供你需要的功能,你可以使用语言和类库提供的底层机制,包括内部条件队列、显式的Condition对象和AbstractQueuedSynchronizer框架,构建属于自己的Synchronized。
12.1管理状态依赖性
对于并发对象,依赖于状态的方法有时可以在不能满足先验条件的情况下选择失败,不过更好的选择是等待先验条件变为真。
有一种依赖于状态的操作,能够被阻塞直到可以继续执行。
//有限缓存不同实现的基类(基于队列的循环缓存)
public abstract class BaseBoundedBuffer<V>{
@GuardedBy("this")
private final V[] buf;
@GuardedBy("this")
private int tail;
@GuardedBy("this")
private int head;
@GuardedBy("this")
private int count;
protected BaseBoundedBuffer(int capactiy){
this.buf = (V[]) new Object(capactiy);
}
protected synchronized final void doPut(V v){
buf[tail] = v;
if(++tail == buf.length){
tail = 0;
}
++ count;
}
protected synchronized final V doTake(){
V v = buf[head];
buf[head] = null;
if(++head == buf.length){
head = 0;
}
-- count;
return v;
}
public synchronized final boolean isFull(){
return count == buf.length;
}
public synchronized final boolean isEmpty(){
return count == 0;
}
}
12.2条件队列
条件队列可以让一组线程——称作等待集——以某种方式等待相关条件变成真。
不同于传统的队列,它们的元素是数据项。条件队列的元素是等待相关条件的线程。
示例:
//有限缓存使用条件队列
public class BoundedBuffer<V> extends BaseBoundedBuffer<V>{
//条件谓词:not—full (!isFull())
//条件谓词:not_empty(!isEmpty())
public BoundedBuffer(int size){
super(size);
}
//阻塞,直到 not-full
public synchronized void put(V v) throws InterruptedException{
while(isFull()){
wait();//等待,自动释放锁
}
doPut(v);
notifyAll();//唤醒其他线程(take())
}
public synchronized void V take() throws InterruptedException{
while(isEmpty()){
wait();
}
V v = doTake();
notifyAll(); //唤醒其他线程(put())
return v;
}
}
12.2.1条件谓词
正确使用条件队列的关键在于识别出**对象可以等待的条件谓词**。
条件谓词是先验条件的第一站,他在一个操作与状态之间建立起依赖关系。(例如,有限缓存中,只有缓存不为空时take才能执行,所以 take的条件谓词是“缓存不空”,类似的,put的条件谓词是“缓存不满”)
条件谓词是由类的状态变量构成的表达式:BaseBoundedBuffer是通过比较count与0,测试是否“缓存不空”,通过比较count与缓存大小,测试是否“缓存不满”。
在涉及了加锁、wait方法和条件谓词的条件等待中,存在着一种三元关系:
**条件谓词涉及状态变量,状态变量是由锁保护的,所以在测试条件谓词之前,必须先持有锁,锁对象与条件队列对象(wait和notify方法调用的对象)也必须是同一个对象**。
每次调用wait都会隐式地与特定的条件谓词相关联,当调用特定条件谓词的wait时,调用者必须已经持有了与条件队列相关的锁,这个锁必须同时还保护着组成条件谓词的状态变量。
12.2.2过早地唤醒
锁、条件谓词和条件队列之间存在的三元关系看似并不错综复杂,但wait的返回并不一定意味着线程正在等待的条件谓词已经变成真的了。
**一个单独的内部条件队列可以与多个条件谓词共同使用**。当有人调用notifyAll,从而唤醒了你的线程时,并不意味着你正在等待的条件谓词比现在变成真了。
当控制流重新进入调用wait的代码时,他会重新请求与条件队列相关联的锁。现在条件谓词就一定是真吗?不一定。它可能在通知线程调用notifyAll的时候变为真,但是在你重新请求锁时又变为假。在你的线程被唤醒到wait重新请求锁的这段时间内,其他线程可能已经请求到了锁,并改变了对象的状态。也可能自从你调用了wait之后,条件谓词就没有变为真过。你无法知道另一个线程为什么调用notify或notifyAll。
基于所有的这些原因,当你从wait中唤醒后,必须再次测试条件谓词。如果条件谓词为假,就继续等待(或失败)。
示例:
//状态依赖方法的规范式
void stateDependentMethod() throws InterruptedException{
//条件谓词必须被锁守护
synchronized(lock){
while(!conditionPredicate()){
loca.wait();
}
//现在,对象处于期望状态。
}
}
当使用条件等待时(Object.wait胡总和Condition.await):
- 永远设置一个条件谓词——一些对象状态的测试,线程执行前必须满足它。
- 永远在调用wait前测试条件谓词,并且从wait中返回后再次测试。
- 永远在循环中调用wait。
- 确保构成条件谓词的状态变量被锁守护,而这个锁正数与条件队列相关联的。
- 当调用wait、notify、notifyAll时,要持有与条件队列相关联的锁。
- 在检查条件谓词之后,开始执行被保护的逻辑之前,不要释放锁。
12.2.3丢失的信号
活跃度失败:死锁、活锁、丢失的信号。
丢失的信号:当一个线程等待的特定条件已经为真,但是进入等待前检查条件谓词却返回了假,这就出现了一个丢失的信号。
未能在调用wait之前先检测条件谓词,就会导致信号的丢失。如果按照上面的状态依赖规范式来架构等待条件,就不会再遇到信号丢失的问题。
12.2.4通知
条件等待所发生的事情可以分为两半:等待、通知。
在有限缓存中,如果缓存尾空,调用take将会阻塞,在缓存变为非空时,为了能够让take解除阻塞,我们必须确保每一条能够让缓存变为非空的代码路径都执行一个通知。
无论如何,当你在等待一个条件,一定要确保有人会在条件谓词变为真时通知你。
由于会有多个线程因为不同的原因在同一个条件队列中等待,因此不用notifyAll而使用notify是危险的。因为单一的通知容易导致同类的线程丢失全部信号。
notify:JVM将从它所拥有的众多线程中选择一个并唤醒。notifyAll:唤醒所有。
只有同时满足下述条件后,才能用单一的notify取代notifyAll:
相同的等待者,只有一个条件谓词与条件队列相关,每个线程从wait返回后执行相同的逻辑,并且,一进一出,一个对条件变量的通知,之多只激活一个线程执行。
但是,如果只有一个线程可以执行,使用notifyAll效率很低。如果有10个线程在条件队列中等待,调用notifyAll会唤醒每一个线程,让他们去竞争锁,然后大多数又回到休眠状态,这意味着带来了大量的上下文切换和竞争锁的请求。
优化:
以BoundedBuffer中的put和take举例,我们可以对其进行优化,首先,观察只有当缓存从空转为非空时,或者从满转为非满时,才需要从等待队列中释放一个线程。并且,只有当put或take影响到这些状态转换的某一种时,才发出通知。(这叫做“**依据条件通知**”)
示例:
public synchronized void put(V v) throws InterruptedException{
while(isFUll()){
wait();
}
boolean wasEmpty = isEmpty();
doPut(v);
if(wasEmpty){
notifyAll;
}
}
12.2.5示例:阀门类
闭锁会阻止线程通过开始阀门,知道阀门被打开,此时所有的线程都可以通过。
虽然闭锁机制通常能够准确的满足我们的需要,但是在闭锁的行为下构建的“阀门”一旦被打开,就不能再重新关闭,有时这会成为一个缺陷。
使用条件等待开发一个可冲关闭的ThreadGate很容易。
@ThreadSafr
public class ThreadGate{
//条件谓词:opened-since(n) {isOpen || generation>n}
@GuardedBy("this")
private boolean isOpen;
@GuardedBy("this")
private int generation;
public synchronized void close(){
isOpen = false;
}
public synchronized void open(){
++generation;
isOpen = true;
notifyAll();
}
//阻塞,直到:opened-since(generation on entry)
public synchronized void await() throws InterruptedException{
int arrivalGeneration = generation;
while(!isOpen && arrivalGeneration == generation){
wait();
}
}
}
12.2.6子类的安全问题
使用依据条件的或者单一的通知会引入一些约束,导致子类化变得更加复杂。如果你的基类在子类化时,会被某种方式破坏单一或条件通知的某个要求。
**一个依赖于状态的类,要么完全将它的等待和通知协议暴露给子类,要么完全阻止子类参与其中**。
**还有另一种选择就是直接禁止子类化,可以通过把类声明为fianl类型的,或者通过对子类隐藏条件队列、锁和状态变量来完成**。
(例子:如果有一个无限的阻塞栈,它的pop操作在栈为空的时候被阻塞,但是它的push永远都可以执行。这符合使用单一通知的条件。如果这个类使用的正是单一通知,而且在一个子类中添加了一个阻塞的“弹出两个连续元素”的方法,这样就有两种等待者了。等待弹出一个元素和等待弹出两个元素)
12.2.7封装条件队列
通常,最好可以把条件队列封装起来,这样在使用它的类层次结构外,是不能访问它的。
但是,这条建议——使用封装的对象作为条件队列——并没有被一个最常见的用于线程安全类涉及模式所遵循:使用对象的内部所来保护对象自身的状态。
但是,它却可以很容易地被重新构建,并使用一个私有的锁对象和条件队列:唯一的不同在于,它将不再支持任何形式的客户端加锁。
12.2.8入口协议和出口协议
对于每个依赖于状态的操作,以及每个修改了其他状态的操作(对于每一个修改状态的操作,并且其他操作对该状态有状态依赖),你都应该为其定义一个入口协议和出口协议。
入口协议:操作的条件谓词。
出口协议:检查任何被操作改变的状态变量。确认它们是否引起其他一些条件谓词变为真。
AbstractQueueSynchronized 采用了出口协议的概念,位于java.util.concurrent包下的大部分状态依赖类都构建于它之上。它没有让synchronized类自已去执行通知,而是要求同步方法返回一个值,让这个值说明它的动作是否可能已经阻塞了一个或多个等待线程。
12.3显式的Condition对象
当内部锁非常不灵活时,显式锁就排上用场。
正如Lock是广义的内部锁,Condition也是广义的内部条件队列。
一个Condition和一个单独的Lock相关联,就像条件队列和单独的内部锁相关联一样,调用与Condition相关联的Lock和Lock.newCondition方法,可以创建以一个Condition。
Condition也提供了比内部条件队列要丰富的多的特征集:每个锁可以有多个等待集、可中断/不可中断的条件队列、基于时限的等待以及公平/非公平队列之间的选择。
Condition接口:
public interface Condition{
void await() throws InterruptedException;
boolean await(Long time, TimeUnit unit) throws InterruptedException;
long awaitNanos(long nanosTimeout) throws InterruptedException;
void awaitUninterruptibly();
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
不同于内部条件队列,你可以让每个Lock都有任意数量的Condition对象。Condition对象继承了与之相关的锁的公平性特征:如果是公平锁,线程会依照FIFO的顺序从Condition.await中释放。
wait、notify、notifyAll 在Condition对象中的对等体是 await、signal和signalAll。
但是Condition继承于Object,这意味着它也有wait和notify方法,但是,不要使用!
示例:
@ThreadSafe
public class ConditionBoundedBuffer<T>{
protected final Lock lock = new ReentrantLock();
//条件谓词:notFull
private final Condition notFull = lock.newCondition();
//条件谓词:notEmpty
private final Condition notEmpty = lock.newCondition();
@GuardedBy("lock")
private final T[] items = (T[]) new Object(BUFFER_SIZE);
@GuardedBt("lock")
private int tail,head,count;
//阻塞,直到:notFull
public void put(T x) throws InterruptedException{
lock.lock();
try{
while (count == items.length){
notFull.await();
}
items[tail] = x;
if(++tail == items.length){
tail = 0;
}
++ count;
notEmpty.signal();
}finally{
lock.unlock();
}
}
//阻塞,直到:notEmpty
public T take() throws InterruptedException{
lock.lock();
try{
while(count == 0){
notEmpty.await();
}
T x = items[head];
items[head] = null;
if(++head == items.length){
head = 0;
}
-- count;
notFull.signal();
return x;
}finally{
lock.unlock();
}
}
}
ConditionBoundedBuffer 的行为和 BoundedBuffer 相同,但是它使用条件队列的方式,具有更好的可读性——分析使用多个Condition的类,要不分析一个使用单一内部队列加多个条件谓词的累简单得多。
Condition简化了使用单一通知的条件,使用更有效的signal,减少了上下文切换,而且每次缓存操作都会触发对锁的请求。
就像内置的锁和条件队列一样,当使用显式Lock和Condition时,也必须要满足锁、条件谓词和条件变量之间的三元关系。涉及条件谓词的变量必须由Lock保护,检查条件谓词时以及调用await、和signal时,必须持有Lock对象。
12.4剖析Synchronizer
ReentrantLock 和Semaphore这两个接口有很多共同点:
- 这些类都扮演了”阀门“的角色,每次只允许有限数目的线程通过它
- 线程到达阀门后,可以允许通过(lock或acquire成功返回)
- 线程到达阀门后,可以允许等待(lock或acquire阻塞)
- 线程到达阀门后,可以被取消(tryLock或tryAcquire返回false,指明在允许的时间内,锁或者”许可“不可用)
- 它们都允许可中断、不可中断的、可限时的请求尝试,它们也都被允许选择公平、非公平的等待线程队列。
事实上,它们都公用一个基类,AbstractQueueSynchronizer(AQS)——和其他很多Synchronizer一样。
AQS是一个用来构建锁和Synchronizer的框架,使用AQS能够简单且高效地构造出应用广泛的大量的Synchronizer。不仅ReentrantLock和Semaphore是构建与AQS 上的,其他的还有CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。
练习题:使用Lock实现一个计数信号量:
@ThreadSafe
public class SemaphoreOnLock{
private final Lock lock = new ReentrantLock();
//条件谓词:permitsAvailale(permits > 0)
private final Condition permitsAvailable = lock.newCondition();
@GuardedBy("lock")
private int permits;
SemaphoreOnLock(int initialPermits){
lock.lock();
try{
permits = initialPermits;
}finally{
lock.unlock();
}
}
//阻塞,直到:permitsAvailable
public void acquire() throws InterruptedException{
lock.lock();
try{
while(permits <= 0){
permitsAvailable.await();
}
-- permits;
}finally{
lock.unlock();
}
}
public void release(){
lock.lock();
try{
++permits;
permitsAvailable.signal();
}finally{
lock.unlock();
}
}
}
12.5AbstractQueuedSynchronizer
大多数开发者可能永远不会直接用到AQS:标准的Synchronizer集涵盖了范围极广的情况。但是了解一些标准的Synchronizer是如何实现的,有助于理解它们的运作机理。
一个**基于AQS的Synchronizer所执行的基本操作**,是一些不同形式的**获取**和**释放**。
**获取**操作是状态依赖的操作,总能够**阻塞**。**释放不是一个可阻塞的操作**,**释放可以允许线程在请求执行前阻塞**。
为了让一个类具有状态依赖性,它必须拥有一些状态。同步类中有一些**状态需要管理**,这项任务落在了**AQS**上。它**管理一个关于状态信息的单一整数**,**状态信息可以通过protected类型的getState、setStatus、和compareAndSetState等方法进行操作**。(当然,Synchronizer也可以自己管理一些额外的状态变量)
例如:ReentrantLock用它来表现拥有它的线程已经请求了多少次锁,Semaphore用它来表现剩余的许可数、FutureTask用它来表现任务的状态。
AQS中获取和释放操作的规范式:
//获取操作分为两步。
//1:Synchronizer判断当前状态是否允许被获得,如果是,就让线程执行,如果不是,获取操作阻塞或失败
//2.可能需要更新的状态,一个想获取Synchronizer的线程会影响到其他线程是否能够获取它。
boolean acquire() throws InterruptedException{
while(state does not permit acquire){//状态不允许被获取
if(blocking acquisition requested){//获得请求阻塞
enqueue current thread if not already queued//使当前线程排队(如果尚未排队)
block current thread//阻塞当前线程
}else{return failure;}//返回失败
}
possibly update synchronization state//允许更新同步状态
dequeue thread if it was queued//如果线程已排队,则将其退出队列
return success;//返回成功
}
void release(){
update synchronization state//更新同步状态
if(new state may permit a blocked thread to acquire){//新状态可能允许阻塞线程获取
unblock one or more queued threads//将一个或多个线程取消阻塞
}
}
支持独占锁获取的Synchronizer应该实现tryAcquire、tryRelease和isHeldExclusively。
支持共享获取的Synchronizer应该实现tryAcquireShared、tryReleaseShared。
12.5.1一个简单的闭锁
//二元闭锁使用AbstractQueuedSynchronizer
@ThreadSafe
public class OneShotLatch{
private final Sync sync = new Sync();
public void signal(){
sync.releaseShared(0);
}
public void await() throws InterruptedException{
sync.acquireSharedInterruptiblly(0);
}
private class Sync extends AbstractQueueSynchronizer{
//必须返回一个值,表面请求操作能否进行
protected int tryAcquireShared(int ignored){
//如果闭锁打开则成功(state == 1),否则失败
return (getState() == 1) ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored){
setState(1);//闭锁现在已打开
return true;//现在,其他线程可以获得闭锁
}
}
}
在OneShotLatch中,AQS类型的状态管理者闭锁的状态——关闭(0)或打开(1)。**await方法调用AQS的acquireSharedInterruptibly,后者随后请求OneShotLatch中的tryAcquireShared方法**。
**signal调用releaseShared,后者随后请求OneShotLatch中的tryReleaseShared方法**。
虽然不通过委托,直接扩展AQS也是可以实现OneShotLatch,但是会破坏OneShotLatch接口的简洁性。**java.util.concurrent源码中没有一个Synchronizer是直接扩展AQS的,它们都委托了AQS的私有内部子类**。
12.6java.util.concurrent的Synchronizer类中AQS
java.util.concurrent中很多可阻塞的类,比如:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch、SynchronousQueue、FutureTask。全部都是用AQS构建的。
12.6.1ReentrantLock
ReentrantLock只支持独占的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively。
ReentrantLock使用同步状态持有锁获取操作的计数,还维护一个owner变量来持有当前拥有的线程标识符。只有在当前线程刚刚获取到锁,或者刚刚释放了锁的时候,才会修改owner。
示例:
//非公平的ReentrantLock中tryAcquire的实现
protected boolean tryAcquire(int ignored){
final Thread current = Thread.currentThread();
int c = getState();
if(c==0){
//如果锁未被占有,使用compareAndSetState原子地更新状态,表明这个锁现在已经被占有
if(compareAndSetState(0,1)){
owner = current;//标识当前线程
return true;
}
}else if(current == owner){
setState(c+1);//执行获取操作次数的计数
return true;
}
return false;
}
12.6.2Semaphore和CountDownLatch
Semaphore使用AQS类型的同步状态持有当前可用许可的数量。
CountDownLatch使用AQS 的方式与Semaphore相似:同步状态持有当前的计数,countDown方法调用release,后者会导致计数器递减,并且在计数器已经到达零时,解除所有等待线程的阻塞。await调用acquire,如果计数器已经到达零,acquire会立即返回,否则它会被阻塞。
示例:
//Semaphore的tryAcquireShared和tryAcquireSHared方法
protected int tryAcquireShared(int acquires){
while(true){
int available = getState();
//计算剩余的许可,如果还有充足的许可剩余,会使用compareAndSetState原子的更新许可数
int remaining = available - acquires;
if(remaining < 0 || compareAndSetState(available, remaining)){
return remaining;
}
}
protected boolean tryReleaseShared(int releases){
while(true){
int p = getState();
if(compareAndSetState(p, p+releases)){
return true;
}
}
}
}
12.6.3FutureTask
Future.get的语义非常类似于闭锁——如果发生了某些事件(FutureTask表现的任务的完成或取消),线程就可以执行,否则线程会留在队列中,直到有事发生。
FutureTask使用AQS类型的同步状态来持有任务的状态——允许、完成、取消。
FutureTask也维护了一些额外的状态变量,来持有计算的结果或者抛出的异常。
它还维护了一个引用,指向正在运行计算任务的线程,如果任务被取消,就可以中断该线程。
(**FutureTask同样可以作为闭锁,FutureTask的计算是通过Callable实现的,一旦FutureTask进入状态完成,它会永远停止在这个状态上**)
12.6.4ReentrantReadWriteLock
ReadWriteLock的接口要求了两个锁——读者锁和写者锁——但是在基于AQS的ReentrantReadWriteLock**使用一个16位的状态为写锁计数,使用另一个16位的状态为读者锁计数**。
对读者锁的操作使用共享的获取与释放的方法,对写者锁的操作使用独占的获取与释放的方法。
AQS在内部维护一个等待线程的队列,持续追踪一个线程是否被独占请求,或者被共享访问。
在**ReentrantReadWriteLock**中,当锁可用时,如果位于**队列头部的线程**同时也正在准备**写访问**,**线程会得到锁**,如果位于**队列头部的线程**正在准备**读访问**,那么队列中所有**首个写线程之前的线程都会得到锁**。
总结
如果你需要**实现一个依赖于状态的类**——**如果不能满足依赖于状态的前提条件,类的方法必须阻塞**——最佳的策略通常是将它构建于现有的类库之上。(比如Semaphore、BlockingQueue、CountDownLatch)
如果**现有的类库不能提供足够的功能**,你可以使用内部条件队列、显式的Condition对象或者AbstractQueueSynchronizer,来构建属于自己的Synchronizer。
由于“管理状态的独立性”机制必须紧密依赖于"确保状态一致性"机制,**所以内部条件队列与内部锁紧密地绑定到了一起**。(显式的Condition与显式的Lock是紧密地绑定到一起的)
(内部条件队列管理状态的独立性,内部锁保证状态的一致性)
显式的**Condition**提供了一个可扩展的特征集:**多等待集每锁、可中断或不可中断的条件等待、公平或非公平的队列、基于最终时限的等待**。
