4.构建块
4.1同步容器
Vector、Hashtable、Collections.synchronizedXxx工厂。这些类通过封装它们的状态,并对每一个公共方法进行同步而实现了线程安全。
4.1.1同步容器中出现的问题
虽然同步容器都是线程安全的,但是对于复合操作,我们还是需要使用额外的客户端加锁进行保护。
示例:
//这是错误的,存在线程安全问题,有“检查再运行”错误public static Object getLast(Vector list){int lastIndex = list.size() - 1;return list.get(lastIndex);}public static void deleteLast(Vector list){int lastIndex = list.size() - 1;list.remove(lastIndex);}//这是安全的,加了额外的锁进行保护public static Object getLast(Vector list){synchronized (list) {int lastIndex = list.size() - 1;return list.get(lastIndex);}}public static void deleteLast(Vector list){synchronized (list) {int lastIndex = list.size() - 1;list.remove(lastIndex);}}
4.1.2迭代器和ConcurrentModificationException
在设计同步容器返回的迭代器时,并没有考虑到并发修改的问题。迭代器是“即时失败”的——意思是当它们察觉容器在迭代开始后被修改,会抛出一个未检查的ConcurrentModificationException。在迭代期间,对容器加锁的一个代替方法是复制容器。应为复制是线程限制的,没有其它线程能够在迭代期间对其进行修改。
4.1.3隐藏迭代器
//字符串的拼接操作经过编译转换成调用StringBuilder.append(Object)来完成,它会调用容器的toString方法。标准容器中的toString的实现会通过迭代容器中的每个元素,来获取容器内容。public class HiddenIterator {@GuradedBy("this")private final Set<Integer> set = new HashSet<Integer>();public synchronized void add(Integer i) {set.add(i);}public synchronized void remove(Integer i) {set.remove(i);}public void addTenThings(){Random random = new Random();for (int i = 0 ; i< 10 ; i++){add(random.nextInt());}//不要这样做System.out.printf("DEVUG: added ten elements to" + set);}}
正如封装一个对象的状态,能够使它更加容易地保持不变约束一样,封装它的同步则可以迫使它符合同步策略。容器的hashCode和equals方法也会间接地调用迭代。
4.2并发容器
用来改进同步容器。因为同步容器保证了线程的安全,但是却消弱了并发。
用并发容器替换同步容器,这种作法以有很小风险带来了可扩展性显著的提高。
4.2.1ConcurrentHashMap
ConcurrentHashMap和HashMap一样是一个哈希表,但是它使用完全不同的**锁策略**。ConcurrentHashMap使用了一个更加细化的锁机制,名叫分离锁。
ConcurrentHashMap允许**任意数量的读线程并发访问**Map,**读者和写者也可以并发访问**Map,并且**有限数量的写线程还可以并发修改**Map。还提供了不会抛出ConcurrentModificationException的迭代器。
ComcurrentHashMap返回的迭代器具有**弱一致性**,而非“及时失败”的。弱一致性的迭代器允许并发修改,当迭代器被创建时,它会遍历已有的元素,并且**可以(但是不保证)感应到在迭代器被创建后,对容器的修改**。
大多数情况下用 ConcurrentHashMap 取代同步Map只会带来更好的可伸缩性。
只有当你的程序需要在独占访问中加锁时,ConcurrentHashMap才无法胜任。
public interface ConcurrentMap<K,V> extends Map<K,V>{
//只有当没有找到匹配K的值时才插入
V putIfAbsent(K key, V value);
//只有当k与v匹配时才移除
boolean remove(K key, V value);
//只有当K与oldValue匹配时才取代
boolean replace(K key, V oldValue, V newvalue);
//只有当k匹配时才取代。
V replace(K key, V newValue);
}
4.2.2 Map附加的原子操作
一些常见的符合操作,比如"缺少即加入",“相等便移除”和“相等便替换”都已被实现为原子操作,并且这些操作已在ConcurrentMap接口中声明。
4.2.3CopyOnWriteArrayList
CopyOnWriteArrayList是同步List的一个并发替代品,提供了更好的并发性,并避免了在迭代期间对容器加锁和复制。(CopyOnWriteArraySet是同步Set的一个并发替代品)。
CopyOnWrite(写入时复制)。
顾名思义:在每次需要**修改时**,它们会**创建并重新发布一个新的容器拷贝**,以此来实现可变性。CopyOnWrite 容器的迭代器**保留一个底层基础数组的引用**。这个数组作为起点,永远不会被改变。因此,多个线程可以对这个容器进行迭代。
当对容器迭代操作的频率远远高于对容器修改的频率时,使用"CopyOnWrite"容器是个合理的选择。
//例子:提交一个通知需要迭代已注册的监听器,并调用其中一个。多数情况下,注册和注销一个事件监听器的次数要比收到事件通知的次数少很多。
4.3阻塞队列和生产者-消费者模式
阻塞队列:**提供了可阻塞的put和take方法**。
生产者消费者模式:生产者将数据放入队列,消费者从队列中获取数据。
生产者不需要直到消费者的身份或数量,消费者也不需要知道生产者是谁。
最常见的生产者-消费者设计:线程池与工作队列相结合。
阻塞队列简化了消费者的编码,因为**take会保持阻塞直到可用数据出现**。
同样也简化了生产者的编码,因为**put会使队列在充满的时候阻塞**,不再生成更多的工作。
阻塞队列提供了一个offer方法:**如果条目不能被加入到队列里,它会返回一个失败状态**。
有界队列是强大的资源管理工具,用来建立可靠的应用程序:它们可以遏制那些产生过多工作量、具有威胁的活动,从而让你的程序在面对超负荷工作时更加健壮。
如果阻塞队列并不完全适合你的设计,也可以用**信号量**创建其他的阻塞数据结构。
类库中包含一些BolckingQueue的实现:
- LinkedBlockingQueue和ArrayBLockingQueue是FIFO(先进先出)队列,与LinkedList和ArrayList相似,但却拥有比同步List更好的并发性能
- PriorityBlockingQueue是一个按优先级顺序排序的队列。它可以比较元素本身的自然顺序,也可以使用一个Comparator进行排序。
- SynchronousQueue,它本质上不是一个真正的队列,因为它不会为队列元素维护任何的存储空间。它会非常直接的移交整个工作。因为SynchronousQueue没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则put和take会一直阻塞。
4.3.1连续的线程限制
对于**可变对象**,生产者-消费者设计和阻塞队列一起,为生产者和消费者之间移交对象所有权提供了**连续的线程限制**。
连续的线程限制:一个线程约束的**对象完全由单一线程所有**,但是**所有权可以通过安全的发布被“转移”**。并且移交之后**原线程不能再访问它**。
示例:对象池,只要对象池中含有充足的内部同步,使对象能安全地发布,并且客户端本身不会发布对象池,或者在返回对象池后不再继续使用,所有权就可以在线程间安全地传递。
4.3.2双端队列和窃取工作(Deque)
**Deque和BolckingDeque分别扩展了Queue和BlockingQueue**。
Deque是一个双端队列,允许高效地在头和尾分别进行插入和移除。
实现它们的是ArrayDeque和LinkedBlockingDeque。
窃取工作:
在窃取工作的设计中,**每一个消费者都有一个自己的双端队列**。如果一个消费者完成了自己双端队列中的全部工作,它可以**偷取其他消费者的双端队列中的末尾任务**。
4.4阻塞和可中断的方法
线程可能会因为集中原因被阻塞或暂停:
等待I/O操作结束、等待获得一个锁、等待从Thread.sleep中唤醒、等待另一个线程的计算结果。(此时的线程状态:BLOCKED、WAITING或TIMED_WAITING)
可阻塞线程一般都会抛出**InterruptedException异常**。当一个方法抛出此异常时,是在告诉你,这个方法是一个**可阻塞的**。
**Thread提供了interrupt方法,用来中断一个线程**,或者查询某线程是否已经被中断。
**中断**:是一种**协作机制**。一个线程**不能迫使其他线程停止**正在做的事情,或者去做其他事情。当线程A中断B时,**A仅仅是要求B在达成某个方便停止的关键点时,停止正在做的事情**。
**如何响应中断**?
- 传递:把InterruptedException传递给调用者。这可能包括不捕获异常,也可能是先捕获,然后对其中特定活动进行简洁处理后,再抛出。
- 恢复中断:有时候你不能抛出异常,此时必须捕获,并且,在当前线程中通过调用interrupt从中断中恢复。这样调用栈中更高层的代码可以发现中断已经发生。
示例:
public class TaskRunnable implements Runnable{
BlockingQueue<Task> queue;
......
public void run(){
try{
processTask(queue.take());
}catch(InterruptedException e){
//恢复中断状态
Thread.currentThread().interrupt();
}
}
}
4.5Synchronizer
Synchronizer是一个**对象**,他**根据本身的状态调节线程的控制流**。
**阻塞队列**可以扮演一个Synchronizer的角色,其他类型的Synchronizer包括**信号量**、**关卡**、以及**闭锁**。
所有Synchronizer都有类似的结构特性:它们**封装操控状态的方法**,以及高效地等待Synchronizer**进入到期望状态的方法**。
4.5.1闭锁(CountDownLatch)
闭锁是一种Synchronizer,它可以**延迟线程进度直到线程到达终止状态**。
在终点状态到来之前,是不允许任何线程通过的,直到终点状态到来,允许所有线程通过。并且,**一旦闭锁到达了终点状态,它就不能够再被改变状态了**。
闭锁的应用场景:
- 确保一个计算不会执行,直到它需要的资源被初始化。
- 确保一个服务不会开始,直到它依赖的其他服务都已经开始。
- 等待,直到活动的所有部分都为继续处理做好充分准备。(游戏中玩家是否就绪)
CountDownLatch是一个灵活的闭锁实现:
- 闭锁的状态包括一个计数器,初始化为一个正数,用来表现需要等待的事件数。
- countDown方法对计数器做减操作,表示一个事件已经发生
- await方法等待计数器达到0,此时所有需要等待的事件都已发生。
示例:
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
@Override
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end = start;
}
}
4.5.2FutureTask
FutureTask同样可以作为**闭锁**。(**FutureTask的计算是通过Callable实现的。**)一旦FutureTask进入完成状态,它会永远停止在这个状态上。
**FutureTask.get**的行为依赖于任务的状态,如果已完成,get可以立刻得到返回结果,否则会被阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
示例:
public class Preloader {
private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
new Callable<ProductInfo>() {
@Override
public ProductInfo call() throws Exception {
return loadProductInfo();
}
}
);
private final Thread thread = new Thread(future);
public void start(){thread.start();}
public ProductInfo get() throws InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw launderThrowable(cause);
}
}
}
4.5.3信号量(Semaphore)
**计数信号量**用来**控制**能够**同时访问某特定资源的活动的数量**,或者**同时执行某一给定操作的数量**。
一个**Semaphore**管理一个有效的**许可集**:许可的初始量通过构造函数传递给Semaphore。活动能够**获得许可**,并在使用之后**释放许可**。如果已经**没有可用的许可**,那么acquire会被阻塞。
release方法向信号量返回一个许可。
**二元信号量**:**一个计数初始值为1的Semaphore**。(**互斥**锁)
应用场景:
- 数据库连接池。(以池的大小初始化一个Semaphore,在从池中获取资源时,先调用acquire方法获取一个许可,调用release把许可放回资源池)
- 可以用Semaphore把任何容器转化为有界的阻塞容器。
示例:
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
this.sem = new Semaphore(bound);
}
//成功添加元素后,减一个计数
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {
sem.release();
}
}
}
//成功删除一个元素后,加一个计数
public boolean remove(Object o){
boolean remove = set.remove(o);
if (remove){
sem.release();
}
return remove;
}
}
4.5.4关卡(CyclicBarrier)
关卡**类似于闭锁**,它们都能够阻塞一组线程,直到某些事件发生。
关卡与闭锁的不同点在于:
- 所有线程必须同时到达关卡点,才能继续处理。
- 闭锁等待的是事件,关卡等待的是其他线程。
CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点。
当线程到达关卡点时,调用await,await会被阻塞,直到所有线程都到达关卡点。
如果所有线程都到达了关卡点,关卡就会被突破,所有线程被释放,关卡重置。
如果对await的调用超时,或者阻塞中的线程被中断,那么关卡就会被认为是失败的,所有对await未完成的调用都通过BrokenBarrierException终止。
如果成功通过关卡,await为每一个线程返回一个唯一的到达索引号,可以用它来“选举”产生一个领导,在下一次迭代中承担一些特殊任务。
CycliBarrier也允许向构造函数传递一个关卡行为:一个Runnable,当成功通过关卡时,会在一个子任务线程中执行。
4.6为计算结果建立高效、可伸缩的高速缓存
保存计算结果,避免重复计算。可以提高吞吐量,缩短等待事件,但是会占用一定的内存。
方案一:
最简单的方案:HashMap(要注意线程安全问题,需要做额外的同步)
public interface Computable<A, V> {
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunction implements Computable<String, BigInteger> {
public BigInteger compute(String arg) {
//一些计算
return new BigInteger(arg);
}
}
public class Memoizer1<A, V> implements Computable<A, V> {
@GuardedBy("this")
private final Map<A, V> cache = new HashMap<A, V>();
private final Computable<A, V> c;
public Memoizer1(Computable<A, V> c) {
this.c = c;
}
@Override
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return null;
}
}
方案二:
ConcurrentHashMap(因为它是线程安全的,所以不需要额外的同步,对比方案一稍有进步,但是,当两个线程同时使用时,可能会出现重复计算)
public class Memoizer1<A, V> implements Computable<A, V> {
@GuardedBy("this")
private final Map<A, V> cache = new ConcurrentHashMap<A,V>();
private final Computable<A, V> c;
public Memoizer1(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return null;
}
}
方案三:
重新定义可存储的Map,用ConcurrentHashMap取代了ConcurrentHashMap
这个方案近乎完美,展现了非常号的并发性(来自于ConcurrentHashMap),如果有新到的线程请求的是其他线程正在计算的结果,它也会等待。(仅仅存在一个缺陷,两个线程可能同时计算相同的值)
@Override
public V compute(A arg) throws InterruptedException {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<>(eval);
f = ft;
cache.put(arg, ft);
//调用c.compute()计算发生在这里
ft.run();
}
try {
//如果集合里有对应的值,直接取出
return f.get();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
方案四:
在方案三的基础上,利用ConcurrentMap中原子化的putIfAbsent方法,消除了同时计算的隐患。
同时,在计算取消或是失败时,也要把Future从缓存中移除。
通过FutureTask的一个子类,还可以设置缓存的过期时间。(删除旧缓存,给新的腾出空间)
@Override
public V compute(A arg) throws InterruptedException {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<>(eval);
//如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
总结
- 所有并发问题都归结为如何协调访问并发状态,可变状态越少,保证线程安全就越容易。
- 尽量将域声明为final类型。
- 封装使管理复杂度变得更可行。
- 用锁来守护每一个可变变量。
- 对同一不变约束中的所有变量都使用同样的锁。
- 在运行符合操作期间持有锁。
- 在设计过程中就考虑线程安全,或者在文档中明确地说明它不是线程安全的。
