ConcurrentLinkedQueue 是使用非阻塞 CAS 的方式实现的线程安全的 FIFO 队列,是一个基于链表结构实现的线程安全队列。当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。

  1. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
  2. implements Queue<E>, java.io.Serializable {
  3. // 链表节点
  4. private transient volatile Node<E> head;
  5. private transient volatile Node<E> tail;
  6. // Node节点结构,通过next形成链表结构
  7. private static class Node<E> {
  8. volatile E item;
  9. volatile Node<E> next;
  10. // 对Node进行操作时,采用了CAS
  11. boolean casItem(E cmp, E val) {
  12. return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  13. }
  14. boolean casNext(Node<E> cmp, Node<E> val) {
  15. return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  16. }
  17. }
  18. }

ConcurrentLinkedQueue 的每个元素被包装成一个 Node 节点。队列是靠头、尾节点来维护的,在创建队列时头、尾节点都指向一个 item 为 null 的哨兵节点。第一次执行 peek 或者 first 操作时会把 head 指向第一个真正的队列元素,通过 head 及 succ() 后继方法能完整地遍历整个链表。对于 tail 节点来说,由于 tail 的更新并不总是及时的,可能会产生延迟,所以它并不总是表示队列真正的末尾节点。

常用方法

  1. // 将指定元素插入此队列的尾部,由于队列采用链表实现,所以是无界的,因此将永远不会抛出IllegalStateException或返回false
  2. public boolean add(E e) {
  3. return offer(e);
  4. }
  5. // 获取队列头部元素并删除,如果队列为空则返回null
  6. public E poll()
  7. // 获取队列头部元素但不删除,如果队列为空则返回null
  8. public E peek()
  9. // 判断队列是否为空,根据头结点是否为空进行判断
  10. public boolean isEmpty() {
  11. return first() == null;
  12. }

源码分析

1. offer

offer 方法主要做两件事情:一是将入队节点设置成当前队列尾节点的下一个节点;二是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点,因此增加一个元素后 tail 并不总会被更新,所以 tail 节点并不总是尾节点。

值得注意的是,offer 方法没有任何锁操作,线程安全完全由 CAS 操作和队列的算法来保证。整个方法的核心是 for 循环,这个循环没有出口,直到尝试成功,这也符合 CAS 操作的流程。

  1. public boolean offer(E e) {
  2. // 待添加元素不能为null
  3. checkNotNull(e);
  4. // 创建入队节点
  5. final Node<E> newNode = new Node<E>(e);
  6. // t,p为尾节点,默认相等,死循环CAS尝试添加到队列尾部
  7. for (Node<E> t = tail, p = t;;) {
  8. // p表示队列尾结点(默认队尾结点就是tail结点),获取p结点的next节点
  9. Node<E> q = p.next;
  10. // q为空,说明tail节点的next节点为空,此时p就是tail节点
  11. if (q == null) {
  12. // 设置p的next指向当前节点,通过CAS操作将入列节点设置为当前队尾节点的next节点
  13. if (p.casNext(null, newNode)) {
  14. // 判断tail节点和p节点距离达到两个节点
  15. if (p != t)
  16. // 如果tail不是尾节点则将入队节点设置为tail,如果失败了则说明有其他线程已经把tail移动过
  17. casTail(t, newNode);
  18. return true;
  19. }
  20. }
  21. // 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回
  22. else if (p == q)
  23. p = (t != (t = tail)) ? t : head;
  24. else
  25. // Check for tail updates after two hops.
  26. p = (p != t && t != (t = tail)) ? t : q;
  27. }
  28. }

tail 节点不一定为尾节点的设计意图

对于 FIFO 队列来说,入队所要做的事情就是将入队节点设置成尾节点,如果让 tail 节点永远作为队列的尾节点的话,这样实现代码量会少很多,而且逻辑清楚易懂。但这么做有个缺点,就是每次都需要使用循环 CAS 更新 tail 节点。如果能减少 CAS 更新 tail 节点的次数,就能提高入队的效率。

在 JDK 1.8 的实现中,tail 的更新时机是通过 p 和 t 是否相等来判断的,即当 tail 节点和尾节点的距离大于等于常量 HOPS 的值(默认为 1)时才更新 tail。tail 和尾节点的距离越长使用 CAS 更新 tail 节点的次数就会越少,但距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但这样仍能提高入队的效率,因为从本质上来看它通过增加对 volatile 变量的读操作来减少了对 volatile 变量的写操作,所以入队效率会有所提升。
20180625162704934.png

2. poll

出队操作就是从队列里返回一个最早插入的节点元素,并清空该节点对元素的引用。但并不是每次出队都需要更新 head 节点,当 head 节点有元素时,直接弹出 head 节点的元素,并以 CAS 方式设置节点的 item 为 null,不会更新 head 节点(代码 9-14 行);如果 head 节点为 null,则表示已经有一个线程刚刚进行了出列操作,然后更新 head 节点;

  1. public E poll() {
  2. // 设置起始点
  3. restartFromHead:
  4. // 死循环CAS尝试出队
  5. for (;;) {
  6. // 定义p,h两个指针都指向head,即获取最早入队的元素
  7. for (Node<E> h = head, p = h, q;;) {
  8. // 获取头节点元素
  9. E item = p.item;
  10. // 如果头节点元素不为null,通过cas设置p节点引用的元素为null
  11. if (item != null && p.casItem(item, null)) {
  12. if (p != h)
  13. // 更新头结点,预期值是h,当p的next指向不为空,更新值是q,为空则是p
  14. updateHead(h, ((q = p.next) != null) ? q : p);
  15. return item;
  16. }
  17. // 如果item为空说明已经被出队了,然后判断q是否null,是空则说明当前队列为空了
  18. else if ((q = p.next) == null) {
  19. updateHead(h, p);
  20. return null;
  21. }
  22. // 节点出队失败,重新跳到restartFromHead来进行出队
  23. else if (p == q)
  24. continue restartFromHead;
  25. else
  26. p = q;
  27. }
  28. }
  29. }

只有当 head 节点没有元素值时,出队操作才会更新 head 节点(代码 16-19 行),这种做法是为了减少 CAS 方式更新 head 节点的消耗,提供出队的效率。
20180625144156632.png

3. size

由于使用非阻塞 CAS 算法,没有加锁,所以在计算 size 时有可能进行了 offer、poll 或者 remove 操作,导致计算的元素个数不精确,所以在井发情况下 size 函数不是很有用。