同步容器类

Vector, HashTable, Collections.synchronizedXxx等工厂方法创建的同步的封装器类

并发容器

  • ConcurrentHashMap
  • CopyOnWriteArrayList

    查询的时候,都不需要加锁,随便访问,只有在更新的时候,才会从原来的数据复制一个副本出来,然后修改这个副本,最后把原数据替换成当前的副本。修改操作的同时,读操作不会被阻塞,而是继续读取旧的数据。

  • Queue, 临时保存一组等待处理的元素, 操作不会阻塞, 如果队列为空, 则会获取到空值

    • ConcurrentLinkQueue, 先进先出对列
    • PriorityQueue, 非并发的优先队列
  • BlockingQueue, 拓展了Queue, 如果队列为空, 则获取元素的操作将一直阻塞, 直到队列中出现可用的元素, 如果队列已满, 那么插入元素的操作会一直阻塞, 直到队列中出现可用的空间
    • LinkedBlockingQueue, ArrayBlockingQueue, 两者都是FIFO队列
    • PriorityBlockingQueue, 按优先级排序的队列
    • SynchronousQueue, 并不为队列中的元素维护存储空间, 它维护一组线程, 这些线程在等待着将元素加入或移出队列, 相当于没有入列出列, 直接将生产者任务交付给了消费者
  • ConcurrentSkipListMap, 同步的sortedMap
  • ConcurrentSkipListSet, 同步的sortedSet

阻塞队列与生产者消费者模式

用阻塞队列实现生产者消费者模型示例
image.png
image.png

串行线程封闭

  • 简单说就是线程封闭对象只能由单个线程拥有, 生产者-消费者这种设计与阻塞队列一起, 安全的交付了对象所有权, 对象由封闭在生产者线程转移至封闭在消费者线程, 是线程安全的
  • 对象池也利用了串行线程封闭, 将对象租借给一个请求线程

双端队列与工作密取

  • Deque和BlockingDeque是对Queue和BlockingQueue的拓展
  • Deque是一个双端队列, 实现了在队列头以及队列尾的高效插入和移除, 具体实现包括ArrayDeque和LinkedBlockingDeque
  • 双端队列适用于工作密取模式(Working Stealing)

工作密取模式, 每个消费者都有各自的双端队列, 如果一个消费者完成了自己队列的全部工作, 可以从其他消费者队列末尾秘密的获取工作

  • 避免多个消费者在同一个共享队列上发生竞争
  • 当访问其他消费者队列时, 从队列尾部获取工作, 进一步减少竞争

工作密取模式, 适用于既是消费者也是生产者问题-当执行某个工作时可能导致出现更多的工作. 当双端队列为空时, 它会在另一个线程的队列末尾查找新的任务, 从而确保每个线程都保持忙碌状态

阻塞方法与中断方法

BlockingQueue的put和take等方法会抛出InterruptedException的受检异常, 表示该方法是阻塞方法

每个线程都有一个布尔类型的属性, 表示线程的中断状态, Thread的interrupt方法用于中断线程或者查询线程是否已被中断. 中断是一种协作机制, 最常使用中断的情况就是取消某个操作

对于InterruptedException, 有两种处理方式:

  • 多数情况下, 直接往上传递, 或者执行简单的清理工作后再抛出
  • 少数不能抛异常的情况, 比如Runnable中, 必须捕获, 然后再调用interrupt方法恢复中断状态

image.png

同步工具类

阻塞队列, 信号量(Semaphore), 栅栏(Barrier), 闭锁(Latch)

闭锁

CountDownLatch

CountDownLatch是一种灵活的闭锁实现, 它包含一个初始化为正数的计数器, 表示需要等待的事件数量, 通过countDown方法递减, 如果计数器值非0, await方法会一直阻塞或者等待中的线程中断或等待超时

如下, 可以创建一定数量的线程并且同时启动并发的执行任务, 可以测试n个线程并发执行某个任务时所需的时间
image.png

FutureTask

FutureTask也可以用作闭锁.
FutureTask实现了Future语义, 表示一种抽象的可生成结果的Runnable, 并且可以处于以下3种状态: 等待运行, 正在运行和运行完成
image.png

信号量

  • 计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量, 或者同时执行某个指定操作的数量, 计数信号量还可以用来实现某种资源池, 或者对容器施加边界
  • Semaphore中管理着一组虚拟的许可(permit), 许可的初始数量可通过构造函数来指定, 在执行操作时可以首先获得许可, 并在使用后释放许可, 若没有许可, 那么acquire方法将阻塞(直到线程中断或超时), release方法将返回一个许可给信号量.
  • 计算信号量的一种简化形式是二值信号量, 即初始值为1的Semaphore, 二值信号量可以用作互斥体, 具备不可重入的加锁含义

image.png

栅栏

  • 栅栏能阻塞一组线程直到某个事件发生, 栅栏与闭锁的关键区别在于, 所有线程必须同时到达栅栏位置, 才能继续执行, 闭锁用于等待事件, 而栅栏用于等待其他线程
  • 线程到达栅栏时将调用await方法, 这个方法将阻塞直到所有的线程都到达栅栏位置, 然后所有线程被释放, 栅栏也被重置.
    • 如果对await方法调用超时或者线程被中断, 那么所有阻塞await调用都将被终止, 且抛出BrokenBarrierException.
    • 如果成功通过栅栏, 那么await方法将为每个线程返回一个唯一的到达索引号, 可以利用这些索引来选举产生一个新的领导线程, 并在下一次迭代中由该领导线程执行一些特殊的工作
    • CyclicBarrier还可以在构造函数中传入一个Runnable, 当成功通过栅栏时会执行它
  • 栅栏在并行迭代算法中非常有用(归并)

image.png

构建高效且可伸缩的结果缓存

原:
image.png

改后:

  1. concurrentHashMap线程安全性能更佳, 且使用了putIfAbsent原子方法
  2. 使用FutureTask来包装结果, 避免存在两个线程同时计算的情况

FutureTask表示一个计算的过程, 这个过程可能已经计算完成, 也可能正在进行, 如果有结果可用, 那么FutureTask.get将立即返回结果, 否则会一直阻塞, 知道结果计算出来
image.png