同步容器类
Vector, HashTable, Collections.synchronizedXxx等工厂方法创建的同步的封装器类
并发容器
- ConcurrentHashMap
CopyOnWriteArrayList
查询的时候,都不需要加锁,随便访问,只有在更新的时候,才会从原来的数据复制一个副本出来,然后修改这个副本,最后把原数据替换成当前的副本。修改操作的同时,读操作不会被阻塞,而是继续读取旧的数据。
Queue, 临时保存一组等待处理的元素, 操作不会阻塞, 如果队列为空, 则会获取到空值
- ConcurrentLinkQueue, 先进先出对列
- PriorityQueue, 非并发的优先队列
- BlockingQueue, 拓展了Queue, 如果队列为空, 则获取元素的操作将一直阻塞, 直到队列中出现可用的元素, 如果队列已满, 那么插入元素的操作会一直阻塞, 直到队列中出现可用的空间
- LinkedBlockingQueue, ArrayBlockingQueue, 两者都是FIFO队列
- PriorityBlockingQueue, 按优先级排序的队列
- SynchronousQueue, 并不为队列中的元素维护存储空间, 它维护一组线程, 这些线程在等待着将元素加入或移出队列, 相当于没有入列出列, 直接将生产者任务交付给了消费者
- ConcurrentSkipListMap, 同步的sortedMap
- ConcurrentSkipListSet, 同步的sortedSet
阻塞队列与生产者消费者模式
用阻塞队列实现生产者消费者模型示例
串行线程封闭
- 简单说就是线程封闭对象只能由单个线程拥有, 生产者-消费者这种设计与阻塞队列一起, 安全的交付了对象所有权, 对象由封闭在生产者线程转移至封闭在消费者线程, 是线程安全的
- 对象池也利用了串行线程封闭, 将对象租借给一个请求线程
双端队列与工作密取
- Deque和BlockingDeque是对Queue和BlockingQueue的拓展
- Deque是一个双端队列, 实现了在队列头以及队列尾的高效插入和移除, 具体实现包括ArrayDeque和LinkedBlockingDeque
- 双端队列适用于工作密取模式(Working Stealing)
工作密取模式, 每个消费者都有各自的双端队列, 如果一个消费者完成了自己队列的全部工作, 可以从其他消费者队列末尾秘密的获取工作
- 避免多个消费者在同一个共享队列上发生竞争
- 当访问其他消费者队列时, 从队列尾部获取工作, 进一步减少竞争
工作密取模式, 适用于既是消费者也是生产者问题-当执行某个工作时可能导致出现更多的工作. 当双端队列为空时, 它会在另一个线程的队列末尾查找新的任务, 从而确保每个线程都保持忙碌状态
阻塞方法与中断方法
BlockingQueue的put和take等方法会抛出InterruptedException的受检异常, 表示该方法是阻塞方法
每个线程都有一个布尔类型的属性, 表示线程的中断状态, Thread的interrupt方法用于中断线程或者查询线程是否已被中断. 中断是一种协作机制, 最常使用中断的情况就是取消某个操作
对于InterruptedException, 有两种处理方式:
- 多数情况下, 直接往上传递, 或者执行简单的清理工作后再抛出
- 少数不能抛异常的情况, 比如Runnable中, 必须捕获, 然后再调用interrupt方法恢复中断状态
同步工具类
阻塞队列, 信号量(Semaphore), 栅栏(Barrier), 闭锁(Latch)
闭锁
CountDownLatch
CountDownLatch是一种灵活的闭锁实现, 它包含一个初始化为正数的计数器, 表示需要等待的事件数量, 通过countDown方法递减, 如果计数器值非0, await方法会一直阻塞或者等待中的线程中断或等待超时
如下, 可以创建一定数量的线程并且同时启动并发的执行任务, 可以测试n个线程并发执行某个任务时所需的时间
FutureTask
FutureTask也可以用作闭锁.
FutureTask实现了Future语义, 表示一种抽象的可生成结果的Runnable, 并且可以处于以下3种状态: 等待运行, 正在运行和运行完成
信号量
- 计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量, 或者同时执行某个指定操作的数量, 计数信号量还可以用来实现某种资源池, 或者对容器施加边界
- Semaphore中管理着一组虚拟的许可(permit), 许可的初始数量可通过构造函数来指定, 在执行操作时可以首先获得许可, 并在使用后释放许可, 若没有许可, 那么acquire方法将阻塞(直到线程中断或超时), release方法将返回一个许可给信号量.
- 计算信号量的一种简化形式是二值信号量, 即初始值为1的Semaphore, 二值信号量可以用作互斥体, 具备不可重入的加锁含义
栅栏
- 栅栏能阻塞一组线程直到某个事件发生, 栅栏与闭锁的关键区别在于, 所有线程必须同时到达栅栏位置, 才能继续执行, 闭锁用于等待事件, 而栅栏用于等待其他线程
- 线程到达栅栏时将调用await方法, 这个方法将阻塞直到所有的线程都到达栅栏位置, 然后所有线程被释放, 栅栏也被重置.
- 如果对await方法调用超时或者线程被中断, 那么所有阻塞await调用都将被终止, 且抛出BrokenBarrierException.
- 如果成功通过栅栏, 那么await方法将为每个线程返回一个唯一的到达索引号, 可以利用这些索引来选举产生一个新的领导线程, 并在下一次迭代中由该领导线程执行一些特殊的工作
- CyclicBarrier还可以在构造函数中传入一个Runnable, 当成功通过栅栏时会执行它
- 栅栏在并行迭代算法中非常有用(归并)
构建高效且可伸缩的结果缓存
原:
改后:
- concurrentHashMap线程安全性能更佳, 且使用了putIfAbsent原子方法
- 使用FutureTask来包装结果, 避免存在两个线程同时计算的情况
FutureTask表示一个计算的过程, 这个过程可能已经计算完成, 也可能正在进行, 如果有结果可用, 那么FutureTask.get将立即返回结果, 否则会一直阻塞, 知道结果计算出来