阻塞队列

并发容器 - 图1

BlockingQueue

在所有的并发容器中,BlockingQueue是最常见的一种。BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者

  • 第一组:add、remove、element
    • add 方法是往队列里添加一个元素,如果队列满了,就会抛出异常来提示队列已满
    • remove 方法的作用是删除元素,如果我们删除的队列是空的,由于里面什么都没有,所以也无法删除任何元素,那么 remove 方法就会抛出异常。
    • element 方法是返回队列的头部节点,但是并不删除。和 remove 方法一样,如果我们用这个方法去操作一个空队列,想获取队列的头结点,可是由于队列是空的,我们什么都获取不到,会抛出和前面 remove 方法一样的异常:NoSuchElementException
  • 第二组:offer、poll、peek
    • offer 方法用来插入一个元素,并用返回值来提示插入是否成功。如果添加成功会返回 true,而如果队列已经满了,此时继续调用 offer 方法的话,它不会抛出异常,只会返回一个错误提示:false
    • poll 方法和第一组的 remove 方法是对应的,作用也是移除并返回队列的头节点。但是如果当队列里面是空的,没有任何东西可以移除的时候,便会返回 null 作为提示。正因如此,我们是不允许往队列中插入 null 的,否则我们没有办法区分返回的 null 是一个提示还是一个真正的元素
    • peek 方法和第一组的 element 方法是对应的,意思是返回队列的头元素但并不删除。如果队列里面是空的,它便会返回 null 作为提示
    • 第二组还有一些额外值得讲解的内容,offer 和 poll 都有带超时时间的重载方法
      • 通常情况下,这个方法会插入成功并返回 true;如果队列满了导致插入不成功,在调用带超时时间重载方法的 offer 的时候,则会等待指定的超时时间,如果时间到了依然没有插入成功,就会返回 false。
      • poll 和offer类似 ```java offer(E e, long timeout, TimeUnit unit) poll(long timeout, TimeUnit unit)
  1. - 第三组:puttake 容器
  2. - put 方法的作用是插入元素。通常在队列没满的时候是正常的插入,但是如果队列已满就无法继续插入,这时它既不会立刻返回 false 也不会抛出异常,而是让插入的线程陷入阻塞状态,直到队列里有了空闲空间,此时队列就会让之前的线程解除阻塞状态,并把刚才那个元素添加进去。
  3. - take 方法的作用是获取并移除队列的头结点。通常在队列里有数据的时候会正常取出数据并删除;但是如果执行 take 的时候队列里无数据,则阻塞,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据
  4. <a name="IaKRM"></a>
  5. ## 小结
  6. | scene | 方法 | 含义 | 特点 |
  7. | --- | --- | --- | --- |
  8. | 第一组<br />失败则抛出异常<br /><br />线程安全 | add(E) boolean | 添加一个元素,成功返回true | 如果队列满了,抛出IllegalStateException |
  9. | | remove() E | 返回并删除队列的头元素 | 如果队列为空,抛出NoSuchElementException |
  10. | | element() E | 返回队列的头元素 | 同上 |
  11. | 第二组<br />失败不会抛出异常<br />线程安全 | offer(E) boolean | 添加元素,成功返回true | 如果队列满了,返回null |
  12. | | poll() E | 返回并删除队列的头元素 | 如果队列为空,返回null |
  13. | | peek() E | 返回队列的头元素 | 同上 |
  14. | 第三组<br />失败会阻塞<br />线程安全 | put(E) boolean | 添加一个元素 | 如果队列满了,会阻塞 |
  15. | | take() E | 返回队列的头元素 | 如果队列为空,会阻塞 |
  16. <a name="eXOqq"></a>
  17. ## ArrayBlockingQueue
  18. - ArrayBlockingQueue 是最典型的**有界队列**,其内部是用数组存储元素的,利用 ReentrantLock 实现线程安全
  19. - ArrayBlockingQueue 被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。
  20. ```java
  21. ArrayBlockingQueue(int capacity, boolean fair)

