10. ConcurrentLinkedQueue
ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是
- 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
- dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
- 只是这【锁】使用了 cas 来实现
事实上,ConcurrentLinkedQueue 应用还是非常广泛的
例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用
ConcurrentLinkedQueue 原理
1. 模仿 ConcurrentLinkedQueue
初始代码
package cn.itcast.concurrent.thirdpart.test;import java.util.Collection;import java.util.Iterator;import java.util.Queue;import java.util.concurrent.atomic.AtomicReference;public class Test3 {public static void main(String[] args) {MyQueue<String> queue = new MyQueue<>();queue.offer("1");queue.offer("2");queue.offer("3");System.out.println(queue);}}class MyQueue<E> implements Queue<E> {@Overridepublic String toString() {StringBuilder sb = new StringBuilder();for (Node<E> p = head; p != null; p = p.next.get()) {E item = p.item;if (item != null) {sb.append(item).append("->");}}sb.append("null");return sb.toString();}@Overridepublic int size() {return 0;}@Overridepublic boolean isEmpty() {return false;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<E> iterator() {return null;}@Overridepublic Object[] toArray() {return new Object[0];}@Overridepublic <T> T[] toArray(T[] a) {return null;}@Overridepublic boolean add(E e) {return false;}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean containsAll(Collection<?> c) {return false;}@Overridepublic boolean addAll(Collection<? extends E> c) {return false;}@Overridepublic boolean removeAll(Collection<?> c) {return false;}@Overridepublic boolean retainAll(Collection<?> c) {return false;}@Overridepublic void clear() {}@Overridepublic E remove() {return null;}@Overridepublic E element() {return null;}@Overridepublic E peek() {return null;}public MyQueue() {head = last = new Node<>(null, null);}private volatile Node<E> last;private volatile Node<E> head;private E dequeue() {/*Node<E> h = head;Node<E> first = h.next;h.next = h;head = first;E x = first.item;first.item = null;return x;*/return null;}@Overridepublic E poll() {return null;}@Overridepublic boolean offer(E e) {return true;}static class Node<E> {volatile E item;public Node(E item, Node<E> next) {this.item = item;this.next = new AtomicReference<>(next);}AtomicReference<Node<E>> next;}}
offer方法
public boolean offer(E e) {
Node<E> n = new Node<>(e, null);
while(true) {
// 获取尾节点
AtomicReference<Node<E>> next = last.next;
// S1: 真正尾节点的 next 是 null, cas 从 null 到新节点
if(next.compareAndSet(null, n)) {
// 这时的 last 已经是倒数第二, next 不为空了, 其它线程的 cas 肯定失败
// S2: 更新 last 为倒数第一的节点
last = n;
return true;
}
}
}
