10. ConcurrentLinkedQueue

ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是

  • 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
  • dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
  • 只是这【锁】使用了 cas 来实现

事实上,ConcurrentLinkedQueue 应用还是非常广泛的
例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用
ConcurrentLinkedQueue - 图1

ConcurrentLinkedQueue 原理

1. 模仿 ConcurrentLinkedQueue

初始代码

  1. package cn.itcast.concurrent.thirdpart.test;
  2. import java.util.Collection;
  3. import java.util.Iterator;
  4. import java.util.Queue;
  5. import java.util.concurrent.atomic.AtomicReference;
  6. public class Test3 {
  7. public static void main(String[] args) {
  8. MyQueue<String> queue = new MyQueue<>();
  9. queue.offer("1");
  10. queue.offer("2");
  11. queue.offer("3");
  12. System.out.println(queue);
  13. }
  14. }
  15. class MyQueue<E> implements Queue<E> {
  16. @Override
  17. public String toString() {
  18. StringBuilder sb = new StringBuilder();
  19. for (Node<E> p = head; p != null; p = p.next.get()) {
  20. E item = p.item;
  21. if (item != null) {
  22. sb.append(item).append("->");
  23. }
  24. }
  25. sb.append("null");
  26. return sb.toString();
  27. }
  28. @Override
  29. public int size() {
  30. return 0;
  31. }
  32. @Override
  33. public boolean isEmpty() {
  34. return false;
  35. }
  36. @Override
  37. public boolean contains(Object o) {
  38. return false;
  39. }
  40. @Override
  41. public Iterator<E> iterator() {
  42. return null;
  43. }
  44. @Override
  45. public Object[] toArray() {
  46. return new Object[0];
  47. }
  48. @Override
  49. public <T> T[] toArray(T[] a) {
  50. return null;
  51. }
  52. @Override
  53. public boolean add(E e) {
  54. return false;
  55. }
  56. @Override
  57. public boolean remove(Object o) {
  58. return false;
  59. }
  60. @Override
  61. public boolean containsAll(Collection<?> c) {
  62. return false;
  63. }
  64. @Override
  65. public boolean addAll(Collection<? extends E> c) {
  66. return false;
  67. }
  68. @Override
  69. public boolean removeAll(Collection<?> c) {
  70. return false;
  71. }
  72. @Override
  73. public boolean retainAll(Collection<?> c) {
  74. return false;
  75. }
  76. @Override
  77. public void clear() {
  78. }
  79. @Override
  80. public E remove() {
  81. return null;
  82. }
  83. @Override
  84. public E element() {
  85. return null;
  86. }
  87. @Override
  88. public E peek() {
  89. return null;
  90. }
  91. public MyQueue() {
  92. head = last = new Node<>(null, null);
  93. }
  94. private volatile Node<E> last;
  95. private volatile Node<E> head;
  96. private E dequeue() {
  97. /*Node<E> h = head;
  98. Node<E> first = h.next;
  99. h.next = h;
  100. head = first;
  101. E x = first.item;
  102. first.item = null;
  103. return x;*/
  104. return null;
  105. }
  106. @Override
  107. public E poll() {
  108. return null;
  109. }
  110. @Override
  111. public boolean offer(E e) {
  112. return true;
  113. }
  114. static class Node<E> {
  115. volatile E item;
  116. public Node(E item, Node<E> next) {
  117. this.item = item;
  118. this.next = new AtomicReference<>(next);
  119. }
  120. AtomicReference<Node<E>> next;
  121. }
  122. }

offer方法

public boolean offer(E e) {
    Node<E> n = new Node<>(e, null);
    while(true) {
        // 获取尾节点
        AtomicReference<Node<E>> next = last.next;
        // S1: 真正尾节点的 next 是 null, cas 从 null 到新节点
        if(next.compareAndSet(null, n)) {
            // 这时的 last 已经是倒数第二, next 不为空了, 其它线程的 cas 肯定失败
            // S2: 更新 last 为倒数第一的节点
            last = n;
            return true;
        }
    }
}