LinkedBlockingQueue

  • 如果我们不指定它的初始容量,那么它容量默认就为整型的最大值 Integer.MAX_VALUE,由于这个数非常大,我们通常不可能放入这么多的数据,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限
  • LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为队头和队尾是2个指针分开操作的, 所以用了2把锁+2个条件,同时有1个AtomicInteger的原子变量记录count数

    SynchronousQueue

  • SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

    PriorityBlockingQueue

  • 前面我们所说的 ArrayBlockingQueue 和 LinkedBlockingQueue 都是采用先进先出的顺序进行排序,可是如果有的时候我们需要自定义排序怎么办呢?这时就需要使用 PriorityBlockingQueue。

  • PriorityBlockingQueue 是一个支持优先级无界阻塞队列,可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。同时,插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。
  • 它的 take 方法在队列为空的时候会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的 put 方法永远不会阻塞,添加操作始终都会成功,也正因为如此,它的成员变量里只有一个 Condition:

    1. private final Condition notEmpty;
  • 这和之前的 ArrayBlockingQueue 拥有两个 Condition(分别是 notEmpty 和 notFull)形成了鲜明的对比,我们的 PriorityBlockingQueue 不需要 notFull,因为它永远都不会满,真是“有空间就可以任性”

  • 在阻塞的实现方面,和ArrayBlockingQueue的机制相似,主要区别是用数组实现 了一个二叉堆,从而实现按优先级从小到大出队列。另一个区别是没有notFull条件,当元素个数超出数 组长度时,执行扩容操作

    DelayQueue

  • DelayQueue 这个队列比较特殊,具有“延迟”的功能。我们可以设定让队列中的任务延迟多久之后执行,比如 10 秒钟之后执行,这在例如“30 分钟后未付款自动取消订单”等需要延迟执行的场景中被大量使用

  • 它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下: ```java public interface Delayed extends Comparable {

    long getDelay(TimeUnit unit);

} ```

  • 可以看出这个 Delayed 接口继承自 Comparable,里面有一个需要实现的方法,就是 getDelay。这里的 getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,如果返回 0 或者负数则代表任务已过期。
  • 元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期
  • DelayQueue 内部使用了 PriorityQueue 的能力来进行排序,而不是自己从头编写

    BlockingDeque

  • BlockingDeque定义了一个阻塞的双端队列接口

  • 对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而

LinkedBlockingDeque是双向链表

CopyOnWrite

CopyOnWrite指在“写”的时候,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观 锁或者乐观锁的方式写回。 那为什么不直接修改,而是要拷贝一份修改呢? 这是为了在“读”的时候不加锁

  • CopyOnWriteArrayList
  • CopyOnWriteArraySet

    • CopyOnWriteArraySet 就是用 Array 实现的一个 Set,保证所有元素都不重复。其内部是封装的一一个CopyOnWriteArrayList。

      ConcurrentLinkedQueue/Deque

  • AQS内部的阻塞队列实现原理:基于双向链表,通过对head/tail进行CAS操作,实现入队和出队

  • ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于 CAS,同样是通过head/tail指针记录队列头部和尾部,但还是有稍许差别。

    • 首先,它是一个单向链表
    • 其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置;每次出队,head一定后移一个 位置,以保证head指向队列头部,tail指向链表尾部。
    • 但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直 接对 head/tail指针进行 CAS操作的,而是对 Node中的 item进行操作。

      ConcurrentHashMap

      HashMap通常的实现方式是“数组+链表”,这种方式被称为“拉链法”。ConcurrentHashMap在这个 基本原理之上进行了各种优化

  • 首先是所有数据都放在一个大的HashMap中;其次是引入了红黑树

  • image.png
  • 如果头节点是Node类型,则尾随它的就是一个普通的链表;如果头节点是TreeNode类型,它的后

面就是一颗红黑树,TreeNode是Node的子类。

  • 这样设计的原因
    • 使用红黑树,当一个槽里有很多元素时,其查询和更新速度会比链表快很多,Hash冲突的问 题由此得到较好的解决。
    • 加锁的粒度,并非整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度,就是 Node数组的长度,初始长度为16
    • 并发扩容,这是难度最大的。当一个线程要扩容Node数组的时候,其他线程还要读写,因此 处理过程很复杂
  • 扩容的代码中可以看到,MIN_TREEIFY_CAPACITY=64,意味着当数组的长度没有超过64的时候,树的 每个节点里都是链表,只会扩容,不会转换成红黑树。只有当数组长度大于或等于64时,才考虑把表 转换成红黑树

    ConcurrentSkipListMap/Set

    ConcurrentHashMap 是一种 key 无序的 HashMap,ConcurrentSkipListMap则是 key 有序的, 实现了NavigableMap接口,此接口又继承了SortedMap接口。

  • ConcurrentSkipListMap

    • 在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。
      • 目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法
    • 无锁链表
      • image.png
      • AtomicMarkableReference
        • 保证每个 next 是 AtomicMarkableReference 类型。但这个办法不够高效,Doug Lea 在 ConcurrentSkipListMap的实现中用了另一种办法。
      • Mark节点
        • 以新造一个marker节点,使 节点10的next指针指向该Marker节点。这样,当向节点10的后面插入节点20的时候,就可以在插入的 同时判断节点10的next指针是否指向了一个Marker节点,这两个操作可以在一个CAS操作里面完成
    • 解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题。因为跳查表就是多层链表 叠起来的。
  • ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装