4.构建块

4.1同步容器

  1. VectorHashtableCollections.synchronizedXxx工厂。
  2. 这些类通过封装它们的状态,并对每一个公共方法进行同步而实现了线程安全。

4.1.1同步容器中出现的问题

  1. 虽然同步容器都是线程安全的,但是对于复合操作,我们还是需要使用额外的客户端加锁进行保护。

示例:

  1. //这是错误的,存在线程安全问题,有“检查再运行”错误
  2. public static Object getLast(Vector list){
  3. int lastIndex = list.size() - 1;
  4. return list.get(lastIndex);
  5. }
  6. public static void deleteLast(Vector list){
  7. int lastIndex = list.size() - 1;
  8. list.remove(lastIndex);
  9. }
  10. //这是安全的,加了额外的锁进行保护
  11. public static Object getLast(Vector list){
  12. synchronized (list) {
  13. int lastIndex = list.size() - 1;
  14. return list.get(lastIndex);
  15. }
  16. }
  17. public static void deleteLast(Vector list){
  18. synchronized (list) {
  19. int lastIndex = list.size() - 1;
  20. list.remove(lastIndex);
  21. }
  22. }

4.1.2迭代器和ConcurrentModificationException

  1. 在设计同步容器返回的迭代器时,并没有考虑到并发修改的问题。
  2. 迭代器是“即时失败”的——意思是当它们察觉容器在迭代开始后被修改,会抛出一个未检查的ConcurrentModificationException
  3. 在迭代期间,对容器加锁的一个代替方法是复制容器。应为复制是线程限制的,没有其它线程能够在迭代期间对其进行修改。

4.1.3隐藏迭代器

  1. //字符串的拼接操作经过编译转换成调用StringBuilder.append(Object)来完成,它会调用容器的toString方法。标准容器中的toString的实现会通过迭代容器中的每个元素,来获取容器内容。
  2. public class HiddenIterator {
  3. @GuradedBy("this")
  4. private final Set<Integer> set = new HashSet<Integer>();
  5. public synchronized void add(Integer i) {set.add(i);}
  6. public synchronized void remove(Integer i) {set.remove(i);}
  7. public void addTenThings(){
  8. Random random = new Random();
  9. for (int i = 0 ; i< 10 ; i++){
  10. add(random.nextInt());
  11. }
  12. //不要这样做
  13. System.out.printf("DEVUG: added ten elements to" + set);
  14. }
  15. }
  1. 正如封装一个对象的状态,能够使它更加容易地保持不变约束一样,封装它的同步则可以迫使它符合同步策略。
  2. 容器的hashCodeequals方法也会间接地调用迭代。

4.2并发容器

用来改进同步容器。因为同步容器保证了线程的安全,但是却消弱了并发。

用并发容器替换同步容器,这种作法以有很小风险带来了可扩展性显著的提高。

4.2.1ConcurrentHashMap

ConcurrentHashMap和HashMap一样是一个哈希表,但是它使用完全不同的**锁策略**。ConcurrentHashMap使用了一个更加细化的锁机制,名叫分离锁。

ConcurrentHashMap允许**任意数量的读线程并发访问**Map,**读者和写者也可以并发访问**Map,并且**有限数量的写线程还可以并发修改**Map。还提供了不会抛出ConcurrentModificationException的迭代器。

ComcurrentHashMap返回的迭代器具有**弱一致性**,而非“及时失败”的。弱一致性的迭代器允许并发修改,当迭代器被创建时,它会遍历已有的元素,并且**可以(但是不保证)感应到在迭代器被创建后,对容器的修改**。
大多数情况下用 ConcurrentHashMap 取代同步Map只会带来更好的可伸缩性。
只有当你的程序需要在独占访问中加锁时,ConcurrentHashMap才无法胜任。
public interface ConcurrentMap<K,V> extends Map<K,V>{
    //只有当没有找到匹配K的值时才插入
    V putIfAbsent(K key, V value);

    //只有当k与v匹配时才移除
    boolean remove(K key, V value);

    //只有当K与oldValue匹配时才取代
    boolean replace(K key, V oldValue, V newvalue);

