前言 JUC 下面的相关源码继续往下阅读,这就看到了非阻塞的无界线程安全队列 —— ConcurrentLinkedQueue,来一起看看吧。

介绍

基于链接节点的无界线程安全队列,对元素FIFO(先进先出)进行排序。 队列的头部是队列中最长时间的元素,队列的尾部是队列中最短时间的元素。 在队列的尾部插入新元素,队列检索操作获取队列头部的元素。
当许多线程共享对公共集合的访问 ConcurrentLinkedQueue 是一个合适的选择。 与大多数其他并发集合实现一样,此类不允许使用null元素。

基本使用

  1. public class ConcurrentLinkedQueueTest {
  2. public static void main(String[] args) {
  3. ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
  4. // 将指定元素插入此队列的尾部。
  5. queue.add("liuzhihang");
  6. // 将指定元素插入此队列的尾部。
  7. queue.offer("liuzhihang");
  8. // 获取但不移除此队列的头,队列为空返回 null。
  9. queue.peek();
  10. // 获取并移除此队列的头,此队列为空返回 null。
  11. queue.poll();
  12. }
  13. }

源码分析

基本结构

ConcurrentLinkedQueue - 图1

参数介绍

  1. private static class Node<E> {
  2. // 节点中的元素
  3. volatile E item;
  4. // 下一个节点
  5. volatile Node<E> next;
  6. Node(E item) {
  7. UNSAFE.putObject(this, itemOffset, item);
  8. }
  9. // CAS 的方式设置节点元素
  10. boolean casItem(E cmp, E val) {
  11. return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  12. }
  13. // 设置下一个节点
  14. void lazySetNext(Node<E> val) {
  15. UNSAFE.putOrderedObject(this, nextOffset, val);
  16. }
  17. // CAS 的方式设置下一个节点
  18. boolean casNext(Node<E> cmp, Node<E> val) {
  19. return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  20. }
  21. // 省略 ……
  22. }

在 ConcurrentLinkedQueue 内部含有一个内部类 Node,如上所示,这个内部类用来标识链表中的一个节点,通过代码可以看出,在 ConcurrentLinkedQueue 中的链表为单向链表

  1. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
  2. implements Queue<E>, java.io.Serializable {
  3. // 其他省略
  4. // 头结点
  5. private transient volatile Node<E> head;
  6. // 尾节点
  7. private transient volatile Node<E> tail;
  8. }

头尾节点使用 volatile 修饰,保证内存可见性。

构造函数

  1. public ConcurrentLinkedQueue() {
  2. head = tail = new Node<E>(null);
  3. }

当创建对象时,头尾节点都是指向一个空节点。
ConcurrentLinkedQueue - 图2

添加元素

  1. public boolean add(E e) {
  2. return offer(e);
  3. }
  4. public boolean offer(E e) {
  5. // 验证是否为空
  6. checkNotNull(e);
  7. // 创建节点
  8. final Node<E> newNode = new Node<E>(e);
  9. // 循环入队列
  10. // t 是当前尾节点,p 初始为 t
  11. for (Node<E> t = tail, p = t;;) {
  12. // q 为尾节点的下一个节点
  13. Node<E> q = p.next;
  14. if (q == null) {
  15. // 为空,说明后面没有节点,则 CAS 设置尾节点
  16. if (p.casNext(null, newNode)) {
  17. // 此时 p.next 是 newNode
  18. // 如果 p != t 说明有并发
  19. if (p != t)
  20. // 其他线程已经更新了 tail
  21. // q = p.next 所以 q == null 不正确了
  22. // q 取到了 t.next
  23. // 此时将 tail 更新为 新节点
  24. casTail(t, newNode); // Failure is OK.
  25. return true;
  26. }
  27. // Lost CAS race to another thread; re-read next
  28. }
  29. // 多线程情况下, poll ,操作移除元素,可能会导致 p == q
  30. // 此时要重新查找
  31. else if (p == q)
  32. //
  33. p = (t != (t = tail)) ? t : head;
  34. else
  35. // 检查 tail 并更新
  36. p = (p != t && t != (t = tail)) ? t : q;
  37. }
  38. }

画图说明:

  • 单线程情况下:
  1. 当执行到 Node<E> q = p.next; 时,当前情况如图所示:

ConcurrentLinkedQueue - 图3

  1. 判断 q == null,满足条件,此时便会执行 p.casNext(null, newNode) 使用 CAS 设置 p.next。
  2. 设置成功之后,p == t 没有变动,所以程序退出。
  • 多线程情况下:
  1. 当执行到 Node<E> q = p.next; 时,当前情况如图所示:

ConcurrentLinkedQueue - 图4

  1. 多个线程执行 p.casNext(null, newNode) 使用 CAS 设置 p.next。
  2. A 线程 CAS 设置成功:

ConcurrentLinkedQueue - 图5

  1. B 线程 CAS 执行失败, 重新循环,会执行到 p = (p != t && t != (t = tail)) ? t : q

ConcurrentLinkedQueue - 图6

  1. 再次循环就可以成功设置上了。

    获取元素

    1. public E poll() {
    2. restartFromHead:
    3. // 无限循环
    4. for (;;) {
    5. for (Node<E> h = head, p = h, q;;) {
    6. // 头结点的 iterm
    7. E item = p.item;
    8. // 当前节点如果不为 null CAS 设置为 null
    9. if (item != null && p.casItem(item, null)) {
    10. // CAS 成功 则标记移除
    11. if (p != h) // hop two nodes at a time
    12. updateHead(h, ((q = p.next) != null) ? q : p);
    13. return item;
    14. }
    15. // 当前队列未空 返回 null
    16. else if ((q = p.next) == null) {
    17. updateHead(h, p);
    18. return null;
    19. }
    20. // 自引用了, 重新进行循环
    21. else if (p == q)
    22. continue restartFromHead;
    23. else
    24. p = q;
    25. }
    26. }
    27. }

    画图过程如下:

  2. 在执行内层循环时,如果队列为空:E item = p.item; 此时,iterm 为 null,会 updateHead(h, p) 并返回 null。

  3. 假设同时有并发插入操作,添加了一个元素,此时如图所示:

ConcurrentLinkedQueue - 图7
这时会执行最后的 else 将 p = q
ConcurrentLinkedQueue - 图8

  1. 继续循环获取 item,并执行 p.casItem(item, null) , 然后判断 p != h,更新 head 并返回 item。

ConcurrentLinkedQueue - 图9
这里的情况比较复杂,这里只是列举一种,如果需要可以自己多列举几种。
而查看元素的代码和获取元素代码类似就不多介绍了。

size 操作

  1. public int size() {
  2. int count = 0;
  3. for (Node<E> p = first(); p != null; p = succ(p))
  4. if (p.item != null)
  5. // Collection.size() spec says to max out
  6. if (++count == Integer.MAX_VALUE)
  7. break;
  8. return count;
  9. }

CAS 没有加锁,所以 size 是不准确的。并且 size 会遍历一遍列表,比较耗费性能。

总结

ConcurrentLinkedQueue 在工作中使用的相对较少,所以阅读相关源码的时候也只是大概看了一下,了解常用 API,以及底层原理。
简单总结就是使用单向链表来保存队列元素,内部使用非阻塞的 CAS 算法,没有加锁。所以计算 size 时可能不准确,同样 size 会遍历链表,所以并不建议使用。