只实现了 Queue 接口,并没有实现 BlockingQueue 接口,所以它是非阻塞队列,也不能用于线程池中,但是它是线程安全的,可用于多线程环境中。无界队列

成员变量

  1. private transient volatile Node<E> head;//头结点
  2. private transient volatile Node<E> tail;//尾节点,指向最后一个节点或者倒数第二个节点
  3. private static final sun.misc.Unsafe UNSAFE;//操作cas的类
  4. private static final long headOffset;
  5. private static final long tailOffset;
  6. private static class Node<E> {
  7. volatile E item;
  8. volatile Node<E> next;//单链表
  9. }

构造器

  1. public ConcurrentLinkedQueue() {
  2. head = tail = new Node<E>(null);// 初始化头尾节点,指向同一个对象
  3. }
  4. public ConcurrentLinkedQueue(Collection<? extends E> c) {
  5. Node<E> h = null, t = null;
  6. for (E e : c) {// 遍历c,并把它元素全部添加到单链表中
  7. checkNotNull(e);
  8. Node<E> newNode = new Node<E>(e);
  9. if (h == null)
  10. h = t = newNode;
  11. else {
  12. t.lazySetNext(newNode);//不保证可见性的方式提升性能
  13. t = newNode;
  14. }
  15. }
  16. if (h == null)
  17. h = t = new Node<E>(null);
  18. head = h;
  19. tail = t;
  20. }

入队

  1. public boolean add(E e) {
  2. return offer(e);
  3. }
  4. // 入队到链表尾
  5. public boolean offer(E e) {
  6. checkNotNull(e);// 不能添加空元素
  7. final Node<E> newNode = new Node<E>(e); // 创建新节点
  8. for (Node<E> t = tail, p = t;;) { //定义了两个变量 t,p,自旋
  9. Node<E> q = p.next;
  10. if (q == null) { // 如果没有next,说明到链表尾部了(tail指向最后一个元素),就入队
  11. 334 if (p.casNext(null, newNode)) {//CAS更新p.next为新节点,如果成功了,就返回true,如果不成功就自旋
  12. //如果p不等于t,说明有其它线程先一步更新tail, 也就不会走到q==null这个分支了,p取到的可能是t后面的值
  13. if (p != t)
  14. casTail(t, newNode); // 把tail原子更新为新节点,失败也ok(指向倒数第二)
  15. 340 return true;// 返回入队成功
  16. }
  17. }
  18. else if (p == q)
  19. p = (t != (t = tail)) ? t : head;// 如果p的next等于p,说明p已经被删除了(已经出队了),重新设置p的值
  20. else
  21. p = (p != t && t != (t = tail)) ? t : q;// t后面还有值,重新设置p的值
  22. }
  23. }

变量t代表的是tail节点,p用来表示队列的尾节点,初始情况下等于tail节点,q指向p的next
判断是不是尾节点的依据是next是不是null
第一次添加元素:p,t 都指向tail,q为null,执行334行代码,
cas成功则返回true,此时p==t 直接执行340行,循环终止,
返回为false,则说明被其他线程抢先添加了节点,此时进入下一轮循环:
p,t 的值都未变,指向tail,但q不为null而是真正的尾节点,p != q,进入else的逻辑,此时p==t(三目表达式为false),p赋值为q,p指向真正的尾节点,进入下一轮循环,q为null,重复上面第一步的逻辑,一直到cas成功返回true为止,

第二次添加元素:此时tail指向倒数第二个元素,p,t都指向tail,q指向最后一个节点,执行else逻辑,p==t(三目表达式为false),p赋值为q,p指向真正的尾节点,进入下一轮循环,q为null,
cas成功:则执行336行 此时p != t(t执行tail,p指向最后一个节点),执行casTail,将 t 指向新节点(tail也指向新节点),casTail执行失败也没关系(说明有另外一个线程设置了tail节点),并结束循环。 tail指向最后一个节点,
cas失败:则说明被其他线程抢先添加了节点,此时进入下一轮循环,q指向最后一个节点(p指向倒数第二个),进入else,p !=t成立,t !=tail不成立(如果成立说明tail节点已经指向了新节点,p赋值为tail),p被赋值为q。p都指向最后一个节点
p==q : (这里其实又是另外一种场景)说明p指向的节点的next也指向它自己,这种节点称之为哨兵节点,这种节点在队列中存在的价值不大,一般表示为要删除的节点或者是空节点
tail没有一直指向尾节点的原因:固定指向尾节点的话,每次放入元素都需要移动tail,需要一次cas操作。不固定指向的话,根据tail找到尾节点,可以节省移动tail操作,1.8版本的jdk为offer两次移动一次tail(1.7版本为一个常量值,可配),根据tail遍历找到尾节点比cas操作效率要高

出队

  1. public E remove() {
  2. E x = poll();
  3. if (x != null)
  4. return x;
  5. else
  6. throw new NoSuchElementException();
  7. }
  8. public E poll() {
  9. restartFromHead:
  10. for (;;) {
  11. for (Node<E> h = head, p = h, q;;) {
  12. E item = p.item;// 尝试弹出链表的头节点
  13. if (item != null && p.casItem(item, null)) {// 如果节点的值不为空,并且将其更新为null成功了
  14. // 如果头节点变了,则不会走到这个分支
  15. // 会先走下面的分支拿到新的头节点
  16. // 这时候p就不等于h了,就更新头节点
  17. // 在updateHead()中会把head更新为新节点
  18. // 并让head的next指向其自己
  19. if (p != h) // hop two nodes at a time
  20. updateHead(h, ((q = p.next) != null) ? q : p);
  21. // 上面的casItem()成功,就可以返回出队的元素了
  22. return item;
  23. }
  24. // 下面三个分支说明头节点变了
  25. // 且p的item肯定为null
  26. else if ((q = p.next) == null) {
  27. // 如果p的next为空,说明队列中没有元素了
  28. // 更新h为p,也就是空元素的节点
  29. updateHead(h, p);
  30. // 返回null
  31. return null;
  32. }
  33. else if (p == q)
  34. // 如果p等于p的next,说明p已经出队了,重试
  35. continue restartFromHead;
  36. else
  37. // 将p设置为p的next
  38. p = q;
  39. }
  40. }
  41. }
  42. // 更新头节点的方法
  43. final void updateHead(Node<E> h, Node<E> p) {
  44. // 原子更新h为p成功后,延迟更新h的next为它自己
  45. // 这里用延迟更新是安全的,因为head节点已经变了
  46. // 只要入队出队的时候检查head有没有变化就行了,跟它的next关系不大
  47. if (h != p && casHead(h, p))
  48. h.lazySetNext(h);
  49. }

出队的整个逻辑比较清晰:
(1)定位到头节点,尝试更新其值为null;
(2)如果成功了,就成功出队;
(3)如果失败或者头节点变化了,就重新寻找头节点,并重试;
(4)整个出队过程没有一点阻塞相关的代码,所以出队的时候不会阻塞线程,没找到元素就返回null;

ConcurrentLinkedQueue与LinkedBlockingQueue对比

(1)两者都是线程安全的队列;
(2)两者都可以实现取元素时队列为空直接返回null,后者的poll()方法可以实现此功能;
(3)前者全程无锁,后者全部都是使用重入锁控制的;
(4)前者效率较高,后者效率较低;
(5)前者无法实现如果队列为空等待元素到来的操作;
(6)前者是非阻塞队列,后者是阻塞队列;
(7)前者无法用在线程池中,后者可以;