    //只有当k匹配时才取代。
    V replace(K key, V newValue);
}

4.2.2 Map附加的原子操作

一些常见的符合操作,比如"缺少即加入",“相等便移除”和“相等便替换”都已被实现为原子操作,并且这些操作已在ConcurrentMap接口中声明。

4.2.3CopyOnWriteArrayList

CopyOnWriteArrayList是同步List的一个并发替代品,提供了更好的并发性,并避免了在迭代期间对容器加锁和复制。(CopyOnWriteArraySet是同步Set的一个并发替代品)。

CopyOnWrite(写入时复制)。

顾名思义:在每次需要**修改时**,它们会**创建并重新发布一个新的容器拷贝**,以此来实现可变性。CopyOnWrite 容器的迭代器**保留一个底层基础数组的引用**。这个数组作为起点,永远不会被改变。因此,多个线程可以对这个容器进行迭代。
当对容器迭代操作的频率远远高于对容器修改的频率时,使用"CopyOnWrite"容器是个合理的选择。

//例子:提交一个通知需要迭代已注册的监听器,并调用其中一个。多数情况下,注册和注销一个事件监听器的次数要比收到事件通知的次数少很多。

4.3阻塞队列和生产者-消费者模式

阻塞队列:**提供了可阻塞的put和take方法**。

生产者消费者模式:生产者将数据放入队列,消费者从队列中获取数据。

生产者不需要直到消费者的身份或数量,消费者也不需要知道生产者是谁。

最常见的生产者-消费者设计:线程池与工作队列相结合。

阻塞队列简化了消费者的编码,因为**take会保持阻塞直到可用数据出现**。

同样也简化了生产者的编码,因为**put会使队列在充满的时候阻塞**,不再生成更多的工作。

阻塞队列提供了一个offer方法:**如果条目不能被加入到队列里,它会返回一个失败状态**。
有界队列是强大的资源管理工具,用来建立可靠的应用程序:它们可以遏制那些产生过多工作量、具有威胁的活动,从而让你的程序在面对超负荷工作时更加健壮。
如果阻塞队列并不完全适合你的设计,也可以用**信号量**创建其他的阻塞数据结构。

类库中包含一些BolckingQueue的实现:

  • LinkedBlockingQueueArrayBLockingQueue是FIFO(先进先出)队列,与LinkedList和ArrayList相似,但却拥有比同步List更好的并发性能
  • PriorityBlockingQueue是一个按优先级顺序排序的队列。它可以比较元素本身的自然顺序,也可以使用一个Comparator进行排序。
  • SynchronousQueue,它本质上不是一个真正的队列,因为它不会为队列元素维护任何的存储空间。它会非常直接的移交整个工作。因为SynchronousQueue没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则put和take会一直阻塞。

4.3.1连续的线程限制

对于**可变对象**,生产者-消费者设计和阻塞队列一起,为生产者和消费者之间移交对象所有权提供了**连续的线程限制**。

连续的线程限制:一个线程约束的**对象完全由单一线程所有**,但是**所有权可以通过安全的发布被“转移”**。并且移交之后**原线程不能再访问它**。

示例:对象池,只要对象池中含有充足的内部同步,使对象能安全地发布,并且客户端本身不会发布对象池,或者在返回对象池后不再继续使用,所有权就可以在线程间安全地传递。

4.3.2双端队列和窃取工作(Deque)

    **Deque和BolckingDeque分别扩展了Queue和BlockingQueue**。

Deque是一个双端队列,允许高效地在头和尾分别进行插入和移除

实现它们的是ArrayDequeLinkedBlockingDeque

窃取工作:

在窃取工作的设计中,**每一个消费者都有一个自己的双端队列**。如果一个消费者完成了自己双端队列中的全部工作,它可以**偷取其他消费者的双端队列中的末尾任务**。

4.4阻塞和可中断的方法

线程可能会因为集中原因被阻塞或暂停:

等待I/O操作结束、等待获得一个锁、等待从Thread.sleep中唤醒、等待另一个线程的计算结果。(此时的线程状态:BLOCKED、WAITING或TIMED_WAITING)

可阻塞线程一般都会抛出**InterruptedException异常**。当一个方法抛出此异常时,是在告诉你,这个方法是一个**可阻塞的**。

**Thread提供了interrupt方法,用来中断一个线程**,或者查询某线程是否已经被中断。

**中断**:是一种**协作机制**。一个线程**不能迫使其他线程停止**正在做的事情,或者去做其他事情。当线程A中断B时,**A仅仅是要求B在达成某个方便停止的关键点时,停止正在做的事情**。

**如何响应中断**?
  • 传递:把InterruptedException传递给调用者。这可能包括不捕获异常,也可能是先捕获,然后对其中特定活动进行简洁处理后,再抛出
  • 恢复中断:有时候你不能抛出异常,此时必须捕获,并且,在当前线程中通过调用interrupt从中断中恢复。这样调用栈中更高层的代码可以发现中断已经发生

示例:

public class TaskRunnable implements Runnable{
    BlockingQueue<Task> queue;
    ......
    public void run(){
        try{
            processTask(queue.take());
        }catch(InterruptedException e){
            //恢复中断状态
            Thread.currentThread().interrupt();
        }
    }
}

4.5Synchronizer

Synchronizer是一个**对象**,他**根据本身的状态调节线程的控制流**。

**阻塞队列**可以扮演一个Synchronizer的角色,其他类型的Synchronizer包括**信号量**、**关卡**、以及**闭锁**。

所有Synchronizer都有类似的结构特性:它们**封装操控状态的方法**,以及高效地等待Synchronizer**进入到期望状态的方法**。

4.5.1闭锁(CountDownLatch)

闭锁是一种Synchronizer,它可以**延迟线程进度直到线程到达终止状态**。

在终点状态到来之前,是不允许任何线程通过的,直到终点状态到来,允许所有线程通过。并且,**一旦闭锁到达了终点状态,它就不能够再被改变状态了**。

闭锁的应用场景

  • 确保一个计算不会执行,直到它需要的资源被初始化。
  • 确保一个服务不会开始,直到它依赖的其他服务都已经开始。
  • 等待,直到活动的所有部分都为继续处理做好充分准备。(游戏中玩家是否就绪)

CountDownLatch是一个灵活的闭锁实现:

  • 闭锁的状态包括一个计数器,初始化为一个正数,用来表现需要等待的事件数。
  • countDown方法对计数器做减操作,表示一个事件已经发生
  • await方法等待计数器达到0,此时所有需要等待的事件都已发生。

示例:

public class TestHarness {

    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end = start;
    }
}

4.5.2FutureTask

FutureTask同样可以作为**闭锁**。(**FutureTask的计算是通过Callable实现的。**)一旦FutureTask进入完成状态,它会永远停止在这个状态上。

**FutureTask.get**的行为依赖于任务的状态,如果已完成,get可以立刻得到返回结果,否则会被阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。

示例:

public class Preloader {
    private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
            new Callable<ProductInfo>() {
                @Override
                public ProductInfo call() throws Exception {
                    return loadProductInfo();
                }
            }
    );

    private final Thread thread = new Thread(future);

    public void start(){thread.start();}

    public ProductInfo get() throws InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException)
                throw (DataLoadException) cause;
            else
                throw launderThrowable(cause);
        }
    }
}

4.5.3信号量(Semaphore)

**计数信号量**用来**控制**能够**同时访问某特定资源的活动的数量**,或者**同时执行某一给定操作的数量**。

一个**Semaphore**管理一个有效的**许可集**:许可的初始量通过构造函数传递给Semaphore。活动能够**获得许可**,并在使用之后**释放许可**。如果已经**没有可用的许可**,那么acquire会被阻塞。

release方法向信号量返回一个许可。

**二元信号量**:**一个计数初始值为1的Semaphore**。(**互斥**锁)

应用场景

  • 数据库连接池。(以池的大小初始化一个Semaphore,在从池中获取资源时,先调用acquire方法获取一个许可,调用release把许可放回资源池)
  • 可以用Semaphore把任何容器转化为有界的阻塞容器。

示例:

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        this.sem = new Semaphore(bound);
    }
    //成功添加元素后,减一个计数
    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                sem.release();
            }
        }
    }
    //成功删除一个元素后,加一个计数
    public boolean remove(Object o){
        boolean remove = set.remove(o);
        if (remove){
            sem.release();
        }
        return remove;
    }
}

4.5.4关卡(CyclicBarrier)

关卡**类似于闭锁**,它们都能够阻塞一组线程,直到某些事件发生。

关卡与闭锁的不同点在于:

  • 所有线程必须同时到达关卡点,才能继续处理
  • 闭锁等待的是事件,关卡等待的是其他线程

CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点

线程到达关卡点时,调用await,await会被阻塞,直到所有线程都到达关卡点。

如果所有线程都到达了关卡点,关卡就会被突破,所有线程被释放关卡重置

如果对await的调用超时,或者阻塞中的线程被中断,那么关卡就会被认为是失败的,所有对await未完成的调用都通过BrokenBarrierException终止

如果成功通过关卡,await为每一个线程返回一个唯一的到达索引号,可以用它来“选举”产生一个领导,在下一次迭代中承担一些特殊任务。

CycliBarrier也允许向构造函数传递一个关卡行为:一个Runnable,当成功通过关卡时,会在一个子任务线程中执行

4.6为计算结果建立高效、可伸缩的高速缓存

保存计算结果,避免重复计算。可以提高吞吐量,缩短等待事件,但是会占用一定的内存。

方案一

最简单的方案:HashMap(要注意线程安全问题,需要做额外的同步)

public interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}

public class ExpensiveFunction implements Computable<String, BigInteger> {
    public BigInteger compute(String arg) {
        //一些计算
        return new BigInteger(arg);
    }
}

public class Memoizer1<A, V> implements Computable<A, V> {

    @GuardedBy("this")
    private final Map<A, V> cache = new HashMap<A, V>();
    private final Computable<A, V> c;

    public Memoizer1(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return null;
    }
}

方案二

ConcurrentHashMap(因为它是线程安全的,所以不需要额外的同步,对比方案一稍有进步,但是,当两个线程同时使用时,可能会出现重复计算)

public class Memoizer1<A, V> implements Computable<A, V> {

    @GuardedBy("this")
    private final Map<A, V> cache = new ConcurrentHashMap<A,V>();
    private final Computable<A, V> c;

    public Memoizer1(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return null;
    }
}

方案三:

重新定义可存储的Map,用ConcurrentHashMap取代了ConcurrentHashMap

这个方案近乎完美,展现了非常号的并发性(来自于ConcurrentHashMap),如果有新到的线程请求的是其他线程正在计算的结果,它也会等待。(仅仅存在一个缺陷,两个线程可能同时计算相同的值)

    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft = new FutureTask<>(eval);
            f = ft;
            cache.put(arg, ft);
            //调用c.compute()计算发生在这里
            ft.run();
        }
        try {
            //如果集合里有对应的值,直接取出
            return f.get();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }

方案四:

在方案三的基础上,利用ConcurrentMap中原子化的putIfAbsent方法,消除了同时计算的隐患。

同时,在计算取消或是失败时,也要把Future从缓存中移除。

通过FutureTask的一个子类,还可以设置缓存的过期时间。(删除旧缓存,给新的腾出空间)

    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft = new FutureTask<>(eval);
            //如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null
            f = cache.putIfAbsent(arg, ft);
            if (f == null) {
                f = ft;
                ft.run();
            }
        }
        try {
            return f.get();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }

总结

  • 所有并发问题都归结为如何协调访问并发状态,可变状态越少,保证线程安全就越容易。
  • 尽量将域声明为final类型。
  • 封装使管理复杂度变得更可行。
  • 用锁来守护每一个可变变量。
  • 对同一不变约束中的所有变量都使用同样的锁。
  • 在运行符合操作期间持有锁。
  • 在设计过程中就考虑线程安全,或者在文档中明确地说明它不是线程安全的。