BlockingQueue 阻塞队列

BlockingQueue是最常见的一种。BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者
在Concurrent包中,BlockingQueue是一个接口,有许多个不同的实现类,如图所示。
image.png
接口定义:

  1. public interface BlockingQueue<E> extends Queue<E> {
  2. //...
  3. boolean add(E e);
  4. boolean offer(E e);
  5. void put(E e) throws InterruptedException;
  6. boolean remove(Object o); E take() throws InterruptedException;
  7. E poll(long timeout, TimeUnit unit) throws InterruptedException;
  8. //...
  9. }

add(…)和offer(..)是无阻塞的,也是Queue本身定义的接口,
而put(..)是阻塞的。add(…)和offer(..)的区别不大,当队列为满的时候,前者会抛出异常,后者则直接返回false。
出队列与之类似,提供了remove()、poll()、take()等方法,remove()是非阻塞式的,take()和poll()是阻塞式的。

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的环形队列,在构造方法中,会要求传入数组的容量。

  1. public ArrayBlockingQueue(int capacity) {
  2. this(capacity, false);
  3. }
  4. public ArrayBlockingQueue(int capacity, boolean fair) {
  5. // ...
  6. }
  7. public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
  8. this(capacity, fair);
  9. // ...
  10. }

其核心数据结构:

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
  2. //...
  3. final Object[] items;
  4. // 队头指针
  5. int takeIndex;
  6. // 队尾指针
  7. int putIndex;
  8. int count;
  9. // 核心为1个锁外加两个条件
  10. final ReentrantLock lock;
  11. private final Condition notEmpty;
  12. private final Condition notFull;
  13. //...
  14. }

其put方法:

  1. /**
  2. * Inserts the specified element at the tail of this queue, waiting
  3. * for space to become available if the queue is full.
  4. *
  5. * @throws InterruptedException {@inheritDoc}
  6. * @throws NullPointerException {@inheritDoc}
  7. */
  8. public void put(E e) throws InterruptedException {
  9. Objects.requireNonNull(e);
  10. final ReentrantLock lock = this.lock;
  11. //可中断的Lock
  12. lock.lockInterruptibly();
  13. try {
  14. while (count == items.length)
  15. //若队列满,则阻塞
  16. notFull.await();
  17. enqueue(e);
  18. } finally {
  19. lock.unlock();
  20. }
  21. }
  22. private void enqueue(E e) {
  23. // assert lock.isHeldByCurrentThread();
  24. // assert lock.getHoldCount() == 1;
  25. // assert items[putIndex] == null;
  26. final Object[] items = this.items;
  27. items[putIndex] = e;
  28. if (++putIndex == items.length) putIndex = 0;
  29. count++;
  30. notEmpty.signal();//当数据put到queue中,通知非空条件
  31. }

take方法:

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. //如果队列为空 则阻塞
  7. notEmpty.await();
  8. return dequeue();
  9. } finally {
  10. lock.unlock();
  11. }
  12. }
  13. private E dequeue() {
  14. // assert lock.isHeldByCurrentThread();
  15. // assert lock.getHoldCount() == 1;
  16. // assert items[takeIndex] != null;
  17. final Object[] items = this.items;
  18. @SuppressWarnings("unchecked")
  19. E e = (E) items[takeIndex];
  20. items[takeIndex] = null;
  21. if (++takeIndex == items.length) takeIndex = 0;
  22. count--;
  23. if (itrs != null)
  24. itrs.elementDequeued();
  25. notFull.signal(); //take结束 通知非满条件
  26. return e;
  27. }

LinkedBlockingQueue

LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为队头和队尾是2个指针分开操作的,所以用了2把锁+2个条件,同时有1个AtomicInteger的原子变量记录count数。

  1. public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
  2. // ...
  3. private final int capacity;
  4. // 原子变量
  5. private final AtomicInteger count = new AtomicInteger(0);
  6. // 单向链表的头部
  7. private transient Node<E> head;
  8. // 单向链表的尾部
  9. private transient Node<E> last;
  10. // 两把锁,两个条件
  11. private final ReentrantLock takeLock = new ReentrantLock();
  12. private final Condition notEmpty = takeLock.newCondition();
  13. private final ReentrantLock putLock = new ReentrantLock();
  14. private final Condition notFUll = putLock.newCondition();
  15. // ...
  16. }

在其构造方法中,也可以指定队列的总容量。如果不指定,默认为Integer.MAX_VALUE。

  1. /**
  2. * Creates a {@code LinkedBlockingQueue} with a capacity of
  3. * {@link Integer#MAX_VALUE}.
  4. */
  5. public LinkedBlockingQueue() {
  6. this(Integer.MAX_VALUE);
  7. }
  8. /**
  9. * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
  10. *
  11. * @param capacity the capacity of this queue
  12. * @throws IllegalArgumentException if {@code capacity} is not greater
  13. * than zero
  14. */
  15. public LinkedBlockingQueue(int capacity) {
  16. if (capacity <= 0) throw new IllegalArgumentException();
  17. this.capacity = capacity;
  18. last = head = new Node<E>(null);
  19. }

put方法:

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. final int c;
  4. final Node<E> node = new Node<E>(e);
  5. final ReentrantLock putLock = this.putLock;
  6. final AtomicInteger count = this.count;
  7. putLock.lockInterruptibly();
  8. try {
  9. //无空间生产元素
  10. while (count.get() == capacity) {
  11. notFull.await();
  12. }
  13. enqueue(node);
  14. //拿到count值后将count加一
  15. c = count.getAndIncrement();
  16. //当前元素个数如果小于容量
  17. if (c + 1 < capacity)
  18. notFull.signal(); //如果队列还有剩余空间则通知其他put线程
  19. } finally {
  20. putLock.unlock();
  21. }
  22. if (c == 0) //上面写入说明队列有值了
  23. signalNotEmpty(); //唤醒其他消费者
  24. }

take 方法:

  1. public E take() throws InterruptedException {
  2. final E x;
  3. final int c;
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. takeLock.lockInterruptibly();
  7. try {
  8. //没有元素则阻塞
  9. while (count.get() == 0) {
  10. notEmpty.await();
  11. }
  12. x = dequeue();
  13. //获取到count值给c count之后减一
  14. c = count.getAndDecrement();//如果还有元素,则通知其他take线程
  15. if (c > 1)
  16. notEmpty.signal();
  17. } finally {
  18. takeLock.unlock();
  19. }
  20. if (c == capacity) //上面消费一个之后有空位
  21. signalNotFull();//唤醒生产者
  22. return x;
  23. }

LinkedBlockingQueue和ArrayBlockingQueue的差异:
1. 为了提高并发度,用2把锁,分别控制队头、队尾的操作。意味着在put(…)和put(…)之间、take()与take()之间是互斥的,put(…)和take()之间并不互斥。但对于count变量,双方都需要操作,所以必须是原子类型。
2. 因为各自拿了一把锁,所以当需要调用对方的condition的signal时,还必须再加上对方的锁,就是signalNotEmpty()和signalNotFull()方法。示例如下所示。

  1. /**
  2. * Signals a waiting take. Called only from put/offer (which do not
  3. * otherwise ordinarily lock takeLock.)
  4. */
  5. private void signalNotEmpty() {
  6. final ReentrantLock takeLock = this.takeLock;
  7. takeLock.lock(); //必须获取takeLock锁 才可以调用notEmpty.signal()
  8. try {
  9. notEmpty.signal();
  10. } finally {
  11. takeLock.unlock();
  12. }
  13. }
  14. /**
  15. * Signals a waiting put. Called only from take/poll.
  16. */
  17. private void signalNotFull() {
  18. final ReentrantLock putLock = this.putLock;
  19. putLock.lock();
  20. try {
  21. notFull.signal();
  22. } finally {
  23. putLock.unlock();
  24. }
  25. }
  1. 不仅put会通知 take,take 也会通知 put。当put 发现非满的时候,也会通知其他 put线程;当take发现非空的时候,也会通知其他take线程。

PriorityBlockingQueue

队列通常是先进先出的,而PriorityQueue是按照元素的优先级从小到大出队列的。正因为如此,PriorityQueue中的2个元素之间需要可以比较大小,并实现Comparable接口。
其核心数据结构如下:

  1. public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
  2. //...
  3. // 用数组实现的二插小根堆
  4. private transient Object[] queue;
  5. private transient int size;
  6. private transient Comparator<? super E> comparator;
  7. // 1个锁+一个条件,没有非满条件
  8. private final ReentrantLock lock;
  9. private final Condition notEmpty;
  10. //...
  11. }

其构造方法如下所示,如果不指定初始大小,内部会设定一个默认值11,当元素个数超过这个大小之后,会自动扩容。

  1. /**
  2. * Default array capacity.
  3. */
  4. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  5. /**
  6. * The maximum size of array to allocate.
  7. * Some VMs reserve some header words in an array.
  8. * Attempts to allocate larger arrays may result in
  9. * OutOfMemoryError: Requested array size exceeds VM limit
  10. */
  11. private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  12. public PriorityBlockingQueue() {
  13. this(DEFAULT_INITIAL_CAPACITY, null);
  14. }
  15. /**
  16. * Creates a {@code PriorityBlockingQueue} with the specified
  17. * initial capacity that orders its elements according to their
  18. * {@linkplain Comparable natural ordering}.
  19. *
  20. * @param initialCapacity the initial capacity for this priority queue
  21. * @throws IllegalArgumentException if {@code initialCapacity} is less
  22. * than 1
  23. */
  24. public PriorityBlockingQueue(int initialCapacity) {
  25. this(initialCapacity, null);
  26. }
  27. /**
  28. * Creates a {@code PriorityBlockingQueue} with the specified initial
  29. * capacity that orders its elements according to the specified
  30. * comparator.
  31. *
  32. * @param initialCapacity the initial capacity for this priority queue
  33. * @param comparator the comparator that will be used to order this
  34. * priority queue. If {@code null}, the {@linkplain Comparable
  35. * natural ordering} of the elements will be used.
  36. * @throws IllegalArgumentException if {@code initialCapacity} is less
  37. * than 1
  38. */
  39. public PriorityBlockingQueue(int initialCapacity,
  40. Comparator<? super E> comparator) {
  41. if (initialCapacity < 1)
  42. throw new IllegalArgumentException();
  43. this.comparator = comparator;
  44. this.queue = new Object[Math.max(1, initialCapacity)];
  45. }

put方法:

  1. public void put(E e) {
  2. offer(e); // never need to block 自定扩容,不会阻塞
  3. }
  4. /**
  5. * Inserts the specified element into this priority queue.
  6. * As the queue is unbounded, this method will never return {@code false}.
  7. *
  8. * @param e the element to add
  9. * @return {@code true} (as specified by {@link Queue#offer})
  10. * @throws ClassCastException if the specified element cannot be compared
  11. * with elements currently in the priority queue according to the
  12. * priority queue's ordering
  13. * @throws NullPointerException if the specified element is null
  14. */
  15. public boolean offer(E e) {
  16. if (e == null)
  17. throw new NullPointerException();
  18. final ReentrantLock lock = this.lock;//唯一的同步锁
  19. lock.lock();
  20. int n, cap;
  21. Object[] es;
  22. while ((n = size) >= (cap = (es = queue).length))
  23. tryGrow(es, cap); //元素数超过了数据的长度,则扩容
  24. try {
  25. final Comparator<? super E> cmp;
  26. if ((cmp = comparator) == null) //如果没有定义比较操作,则使用元素自带的比较功能
  27. siftUpComparable(n, e, es);
  28. else //元素入堆, 执行siftUp操作
  29. siftUpUsingComparator(n, e, es, cmp);
  30. size = n + 1;
  31. notEmpty.signal();
  32. } finally {
  33. lock.unlock();
  34. }
  35. return true;
  36. }

take方法:

  1. public E take() throws InterruptedException {
  2. //获取锁 可打断
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. E result;
  6. try {
  7. //出队列
  8. while ( (result = dequeue()) == null)
  9. notEmpty.await();
  10. } finally {
  11. lock.unlock();
  12. }
  13. return result;
  14. }
  15. /**
  16. * Mechanics for poll(). Call only while holding lock.
  17. */
  18. private E dequeue() {
  19. // assert lock.isHeldByCurrentThread();
  20. final Object[] es;
  21. final E result;
  22. //因为是最小二叉堆,堆顶就是要出队的元素
  23. if ((result = (E) ((es = queue)[0])) != null) {
  24. final int n;
  25. final E x = (E) es[(n = --size)];
  26. es[n] = null;
  27. if (n > 0) {
  28. final Comparator<? super E> cmp;
  29. if ((cmp = comparator) == null) //出了一个元素 需要调整最小二叉堆
  30. siftDownComparable(0, x, es, n);
  31. else //调整堆,执行siftDown方法
  32. siftDownUsingComparator(0, x, es, n, cmp);
  33. }
  34. }
  35. return result;
  36. }

从上面可以看到,在阻塞的实现方面,和ArrayBlockingQueue的机制相似,主要区别是用数组实现了一个二叉堆,从而实现按优先级从小到大出队列。另一个区别是没有notFull条件,当元素个数超出数组长度时,执行扩容操作。

DelayQueue

DelayQueue即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。所谓延迟时间,就是“未来将要执行的时间”减去“当前时间”。为此,放入DelayQueue中的元素,必须实现Delayed接口,如下所示。

  1. /**
  2. * A mix-in style interface for marking objects that should be
  3. * acted upon after a given delay.
  4. *
  5. * <p>An implementation of this interface must define a
  6. * {@code compareTo} method that provides an ordering consistent with
  7. * its {@code getDelay} method.
  8. *
  9. * @since 1.5
  10. * @author Doug Lea
  11. */
  12. public interface Delayed extends Comparable<Delayed> {
  13. /**
  14. * Returns the remaining delay associated with this object, in the
  15. * given time unit.
  16. *
  17. * @param unit the time unit
  18. * @return the remaining delay; zero or negative values indicate
  19. * that the delay has already elapsed
  20. */
  21. long getDelay(TimeUnit unit);
  22. }

关于该接口:
1. 如果getDelay的返回值小于或等于0,则说明该元素到期,需要从队列中拿出来执行。
2. 该接口首先继承了 Comparable 接口,所以要实现该接口,必须实现 Comparable 接口。具体来说,就是基于getDelay()的返回值比较两个元素的大小。
核心数据结构:

  1. public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
  2. // ...
  3. // 一把锁和一个非空条件
  4. private final transient ReentrantLock lock = new ReentrantLock();
  5. private final Condition available = lock.newCondition();
  6. // 优先级队列
  7. private final PriorityQueue<E> q = new PriorityQueue<E>();
  8. // ...
  9. }

put/take的实现,先从take说起,因为这样更能看出DelayQueue的特性

  1. /**
  2. * Retrieves and removes the head of this queue, waiting if necessary
  3. * until an element with an expired delay is available on this queue.
  4. *
  5. * @return the head of this queue
  6. * @throws InterruptedException {@inheritDoc}
  7. */
  8. public E take() throws InterruptedException {
  9. final ReentrantLock lock = this.lock;
  10. lock.lockInterruptibly();
  11. try {
  12. for (;;) {
  13. E first = q.peek(); //取出二叉堆的堆顶元素, 即延迟时间最小的
  14. if (first == null)
  15. available.await();//队列为空 则take线程阻塞
  16. else {
  17. //堆顶元素的延迟时间小于等于0 出队列返回
  18. long delay = first.getDelay(NANOSECONDS);
  19. if (delay <= 0L)
  20. return q.poll();
  21. first = null; // don't retain ref while waiting
  22. //如果有其他线程也在等待该元素,则无限期等待。
  23. if (leader != null)
  24. available.await();
  25. else {
  26. //如果当前线程是leader 则表示该我获取元素了
  27. Thread thisThread = Thread.currentThread();
  28. leader = thisThread;
  29. try {
  30. available.awaitNanos(delay); //否则阻塞有限时间
  31. } finally {
  32. if (leader == thisThread)
  33. leader = null;
  34. }
  35. }
  36. }
  37. }
  38. } finally {
  39. if (leader == null && q.peek() != null)
  40. available.signal(); //当前线程是leader 已经获取了堆顶元素,唤醒其他线程
  41. lock.unlock();
  42. }
  43. }

关于take()方法:
1. 不同于一般的阻塞队列,只在队列为空的时候,才阻塞。如果堆顶元素的延迟时间没到,也会阻塞。
2. 在上面的代码中使用了一个优化技术,用一个Thread leader变量记录了等待堆顶元素的第1个线程。为什么这样做呢?通过 getDelay(..)可以知道堆顶元素何时到期,不必无限期等待,可以使用condition.awaitNanos()等待一个有限的时间;只有当发现还有其他线程也在等待堆顶元素(leader!=NULL)时,才需要无限期等待。

put的实现:

  1. public void put(E e) {
  2. offer(e);
  3. }
  4. /**
  5. * Inserts the specified element into this delay queue.
  6. *
  7. * @param e the element to add
  8. * @return {@code true}
  9. * @throws NullPointerException if the specified element is null
  10. */
  11. public boolean offer(E e) {
  12. final ReentrantLock lock = this.lock;
  13. lock.lock();
  14. try {
  15. q.offer(e); //元素放入二叉堆
  16. //如果放进去的元素刚好在堆顶,说明放入的元素延迟时间最小,需要通知等待的线程;否则放入的元素不在堆顶,没有必要通知等待的线程。
  17. if (q.peek() == e) {
  18. leader = null;
  19. available.signal();
  20. }
  21. return true;
  22. } finally {
  23. lock.unlock();
  24. }
  25. }

注意:不是每放入一个元素,都需要通知等待的线程。放入的元素,如果其延迟时间大于当前堆顶的元素延迟时间,就没必要通知等待的线程;只有当延迟时间是最小的,在堆顶时,才有必要通知等待的线程,也就是上面代码中的if (q.peek() == e)

SynchronousQueue

SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put(…),线程会阻塞;直到另外一个线程调用了take(),两个线程才同时解锁,反之亦然。对于多个线程而言,例如3个线程,调用3次put(…),3个线程都会阻塞;直到另外的线程调用3次take(),6个线程才同时解锁,反之亦然
构造方法:

  1. /**
  2. * Creates a {@code SynchronousQueue} with the specified fairness policy.
  3. *
  4. * @param fair if true, waiting threads contend in FIFO order for
  5. * access; otherwise the order is unspecified.
  6. */
  7. public SynchronousQueue(boolean fair) {
  8. transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
  9. }

和锁一样,也有公平和非公平模式。如果是公平模式,则用TransferQueue实现;如果是非公平模式,则用TransferStack实现。这两个类分别是什么呢?先看一下put/take的实现。

  1. /**
  2. * Adds the specified element to this queue, waiting if necessary for
  3. * another thread to receive it.
  4. *
  5. * @throws InterruptedException {@inheritDoc}
  6. * @throws NullPointerException {@inheritDoc}
  7. */
  8. public void put(E e) throws InterruptedException {
  9. if (e == null) throw new NullPointerException();
  10. if (transferer.transfer(e, false, 0) == null) {
  11. Thread.interrupted();
  12. throw new InterruptedException();
  13. }
  14. }
  15. /**
  16. * Retrieves and removes the head of this queue, waiting if necessary
  17. * for another thread to insert it.
  18. *
  19. * @return the head of this queue
  20. * @throws InterruptedException {@inheritDoc}
  21. */
  22. public E take() throws InterruptedException {
  23. E e = transferer.transfer(null, false, 0);
  24. if (e != null)
  25. return e;
  26. Thread.interrupted();
  27. throw new InterruptedException();
  28. }

可以看到,put/take都调用了transfer(…)接口。而TransferQueue和TransferStack分别实现了这个接口。该接口在SynchronousQueue内部,如下所示。如果是put(…),则第1个参数就是对应的元素;如果是take(),则第1个参数为null。后2个参数分别为是否设置超时和对应的超时时间。

  1. /**
  2. * Shared internal API for dual stacks and queues.
  3. */
  4. abstract static class Transferer<E> {
  5. /**
  6. * Performs a put or take.
  7. *
  8. * @param e if non-null, the item to be handed to a consumer;
  9. * if null, requests that transfer return an item
  10. * offered by producer.
  11. * @param timed if this operation should timeout
  12. * @param nanos the timeout, in nanoseconds
  13. * @return if non-null, the item provided or received; if null,
  14. * the operation failed due to timeout or interrupt --
  15. * the caller can distinguish which of these occurred
  16. * by checking Thread.interrupted.
  17. */
  18. abstract E transfer(E e, boolean timed, long nanos);
  19. }

公平模式和非公平模式
设3个线程分别调用了put(…),3个线程会进入阻塞状态,直到其他线程调用3次take(),和3个put(…)一一配对。

如果是公平模式(队列模式),则第1个调用put(…)的线程1会在队列头部,第1个到来的take()线程和它进行配对,遵循先到先配对的原则,所以是公平的;如果是非公平模式(栈模式),则第3个调用put(…)的线程3会在栈顶,第1个到来的take()线程和它进行配对,遵循的是后到先配对的原则,所以是非公平的
image.png

TransferQueue和TransferStack的实现:
TransferQueue:

  1. public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
  2. // ...
  3. static final class TransferQueue<E> extends Transferer<E> {
  4. static final class QNode {
  5. volatile QNode next;
  6. volatile Object item;
  7. volatile Thread waiter;
  8. final boolean isData;
  9. //...
  10. }
  11. transient volatile QNode head;
  12. transient volatile QNode tail;
  13. // ...
  14. }
  15. }

TransferQueue是一个基于单向链表而实现的队列,通过head和tail 2个指针记录头部和尾部。初始的时候,head和tail会指向一个空节点,构造方法如下所示。

  1. TransferQueue() {
  2. QNode h = new QNode(null, false); // initialize to dummy node.
  3. head = h;
  4. tail = h;
  5. }

阶段(a):队列中是一个空的节点,head/tail都指向这个空节点。
阶段(b):3个线程分别调用put,生成3个QNode,进入队列。
阶段(c):来了一个线程调用take,会和队列头部的第1个QNode进行配对。
阶段(d):第1个QNode出队列。
image.png

这里有一个关键点:put节点和take节点一旦相遇,就会配对出队列,所以在队列中不可能同时存在put节点和take节点,要么所有节点都是put节点,要么所有节点都是take节点。

  1. @SuppressWarnings("unchecked")
  2. E transfer(E e, boolean timed, long nanos) {
  3. /* Basic algorithm is to loop trying to take either of
  4. * two actions:
  5. *
  6. * 1. If queue apparently empty or holding same-mode nodes,
  7. * try to add node to queue of waiters, wait to be
  8. * fulfilled (or cancelled) and return matching item.
  9. *
  10. * 2. If queue apparently contains waiting items, and this
  11. * call is of complementary mode, try to fulfill by CAS'ing
  12. * item field of waiting node and dequeuing it, and then
  13. * returning matching item.
  14. *
  15. * In each case, along the way, check for and try to help
  16. * advance head and tail on behalf of other stalled/slow
  17. * threads.
  18. *
  19. * The loop starts off with a null check guarding against
  20. * seeing uninitialized head or tail values. This never
  21. * happens in current SynchronousQueue, but could if
  22. * callers held non-volatile/final ref to the
  23. * transferer. The check is here anyway because it places
  24. * null checks at top of loop, which is usually faster
  25. * than having them implicitly interspersed.
  26. */
  27. QNode s = null; // constructed/reused as needed
  28. boolean isData = (e != null);
  29. for (;;) {
  30. QNode t = tail;
  31. QNode h = head; //队列还未初始化,自旋等待
  32. if (t == null || h == null) // saw uninitialized value
  33. continue; // spin
  34. //队列为空或者当前线程和队列中元素为同一种模式
  35. if (h == t || t.isData == isData) { // empty or same-mode
  36. QNode tn = t.next;
  37. //不一致读 重新执行for循环
  38. if (t != tail) // inconsistent read
  39. continue;
  40. if (tn != null) { // lagging tail
  41. advanceTail(t, tn);
  42. continue;
  43. }
  44. if (timed && nanos <= 0L) // can't wait
  45. return null;
  46. if (s == null)
  47. s = new QNode(e, isData); //新建一个节点
  48. //加入尾部
  49. if (!t.casNext(null, s)) // failed to link in
  50. continue;
  51. //后移tail指针
  52. advanceTail(t, s); // swing tail and wait
  53. Object x = awaitFulfill(s, e, timed, nanos); //进入阻塞状态
  54. if (x == s) { // wait was cancelled
  55. clean(t, s);
  56. return null;
  57. }
  58. //从阻塞中唤醒,确定已经处于队列中的第一个元素
  59. if (!s.isOffList()) { // not already unlinked
  60. advanceHead(t, s); // unlink if head
  61. if (x != null) // and forget fields
  62. s.item = s;
  63. s.waiter = null;
  64. }
  65. return (x != null) ? (E)x : e;
  66. } else { // complementary-mode 当前线程可以和队列中第一个元素进行配对
  67. QNode m = h.next; // node to fulfill 取队列中第一个元素
  68. if (t != tail || m == null || h != head)
  69. continue; // inconsistent read 不一致读,重新执行for循环
  70. Object x = m.item;
  71. if (isData == (x != null) || // m already fulfilled 已经配对
  72. x == m || // m cancelled
  73. !m.casItem(x, e)) { // lost CAS 尝试配对
  74. advanceHead(h, m); // dequeue and retry 已经配对,直接出队列
  75. continue;
  76. }
  77. advanceHead(h, m); // successfully fulfilled 配对成功 出队列
  78. LockSupport.unpark(m.waiter); // 唤醒队列中与第一个元素对应的线程
  79. return (x != null) ? (E)x : e;
  80. }
  81. }
  82. }

整个 for 循环有两个大的 if-else 分支,如果当前线程和队列中的元素是同一种模式(都是put节点或者take节点),则与当前线程对应的节点被加入队列尾部并且阻塞;如果不是同一种模式,则选取队列头部的第1个元素进行配对。
这里的配对就是m.casItem(x,e),把自己的item x换成对方的item e,如果CAS操作成功,则配对成功。如果是put节点,则isData=true,item!=null;如果是take节点,则isData=false,item=null。如果CAS操作不成功,则isData和item之间将不一致,也就是isData!=(x!=null),通过这个条件可以判断节点是否已经被匹配过了。

TransferStack

TransferStack的定义如下所示,首先,它也是一个单向链表。不同于队列,只需要head指针就能实现入栈和出栈操作。

  1. static final class TransferStack extends Transferer {
  2. static final int REQUEST = 0;
  3. static final int DATA = 1;
  4. static final int FULFILLING = 2;
  5. static final class SNode {
  6. volatile SNode next;
  7. // 单向链表
  8. volatile SNode match;
  9. // 配对的节点
  10. volatile Thread waiter;
  11. // 对应的阻塞线程
  12. Object item; int mode;
  13. // 三种模式
  14. //...
  15. }
  16. volatile SNode head;
  17. }

链表中的节点有三种状态,REQUEST对应take节点,DATA对应put节点,二者配对之后,会生成一个FULFILLING节点,入栈,然后FULLING节点和被配对的节点一起出栈。

阶段(a):head指向NULL。不同于TransferQueue,这里没有空的头节点。
阶段(b):3个线程调用3次put,依次入栈。
阶段(c):线程4调用take,和栈顶的第1个元素配对,生成FULLFILLING节点,入栈。
阶段(d):栈顶的2个元素同时入栈。
image.png

  1. E transfer(E e, boolean timed, long nanos) {
  2. /*
  3. * Basic algorithm is to loop trying one of three actions:
  4. *
  5. * 1. If apparently empty or already containing nodes of same
  6. * mode, try to push node on stack and wait for a match,
  7. * returning it, or null if cancelled.
  8. *
  9. * 2. If apparently containing node of complementary mode,
  10. * try to push a fulfilling node on to stack, match
  11. * with corresponding waiting node, pop both from
  12. * stack, and return matched item. The matching or
  13. * unlinking might not actually be necessary because of
  14. * other threads performing action 3:
  15. *
  16. * 3. If top of stack already holds another fulfilling node,
  17. * help it out by doing its match and/or pop
  18. * operations, and then continue. The code for helping
  19. * is essentially the same as for fulfilling, except
  20. * that it doesn't return the item.
  21. */
  22. SNode s = null; // constructed/reused as needed
  23. int mode = (e == null) ? REQUEST : DATA;
  24. for (;;) {
  25. SNode h = head;
  26. if (h == null || h.mode == mode) { // empty or same-mode 同一种模式 同是消费者或同是生产者
  27. if (timed && nanos <= 0L) { // can't wait
  28. if (h != null && h.isCancelled())
  29. casHead(h, h.next); // pop cancelled node
  30. else
  31. return null;
  32. } else if (casHead(h, s = snode(s, e, h, mode))) {
  33. SNode m = awaitFulfill(s, timed, nanos); //入栈 阻塞等待
  34. if (m == s) { // wait was cancelled
  35. clean(s);
  36. return null;
  37. }
  38. if ((h = head) != null && h.next == s)
  39. casHead(h, s.next); // help s's fulfiller
  40. return (E) ((mode == REQUEST) ? m.item : s.item);
  41. }
  42. } else if (!isFulfilling(h.mode)) { // try to fulfill //非同一种模式 待匹配
  43. if (h.isCancelled()) // already cancelled
  44. casHead(h, h.next); // pop and retry
  45. else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //生成一个FULFULLING节点 入栈
  46. for (;;) { // loop until matched or waiters disappear
  47. SNode m = s.next; // m is s's match
  48. if (m == null) { // all waiters are gone
  49. casHead(s, null); // pop fulfill node
  50. s = null; // use new node next time
  51. break; // restart main loop
  52. }
  53. SNode mn = m.next;
  54. if (m.tryMatch(s)) { //两个节点一起出栈
  55. casHead(s, mn); // pop both s and m
  56. return (E) ((mode == REQUEST) ? m.item : s.item);
  57. } else // lost match
  58. s.casNext(m, mn); // help unlink
  59. }
  60. }
  61. } else { // help a fulfiller //已经匹配过了 出栈
  62. SNode m = h.next; // m is h's match
  63. if (m == null) // waiter is gone
  64. casHead(h, null); // pop fulfilling node
  65. else {
  66. SNode mn = m.next;
  67. if (m.tryMatch(h)) // help match
  68. casHead(h, mn); // pop both h and m //配对 一起出栈
  69. else // lost match
  70. h.casNext(m, mn); // help unlink
  71. }
  72. }
  73. }
  74. }

BlockingDeque 双端口阻塞队列

BlockingDeque定义了一个阻塞的双端队列接口,如下所示。

  1. public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
  2. void putFirst(E e) throws InterruptedException;
  3. void putLast(E e) throws InterruptedException;
  4. E takeFirst() throws InterruptedException;
  5. E takeLast() throws InterruptedException;
  6. // ...
  7. }

该接口继承了BlockingQueue接口,同时增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque

其核心数据结构如下所示,是一个双向链表。

  1. public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {
  2. static final class Node<E> {
  3. E item; Node<E> prev;
  4. // 双向链表的Node
  5. Node<E> next;
  6. Node(E x) {
  7. item = x;
  8. }
  9. }
  10. transient Node<E> first;
  11. // 队列的头和尾
  12. transient Node<E> last;
  13. private transient int count;
  14. // 元素个数
  15. private final int capacity;
  16. // 容量
  17. // 一把锁+两个条件 生产者和消费者互斥 在同一时间要么在生产要么在消费
  18. final ReentrantLock lock = new ReentrantLock();
  19. private final Condition notEmpty = lock.netCondition(); //非空 唤醒消费者
  20. private final Condition notFull = lock.newCondition(); //非满 唤醒生产 中断生产者阻塞
  21. // ...
  22. }

对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。

take方法

  1. public E takeFirst() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. E x;
  6. while ( (x = unlinkFirst()) == null) //取第一个元素
  7. notEmpty.await(); //未取到 阻塞
  8. return x;
  9. } finally {
  10. lock.unlock();
  11. }
  12. }
  13. //取第一个元素
  14. private E unlinkFirst() {
  15. // assert lock.isHeldByCurrentThread();
  16. Node<E> f = first;
  17. if (f == null)
  18. return null;
  19. Node<E> n = f.next;
  20. E item = f.item;
  21. f.item = null;
  22. f.next = f; // help GC 自己指向自己 帮助垃圾回收器回收
  23. first = n;
  24. if (n == null) //链表没有元素
  25. last = null;
  26. else
  27. n.prev = null;
  28. --count;
  29. notFull.signal(); //通知生产者生产
  30. return item;
  31. }
  32. //消费最后一个元素 逻辑与上面相似
  33. public E takeLast() throws InterruptedException {
  34. final ReentrantLock lock = this.lock;
  35. lock.lock();
  36. try {
  37. E x;
  38. while ( (x = unlinkLast()) == null)
  39. notEmpty.await();
  40. return x;
  41. } finally {
  42. lock.unlock();
  43. }
  44. }
  45. /**
  46. * Removes and returns last element, or null if empty.
  47. */
  48. private E unlinkLast() {
  49. // assert lock.isHeldByCurrentThread();
  50. Node<E> l = last;
  51. if (l == null)
  52. return null;
  53. Node<E> p = l.prev;
  54. E item = l.item;
  55. l.item = null;
  56. l.prev = l; // help GC
  57. last = p;
  58. if (p == null)
  59. first = null;
  60. else
  61. p.next = null;
  62. --count;
  63. notFull.signal();
  64. return item;
  65. }

put方法

  1. /**
  2. * @throws NullPointerException {@inheritDoc}
  3. * @throws InterruptedException {@inheritDoc}
  4. */
  5. public void putFirst(E e) throws InterruptedException {
  6. if (e == null) throw new NullPointerException();
  7. Node<E> node = new Node<E>(e);
  8. final ReentrantLock lock = this.lock;
  9. lock.lock();
  10. try {
  11. while (!linkFirst(node))
  12. notFull.await();
  13. } finally {
  14. lock.unlock();
  15. }
  16. }
  17. /**
  18. * Links node as first element, or returns false if full.
  19. */
  20. private boolean linkFirst(Node<E> node) {
  21. // assert lock.isHeldByCurrentThread();
  22. if (count >= capacity)
  23. return false;
  24. Node<E> f = first;
  25. node.next = f;
  26. first = node;
  27. if (last == null)
  28. last = node;
  29. else
  30. f.prev = node;
  31. ++count;
  32. notEmpty.signal();
  33. return true;
  34. }
  35. //尾部插入元素
  36. /**
  37. * @throws NullPointerException {@inheritDoc}
  38. * @throws InterruptedException {@inheritDoc}
  39. */
  40. public void putLast(E e) throws InterruptedException {
  41. if (e == null) throw new NullPointerException();
  42. Node<E> node = new Node<E>(e);
  43. final ReentrantLock lock = this.lock;
  44. lock.lock();
  45. try {
  46. while (!linkLast(node))
  47. notFull.await();
  48. } finally {
  49. lock.unlock();
  50. }
  51. }
  52. /**
  53. * Links node as last element, or returns false if full.
  54. */
  55. private boolean linkLast(Node<E> node) {
  56. // assert lock.isHeldByCurrentThread();
  57. if (count >= capacity)
  58. return false;
  59. Node<E> l = last;
  60. node.prev = l;
  61. last = node;
  62. if (first == null)
  63. first = node;
  64. else
  65. l.next = node;
  66. ++count;
  67. notEmpty.signal();
  68. return true;
  69. }

CopyOnWrite

CopyOnWrite指在“写”的时候,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观锁或者乐观锁的方式写回。那为什么不直接修改,而是要拷贝一份修改呢?这是为了在“读”的时候不加锁。
实现类:

CopyOnWriteArrayList

和ArrayList一样,CopyOnWriteArrayList的核心数据结构也是一个数组,代码如下:

  1. public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
  2. // ...
  3. private volatile transient Object[] array;
  4. }

“读”方法都没有加锁,那么是如何保证“线程安全”呢?答案在“写”方法里面

  1. public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
  2. // 锁对象
  3. final transient Object lock = new Object();
  4. public boolean add(E e) {
  5. synchronized (lock) { // 同步锁对象
  6. Object[] es = getArray();
  7. int len = es.length;
  8. es = Arrays.copyOf(es, len + 1);// CopyOnWrite,写的时候,先拷贝一 份之前的数组。
  9. es[len] = e;
  10. setArray(es);
  11. return true;
  12. }
  13. }
  14. public void add(int index, E element) {
  15. synchronized (lock) { // 同步锁对象
  16. Object[] es = getArray();
  17. int len = es.length;
  18. if (index > len || index < 0)
  19. throw new IndexOutOfBoundsException(outOfBounds(index, len));
  20. Object[] newElements;
  21. int numMoved = len - index;
  22. if (numMoved == 0)
  23. newElements = Arrays.copyOf(es, len + 1);
  24. else {
  25. newElements = new Object[len + 1];
  26. System.arraycopy(es, 0, newElements, 0, index); // CopyOnWrite,写的时候,先拷贝一份之前的数组。
  27. System.arraycopy(es, index, newElements, index + 1, numMoved);
  28. }
  29. newElements[index] = element;
  30. setArray(newElements); // 把新数组赋值给老数组
  31. }
  32. }
  33. }

写方法,例如remove和add类似,此处不再详述

CopyOnWriteArraySet

CopyOnWriteArraySet 就是用 Array 实现的一个 Set,保证所有元素都不重复。其内部是封装的一个CopyOnWriteArrayList。

  1. public class CopyOnWriteArraySet<E> extends AbstractSet<E> implements java.io.Serializable {
  2. // 新封装的 CopyOnWriteArrayList
  3. private final CopyOnWriteArrayList<E> al;
  4. public CopyOnWriteArraySet() {
  5. al = new CopyOnWriteArrayList<E>();
  6. }
  7. public boolean add(E e) {
  8. return al.addIfAbsent(e); // 不重复的加进去
  9. }
  10. }

ConcurrentLinkedQueue/Deque

AQS内部的阻塞队列实现原理:基于双向链表,通过对head/tail进行CAS操作,实现入队和出队。

ConcurrentLinkedQueue

ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于 CAS,同样是通过head/tail指针记录队列头部和尾部,但还是有稍许差别。
它是一个单向链表,定义如下:

  1. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {
  2. private static class Node<E> {
  3. volatile E item;
  4. volatile Node<E> next;
  5. //...
  6. }
  7. private transient volatile Node<E> head;
  8. private transient volatile Node<E> tail;
  9. //...
  10. }

其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置;每次出队,head一定后移一个位置,以保证head指向队列头部,tail指向链表尾部。
但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对 head/tail指针进行 CAS操作的,而是对 Node中的 item进行操作。下面进行详细分析:
初始化:
初始的时候, head 和 tail 都指向一个 null 节点。对应的代码如下

  1. public ConcurrentLinkedQueue() {
  2. head = tail = new Node<E>(null);
  3. }

image.png
2.入队列

  1. public boolean offer(E e) {
  2. final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
  3. for (Node<E> t = tail, p = t;;) {
  4. Node<E> q = p.next;
  5. if (q == null) {
  6. // p is last node
  7. if (NEXT.compareAndSet(p, null, newNode)) { //对tail的next指针而不是tail指针进行cas操作
  8. // Successful CAS is the linearization point
  9. // for e to become an element of this queue,
  10. // and for newNode to become "live".
  11. if (p != t) // hop two nodes at a time; failure is OK
  12. TAIL.weakCompareAndSet(this, t, newNode); //每入队两个节点 移动一次tail指针 失败也无所谓
  13. return true;
  14. }
  15. // Lost CAS race to another thread; re-read next
  16. }
  17. else if (p == q)
  18. // We have fallen off list. If tail is unchanged, it
  19. // will also be off-list, in which case we need to
  20. // jump to head, from which all live nodes are always
  21. // reachable. Else the new tail is a better bet.
  22. p = (t != (t = tail)) ? t : head; //已经到达尾部
  23. else
  24. // Check for tail updates after two hops.
  25. p = (p != t && t != (t = tail)) ? t : q; //后移p指针
  26. }
  27. }

上面的入队其实是每次在队尾追加2个节点时,才移动一次tail节点。如下图所示:
初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:
step1:p=tail,q=p.next=NULL.
step2:对p的next执行CAS操作,追加item2,成功之后,p=tail。所以上面的casTail方法不会执行,直接返回。此时tail指针没有变化。
image.png
之后,假设线程2要入队item3节点,如下图所示:
step3:p=tail,q=p.next.
step4:q!=NULL,因此不会入队新节点。p,q都后移1位。
step5:q=NULL,对p的next执行CAS操作,入队item3节点。
step6:p!=t,满足条件,执行上面的casTail操作,tail后移2个位置,到达队列尾部。
image.png
最后总结一下入队列的两个关键点:
1. 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列。
2. 只有当 p != tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指
针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。

出队列
上面说了入队列之后,tail指针不变化,那是否会出现入队列之后,要出队列却没有元素可出的情况呢?

  1. public E poll() {
  2. restartFromHead: for (;;) {
  3. for (Node<E> h = head, p = h, q;; p = q) {
  4. final E item;
  5. if ((item = p.item) != null && p.casItem(item, null)) { //出队列时并没有移动head指针,而是把item置为null
  6. // Successful CAS is the linearization point
  7. // for item to be removed from this queue.
  8. if (p != h) // hop two nodes at a time //每产生2个NULL节点才把head指针后移2位
  9. updateHead(h, ((q = p.next) != null) ? q : p);
  10. return item;
  11. }
  12. else if ((q = p.next) == null) {
  13. updateHead(h, p);
  14. return null;
  15. }
  16. else if (p == q)
  17. continue restartFromHead;
  18. }
  19. }
  20. }

出队列的代码和入队列类似,也有p、q2个指针,整个变化过程如图5-8所示。假设初始的时候head指向空节点,队列中有item1、item2、item3 三个节点。
step1:p=head,q=p.next.p!=q.
step2:后移p指针,使得p=q。
step3:出队列。关键点:此处并没有直接删除item1节点,只是把该节点的item通过CAS操作置为了NULL。
step4:p!=head,此时队列中有了2个 NULL 节点,再前移1次head指针,对其执行updateHead操作。
image.png
最后总结一下出队列的关键点:
1. 出队列的判断并非观察 tail 指针的位置,而是依赖于 head 指针后续的节点是否为NULL这一条件。
2. 只要对节点的item执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。

队列判空
因为head/tail 并不是精确地指向队列头部和尾部,所以不能简单地通过比较 head/tail 指针来判断队列是否为空,而是需要从head指针开始遍历,找第1个不为NULL的节点。如果找到,则队列不为空;如果找不到,则队列为空。代码如下所示:

  1. public boolean isEmpty() {
  2. return first() == null; //寻找第一个不是NULL的节点
  3. }
  4. /**
  5. * Returns the first live (non-deleted) node on list, or null if none.
  6. * This is yet another variant of poll/peek; here returning the
  7. * first node, not element. We could make peek() a wrapper around
  8. * first(), but that would cost an extra volatile read of item,
  9. * and the need to add a retry loop to deal with the possibility
  10. * of losing a race to a concurrent poll().
  11. */
  12. Node<E> first() {
  13. restartFromHead: for (;;) {
  14. for (Node<E> h = head, p = h, q;; p = q) { //从head指针开始遍历 查找第一个不是NULL的节点
  15. boolean hasItem = (p.item != null);
  16. if (hasItem || (q = p.next) == null) {
  17. updateHead(h, p);
  18. return hasItem ? p : null;
  19. }
  20. else if (p == q)
  21. continue restartFromHead;
  22. }
  23. }
  24. }

ConcurrentHashMap

HashMap通常的实现方式是“数组+链表”,这种方式被称为“拉链法”。ConcurrentHashMap在这个基本原理之上进行了各种优化。
首先是所有数据都放在一个大的HashMap中;其次是引入了红黑树。

image.png

如果头节点是Node类型,则尾随它的就是一个普通的链表;如果头节点是TreeNode类型,它的后面就是一颗红黑树,TreeNode是Node的子类。
链表和红黑树之间可以相互转换:初始的时候是链表,当链表中的元素超过某个阈值时,把链表转换成红黑树;反之,当红黑树中的元素个数小于某个阈值时,再转换为链表。

那为什么要做这种设计呢?
1. 使用红黑树,当一个槽里有很多元素时,其查询和更新速度会比链表快很多,Hash冲突的问题由此得到较好的解决。
2. 加锁的粒度,并非整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度,就是Node数组的长度,初始长度为16。
3. 并发扩容,这是难度最大的。当一个线程要扩容Node数组的时候,其他线程还要读写,因此处理过程很复杂,后面会详细分析。

由上述对比可以总结出来:这种设计一方面降低了Hash冲突,另一方面也提升了并发度。

1、构造方法:

  1. /**
  2. * The largest possible table capacity. This value must be
  3. * exactly 1<<30 to stay within Java array allocation and indexing
  4. * bounds for power of two table sizes, and is further required
  5. * because the top two bits of 32bit hash fields are used for
  6. * control purposes.
  7. */
  8. private static final int MAXIMUM_CAPACITY = 1 << 30;
  9. /**
  10. * The default initial table capacity. Must be a power of 2
  11. * (i.e., at least 1) and at most MAXIMUM_CAPACITY.
  12. */
  13. private static final int DEFAULT_CAPACITY = 16;
  14. /**
  15. * The largest possible (non-power of two) array size.
  16. * Needed by toArray and related methods.
  17. */
  18. static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  19. /**
  20. * The default concurrency level for this table. Unused but
  21. * defined for compatibility with previous versions of this class.
  22. */
  23. private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  24. /**
  25. * The load factor for this table. Overrides of this value in
  26. * constructors affect only the initial table capacity. The
  27. * actual floating point value isn't normally used -- it is
  28. * simpler to use expressions such as {@code n - (n >>> 2)} for
  29. * the associated resizing threshold.
  30. */
  31. private static final float LOAD_FACTOR = 0.75f;
  32. /**
  33. * The bin count threshold for using a tree rather than list for a
  34. * bin. Bins are converted to trees when adding an element to a
  35. * bin with at least this many nodes. The value must be greater
  36. * than 2, and should be at least 8 to mesh with assumptions in
  37. * tree removal about conversion back to plain bins upon
  38. * shrinkage.
  39. */
  40. static final int TREEIFY_THRESHOLD = 8; //触发转换成红黑树操作阈值 有可能不转成红黑树而是触发扩容 需要看元素个数是否超过了MIN_TREEIFY_CAPACITY
  41. /**
  42. * The bin count threshold for untreeifying a (split) bin during a
  43. * resize operation. Should be less than TREEIFY_THRESHOLD, and at
  44. * most 6 to mesh with shrinkage detection under removal.
  45. */
  46. static final int UNTREEIFY_THRESHOLD = 6;
  47. /**
  48. * The smallest table capacity for which bins may be treeified.
  49. * (Otherwise the table is resized if too many nodes in a bin.)
  50. * The value should be at least 4 * TREEIFY_THRESHOLD to avoid
  51. * conflicts between resizing and treeification thresholds.
  52. */
  53. static final int MIN_TREEIFY_CAPACITY = 64; //桶元素数量 当元素量小于此数时不会转为红黑树
  54. /**
  55. * Creates a new, empty map with an initial table size based on
  56. * the given number of elements ({@code initialCapacity}), initial
  57. * table density ({@code loadFactor}), and number of concurrently
  58. * updating threads ({@code concurrencyLevel}).
  59. *
  60. * @param initialCapacity the initial capacity. The implementation
  61. * performs internal sizing to accommodate this many elements,
  62. * given the specified load factor.
  63. * @param loadFactor the load factor (table density) for
  64. * establishing the initial table size
  65. * @param concurrencyLevel the estimated number of concurrently
  66. * updating threads. The implementation may use this value as
  67. * a sizing hint.
  68. * @throws IllegalArgumentException if the initial capacity is
  69. * negative or the load factor or concurrencyLevel are
  70. * nonpositive
  71. */
  72. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
  73. if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
  74. throw new IllegalArgumentException();
  75. if (initialCapacity < concurrencyLevel) // Use at least as many bins
  76. initialCapacity = concurrencyLevel; // as estimated threads
  77. long size = (long)(1.0 + (long)initialCapacity / loadFactor);
  78. int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
  79. this.sizeCtl = cap;
  80. }

在上面的代码中,变量cap就是Node数组的长度,保持为2的整数次方。tableSizeFor(…)方法是根据传入的初始容量,计算出一个合适的数组长度。具体而言:1.5倍的初始容量+1,再往上取最接近的2的整数次方,作为数组长度cap的初始值。
这里的 sizeCtl,其含义是用于控制在初始化或者并发扩容时候的线程数,只不过其初始值设置成cap

2、初始化
在上面的构造方法里只计算了数组的初始大小,并没有对数组进行初始化。当多个线程都往里面放入元素的时候,再进行初始化。这就存在一个问题:多个线程重复初始化。下面看一下是如何处理的

  1. /**
  2. * Initializes table, using the size recorded in sizeCtl.
  3. */
  4. private final Node<K,V>[] initTable() {
  5. Node<K,V>[] tab; int sc;
  6. while ((tab = table) == null || tab.length == 0) {
  7. if ((sc = sizeCtl) < 0)
  8. Thread.yield(); // lost initialization race; just spin 自旋等待
  9. else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) { //重点: 将sizeCtl设置为-1 哪个线程抢到-1 则执行下面逻辑
  10. try {
  11. if ((tab = table) == null || tab.length == 0) {
  12. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  13. @SuppressWarnings("unchecked")
  14. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //初始化
  15. table = tab = nt;
  16. // sizeCtl不是数组长度,因此初始化成功后,就不再等于数组长度
  17. // 而是n-(n>>>2)=0.75n,表示下一次扩容的阈值:n-n/4
  18. sc = n - (n >>> 2);
  19. }
  20. } finally {
  21. sizeCtl = sc; // 设置sizeCtl的值为sc。
  22. }
  23. break;
  24. }
  25. }
  26. return tab;
  27. }

多个线程的竞争是通过对sizeCtl进行CAS操作实现的。如果某个线程成功地把 sizeCtl 设置为-1,它就拥有了初始化的权利,进入初始化的代码模块,等到初始化完成,再把sizeCtl设置回去;其他线程则一直执行while循环,自旋等待,直到数组不为null,即当初始化结束时,退出整个方法。
因为初始化的工作量很小,所以此处选择的策略是让其他线程一直等待,而没有帮助其初始化。

3.put(..)实现分析

  1. /**
  2. * Maps the specified key to the specified value in this table.
  3. * Neither the key nor the value can be null.
  4. *
  5. * <p>The value can be retrieved by calling the {@code get} method
  6. * with a key that is equal to the original key.
  7. *
  8. * @param key key with which the specified value is to be associated
  9. * @param value value to be associated with the specified key
  10. * @return the previous value associated with {@code key}, or
  11. * {@code null} if there was no mapping for {@code key}
  12. * @throws NullPointerException if the specified key or value is null
  13. */
  14. public V put(K key, V value) {
  15. return putVal(key, value, false);
  16. }
  17. /** Implementation for put and putIfAbsent */
  18. final V putVal(K key, V value, boolean onlyIfAbsent) {
  19. if (key == null || value == null) throw new NullPointerException();
  20. int hash = spread(key.hashCode());
  21. int binCount = 0;
  22. for (Node<K,V>[] tab = table;;) {
  23. Node<K,V> f; int n, i, fh; K fk; V fv;
  24. // 分支1:整个数组初始化
  25. if (tab == null || (n = tab.length) == 0)
  26. tab = initTable();
  27. // 分支2:第i个元素初始化
  28. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  29. if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
  30. break; // no lock when adding to empty bin
  31. }
  32. // 分支3:扩容
  33. else if ((fh = f.hash) == MOVED)
  34. tab = helpTransfer(tab, f);
  35. else if (onlyIfAbsent // check first node without acquiring lock
  36. && fh == hash
  37. && ((fk = f.key) == key || (fk != null && key.equals(fk)))
  38. && (fv = f.val) != null)
  39. return fv;
  40. // 分支4:放入元素
  41. else {
  42. V oldVal = null;
  43. // 重点:加锁
  44. synchronized (f) {
  45. // 链表
  46. if (tabAt(tab, i) == f) {
  47. if (fh >= 0) {
  48. binCount = 1;
  49. for (Node<K,V> e = f;; ++binCount) {
  50. K ek;
  51. if (e.hash == hash &&
  52. ((ek = e.key) == key ||
  53. (ek != null && key.equals(ek)))) {
  54. oldVal = e.val;
  55. if (!onlyIfAbsent)
  56. e.val = value;
  57. break;
  58. }
  59. Node<K,V> pred = e;
  60. if ((e = e.next) == null) {
  61. pred.next = new Node<K,V>(hash, key, value);
  62. break;
  63. }
  64. }
  65. }
  66. else if (f instanceof TreeBin) { //红黑树
  67. Node<K,V> p;
  68. binCount = 2;
  69. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  70. value)) != null) {
  71. oldVal = p.val;
  72. if (!onlyIfAbsent)
  73. p.val = value;
  74. }
  75. }
  76. else if (f instanceof ReservationNode)
  77. throw new IllegalStateException("Recursive update");
  78. }
  79. }
  80. // 如果是链表,上面的binCount会一直累加
  81. if (binCount != 0) {
  82. if (binCount >= TREEIFY_THRESHOLD) //超出阈值 转换为红黑树
  83. treeifyBin(tab, i);
  84. if (oldVal != null)
  85. return oldVal;
  86. break;
  87. }
  88. }
  89. }
  90. addCount(1L, binCount); // 总元素个数累加1
  91. return null;
  92. }

上面的for循环有4个大的分支:
第1个分支,是整个数组的初始化,前面已讲;
第2个分支,是所在的槽为空,说明该元素是该槽的第一个元素,直接新建一个头节点,然后返回;
第3个分支,说明该槽正在进行扩容,帮助其扩容;
第4个分支,就是把元素放入槽内。槽内可能是一个链表,也可能是一棵红黑树,通过头节点的类型可以判断是哪一种。第4个分支是包裹在synchronized (f)里面的,f对应的数组下标位置的头节点,意味着每个数组元素有一把锁,并发度等于数组的长度。

上面的binCount表示链表的元素个数,当这个数目超过TREEIFY_THRESHOLD=8时,把链表转换成红黑树,也就是 treeifyBin(tab,i)方法。但在这个方法内部,不一定需要进行红黑树转换,可能只做扩容操作,所以接下来从扩容讲起。

4.扩容
扩容的实现是最复杂的,下面从treeifyBin(Node[] tab, int index)讲起。

  1. /**
  2. * Replaces all linked nodes in bin at given index unless table is
  3. * too small, in which case resizes instead.
  4. */
  5. private final void treeifyBin(Node<K,V>[] tab, int index) {
  6. Node<K,V> b; int n;
  7. if (tab != null) {
  8. if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
  9. // 数组长度小于阈值64,不做红黑树转换,直接扩容
  10. tryPresize(n << 1);
  11. else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
  12. // 链表转换为红黑树
  13. synchronized (b) {
  14. if (tabAt(tab, index) == b) {
  15. TreeNode<K,V> hd = null, tl = null;
  16. // 遍历链表,初始化红黑树
  17. for (Node<K,V> e = b; e != null; e = e.next) {
  18. TreeNode<K,V> p =
  19. new TreeNode<K,V>(e.hash, e.key, e.val,
  20. null, null);
  21. if ((p.prev = tl) == null)
  22. hd = p;
  23. else
  24. tl.next = p;
  25. tl = p;
  26. }
  27. setTabAt(tab, index, new TreeBin<K,V>(hd));
  28. }
  29. }
  30. }
  31. }
  32. }

在上面的代码中,MIN_TREEIFY_CAPACITY=64,意味着当数组的长度没有超过64的时候,数组的每个节点里都是链表,只会扩容,不会转换成红黑树。只有当数组长度大于或等于64时,才考虑把链表转换成红黑树。
在 tryPresize(int size)内部调用了一个核心方法 transfer(Node<K,V>[] tab,Node<K,V >[] nextTab),先从这个方法的分析说起。

  1. /**
  2. * Tries to presize table to accommodate the given number of elements.
  3. *
  4. * @param size number of elements (doesn't need to be perfectly accurate)
  5. */
  6. private final void tryPresize(int size) {
  7. int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
  8. tableSizeFor(size + (size >>> 1) + 1);
  9. int sc;
  10. while ((sc = sizeCtl) >= 0) {
  11. Node<K,V>[] tab = table; int n;
  12. if (tab == null || (n = tab.length) == 0) {
  13. n = (sc > c) ? sc : c;
  14. if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
  15. try {
  16. if (table == tab) {
  17. @SuppressWarnings("unchecked")
  18. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  19. table = nt;
  20. sc = n - (n >>> 2);
  21. }
  22. } finally {
  23. sizeCtl = sc;
  24. }
  25. }
  26. }
  27. else if (c <= sc || n >= MAXIMUM_CAPACITY)
  28. break;
  29. else if (tab == table) {
  30. int rs = resizeStamp(n);
  31. if (U.compareAndSetInt(this, SIZECTL, sc,
  32. (rs << RESIZE_STAMP_SHIFT) + 2))
  33. transfer(tab, null);
  34. }
  35. }
  36. }
  37. /**
  38. * Moves and/or copies the nodes in each bin to new table. See
  39. * above for explanation.
  40. */
  41. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  42. int n = tab.length, stride;
  43. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  44. stride = MIN_TRANSFER_STRIDE; // subdivide range // 计算步长
  45. if (nextTab == null) { // 初始化新的HashMap
  46. try {
  47. @SuppressWarnings("unchecked")
  48. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 扩容两倍
  49. nextTab = nt;
  50. } catch (Throwable ex) { // try to cope with OOME
  51. sizeCtl = Integer.MAX_VALUE;
  52. return;
  53. }
  54. nextTable = nextTab;
  55. // 初始的transferIndex为旧HashMap的数组长度
  56. transferIndex = n;
  57. }
  58. int nextn = nextTab.length;
  59. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  60. boolean advance = true;
  61. boolean finishing = false; // to ensure sweep before committing nextTab
  62. // 此处,i为遍历下标,bound为边界。
  63. // 如果成功获取一个任务,则i=nextIndex-1
  64. // bound=nextIndex-stride;
  65. // 如果获取不到,则i=0,bound=0
  66. for (int i = 0, bound = 0;;) {
  67. Node<K,V> f; int fh;
  68. // advance表示在从i=transferIndex-1遍历到bound位置的过程中,是否一直继续
  69. while (advance) {
  70. int nextIndex, nextBound;
  71. // 以下是哪个分支中的advance都是false,表示如果三个分支都不执行,才可以一 直while循环
  72. // 目的在于当对transferIndex执行CAS操作不成功的时候,需要自旋,以期获取 一个stride的迁移任务。
  73. if (--i >= bound || finishing)
  74. // 对数组遍历,通过这里的--i进行。如果成功执行了--i,就不需要继续 while循环了,因为advance只能进一步。
  75. advance = false;
  76. else if ((nextIndex = transferIndex) <= 0) {
  77. // transferIndex <= 0,整个HashMap完成
  78. i = -1;
  79. advance = false;
  80. }
  81. // 对transferIndex执行CAS操作,即为当前线程分配1个stride。
  82. // CAS操作成功,线程成功获取到一个stride的迁移任务;
  83. // CAS操作不成功,线程没有抢到任务,会继续执行while循环,自旋。
  84. else if (U.compareAndSetInt
  85. (this, TRANSFERINDEX, nextIndex,
  86. nextBound = (nextIndex > stride ?
  87. nextIndex - stride : 0))) {
  88. bound = nextBound;
  89. i = nextIndex - 1;
  90. advance = false;
  91. }
  92. }
  93. // i越界,整个HashMap遍历完成
  94. if (i < 0 || i >= n || i + n >= nextn) {
  95. int sc;
  96. // finishing表示整个HashMap扩容完成
  97. if (finishing) {
  98. nextTable = null;
  99. // 将nextTab赋值给当前table
  100. table = nextTab;
  101. sizeCtl = (n << 1) - (n >>> 1);
  102. return;
  103. }
  104. if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  105. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  106. return;
  107. finishing = advance = true;
  108. i = n; // recheck before commit
  109. }
  110. }
  111. // tab[i]迁移完毕,赋值一个ForwardingNode
  112. else if ((f = tabAt(tab, i)) == null)
  113. advance = casTabAt(tab, i, null, fwd);
  114. else if ((fh = f.hash) == MOVED) // tab[i]的位置已经在迁移过程中
  115. advance = true; // already processed
  116. else {
  117. // 对tab[i]进行迁移操作,tab[i]可能是一个链表或者红黑树
  118. synchronized (f) {
  119. if (tabAt(tab, i) == f) {
  120. Node<K,V> ln, hn;
  121. // 链表
  122. if (fh >= 0) {
  123. int runBit = fh & n;
  124. Node<K,V> lastRun = f;
  125. for (Node<K,V> p = f.next; p != null; p = p.next) {
  126. int b = p.hash & n;
  127. if (b != runBit) {
  128. runBit = b;
  129. // 表示lastRun之后的所有元素,hash值都是一样的
  130. // 记录下这个最后的位置
  131. lastRun = p;
  132. }
  133. }
  134. if (runBit == 0) {
  135. // 链表迁移的优化做法
  136. ln = lastRun;
  137. hn = null;
  138. }
  139. else {
  140. hn = lastRun;
  141. ln = null;
  142. }
  143. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  144. int ph = p.hash; K pk = p.key; V pv = p.val;
  145. if ((ph & n) == 0)
  146. ln = new Node<K,V>(ph, pk, pv, ln);
  147. else
  148. hn = new Node<K,V>(ph, pk, pv, hn);
  149. }
  150. setTabAt(nextTab, i, ln);
  151. setTabAt(nextTab, i + n, hn);
  152. setTabAt(tab, i, fwd);
  153. advance = true;
  154. }
  155. // 红黑树,迁移做法和链表类似
  156. else if (f instanceof TreeBin) {
  157. TreeBin<K,V> t = (TreeBin<K,V>)f;
  158. TreeNode<K,V> lo = null, loTail = null;
  159. TreeNode<K,V> hi = null, hiTail = null;
  160. int lc = 0, hc = 0;
  161. for (Node<K,V> e = t.first; e != null; e = e.next) {
  162. int h = e.hash;
  163. TreeNode<K,V> p = new TreeNode<K,V>
  164. (h, e.key, e.val, null, null);
  165. if ((h & n) == 0) {
  166. if ((p.prev = loTail) == null)
  167. lo = p;
  168. else
  169. loTail.next = p;
  170. loTail = p;
  171. ++lc;
  172. }
  173. else {
  174. if ((p.prev = hiTail) == null)
  175. hi = p;
  176. else
  177. hiTail.next = p;
  178. hiTail = p;
  179. ++hc;
  180. }
  181. }
  182. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  183. (hc != 0) ? new TreeBin<K,V>(lo) : t;
  184. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  185. (lc != 0) ? new TreeBin<K,V>(hi) : t;
  186. setTabAt(nextTab, i, ln);
  187. setTabAt(nextTab, i + n, hn);
  188. setTabAt(tab, i, fwd);
  189. advance = true;
  190. }
  191. }
  192. }
  193. }
  194. }
  195. }

该方法非常复杂,下面一步步分析:
1. 扩容的基本原理如下图,首先建一个新的HashMap,其数组长度是旧数组长度的2倍,然后把旧的元素逐个迁移过来。所以,上面的方法参数有2个,第1个参数tab是扩容之前的HashMap,第2个参数nextTab是扩容之后的HashMap。当nextTab=null的时候,方法最初会对nextTab进行初始化。这里有一个关键点要说明:该方法会被多个线程调用,所以每个线程只是扩容旧的HashMap部分,这就涉及如何划分任务的问题。

image.png
2. 上图为多个线程并行扩容-任务划分示意图。旧数组的长度是N,每个线程扩容一段,一段的长度用变量stride(步长)来表示,transferIndex表示了整个数组扩容的进度。stride的计算公式如上面的代码所示,即:在单核模式下直接等于n,因为在单核模式下没有办法多个线程并行扩容,只需要1个线程来扩容整个数组;在多核模式下为 (n>>>3)/NCPU,并且保证步长的最小值是 16。显然,需要的线程个数约为n/stride。
image.png
transferIndex是ConcurrentHashMap的一个成员变量,记录了扩容的进度。初始值为n,从大到小扩容,每次减stride个位置,最终减至n<=0,表示整个扩容完成。因此,从[0,transferIndex-1]的位置表示还没有分配到线程扩容的部分,从[transfexIndex,n-1]的位置表示已经分配给某个线程进行扩容,当前正在扩容中,或者已经扩容成功。
因为transferIndex会被多个线程并发修改,每次减stride,所以需要通过CAS进行操作,如下面的代码所示。
image.png
image.png
3. 在扩容未完成之前,有的数组下标对应的槽已经迁移到了新的HashMap里面,有的还在旧的HashMap 里面。这个时候,所有调用 get(k,v)的线程还是会访问旧 HashMap,怎么处理呢?
下图为扩容过程中的转发示意图:当Node[0]已经迁移成功,而其他Node还在迁移过程中时,如果有线程要读取Node[0]的数据,就会访问失败。为此,新建一个ForwardingNode,即转发节点,在这个节点里面记录的是新的 ConcurrentHashMap 的引用。这样,当线程访问到ForwardingNode之后,会去查询新的ConcurrentHashMap。
4. 因为数组的长度 tab.length 是2的整数次方,每次扩容又是2倍。而 Hash 函数是hashCode%tab.length,等价于hashCode&(tab.length-1)。这意味着:处于第i个位置的元素,在新的Hash表的数组中一定处于第i个或者第i+n个位置,如下图所示。
举个简单的例子:假设数组长度是8,扩容之后是16:
若hashCode=5,5%8=0,扩容后,5%16=0,位置保持不变;
若hashCode=24,24%8=0,扩容后,24%16=8,后移8个位置;
若hashCode=25,25%8=1,扩容后,25%16=9,后移8个位置;
若hashCode=39,39%8=7,扩容后,39%8=7,位置保持不变;
……
image.png
正因为有这样的规律,所以如下有代码:
image.png
也就是把tab[i]位置的链表或红黑树重新组装成两部分,一部分链接到nextTab[i]的位置,一部分链接到nextTab[i+n]的位置,如上图所示。然后把tab[i]的位置指向一个ForwardingNode节点。
同时,当tab[i]后面是链表时,使用类似于JDK 7中在扩容时的优化方法,从lastRun往后的所有节点,不需依次拷贝,而是直接链接到新的链表头部。从lastRun往前的所有节点,需要依次拷贝。
了解了核心的迁移函数transfer(tab,nextTab),再回头看tryPresize(int size)函数。这个函数的输入是整个Hash表的元素个数,在函数里面,根据需要对整个Hash表进行扩容。想要看明白这个函数,需要透彻地理解sizeCtl变量,下面这段注释摘自源码。

  1. /**
  2. * Table initialization and resizing control. When negative, the
  3. * table is being initialized or resized: -1 for initialization,
  4. * else -(1 + the number of active resizing threads). Otherwise,
  5. * when table is null, holds the initial table size to use upon
  6. * creation, or 0 for default. After initialization, holds the
  7. * next element count value upon which to resize the table.
  8. */
  9. private transient volatile int sizeCtl;

当sizeCtl=-1时,表示整个HashMap正在初始化;
当sizeCtl=某个其他负数时,表示多个线程在对HashMap做并发扩容;
当sizeCtl=cap时,tab=null,表示未初始之前的初始容量(如上面的构造函数所示);
扩容成功之后,sizeCtl存储的是下一次要扩容的阈值,即上面初始化代码中的n-(n>>>2)=0.75n。
所以,sizeCtl变量在Hash表处于不同状态时,表达不同的含义。明白了这个道理,再来看上面的tryPresize(int size)函数。

  1. /**
  2. * Tries to presize table to accommodate the given number of elements.
  3. *
  4. * @param size number of elements (doesn't need to be perfectly accurate)
  5. */
  6. private final void tryPresize(int size) {
  7. int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
  8. tableSizeFor(size + (size >>> 1) + 1);
  9. int sc;
  10. while ((sc = sizeCtl) >= 0) {
  11. Node<K,V>[] tab = table; int n;
  12. if (tab == null || (n = tab.length) == 0) {
  13. n = (sc > c) ? sc : c;
  14. if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
  15. try {
  16. if (table == tab) {
  17. @SuppressWarnings("unchecked")
  18. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  19. table = nt;
  20. sc = n - (n >>> 2);
  21. }
  22. } finally {
  23. sizeCtl = sc;
  24. }
  25. }
  26. }
  27. else if (c <= sc || n >= MAXIMUM_CAPACITY)
  28. break;
  29. else if (tab == table) {
  30. int rs = resizeStamp(n);
  31. if (U.compareAndSetInt(this, SIZECTL, sc,
  32. (rs << RESIZE_STAMP_SHIFT) + 2))
  33. transfer(tab, null);
  34. }
  35. }
  36. }

tryPresize(int size)是根据期望的元素个数对整个Hash表进行扩容,核心是调用transfer函数。在第一次扩容的时候,sizeCtl会被设置成一个很大的负数
U.compareAndSwapInt(this,SIZECTL,sc,(rs << RESIZE_STAMP_SHIFT)+2);之后每一个线程扩容的时候,sizeCtl 就加 1,U.compareAndSwapInt(this,SIZECTL,sc,sc+1),待扩容完成之后,sizeCtl减1。

ConcurrentSkipListMap/Set

ConcurrentHashMap 是一种 key 无序的 HashMap,ConcurrentSkipListMap则是 key 有序的,实现了NavigableMap接口,此接口又继承了SortedMap接口。
1.为什么要使用SkipList实现Map?
在Java的util包中,有一个非线程安全的HashMap,也就是TreeMap,是key有序的,基于红黑树实现。
而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。这里为什么不用红黑树,而用跳查表来实现呢?
借用Doug Lea的原话:
The reason is that there are no known efficient lock0free insertion and deletion algorithms for search trees.

也就是目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法。那为什么SkipList可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起。

2.无锁链表
在前面讲解AQS时,曾反复用到无锁队列,其实现也是链表。究竟二者的区别在哪呢?
前面讲的无锁队列、栈,都是只在队头、队尾进行CAS操作,通常不会有问题。如果在链表的中间进行插入或删除操作,按照通常的CAS做法,就会出现问题!
关于这个问题,Doug Lea的论文中有清晰的论述,此处引用如下:
操作1:在节点10后面插入节点20。如下图所示,首先把节点20的next指针指向节点30,然后对节点10的next指针执行CAS操作,使其指向节点20即可。
image.png
操作2:删除节点10。如下图所示,只需把头节点的next指针,进行CAS操作到节点30即可
image.png
但是,如果两个线程同时操作,一个删除节点10,一个要在节点10后面插入节点20。并且这两个操作都各自是CAS的,此时就会出现问题。如下图所示,删除节点10,会同时把新插入的节点20也删除掉!这个问题超出了CAS的解决范围。
image.png
为什么会出现这个问题呢?
究其原因:在删除节点10的时候,实际受到操作的是节点10的前驱,也就是头节点。节点10本身没有任何变化。故而,再往节点10后插入节点20的线程,并不知道节点10已经被删除了!
针对这个问题,在论文中提出了如下的解决办法,如下图所示,把节点 10 的删除分为两2步:
第一步,把节点10的next指针,mark成删除,即软删除;
第二步,找机会,物理删除。
做标记之后,当线程再往节点10后面插入节点20的时候,便可以先进行判断,节点10是否已经被删除,从而避免在一个删除的节点10后面插入节点20。这个解决方法有一个关键点:“把节点10next
针指向节点20(插入操作)”和“判断节点10本身是否已经删除(判断操作)”,必须是原子的,必须在1 CAS操作里面完成!

image.png
具体的实现有两个办法:
办法一:AtomicMarkableReference
保证每个 next 是 AtomicMarkableReference 类型。但这个办法不够高效,Doug Lea 在ConcurrentSkipListMap的实现中用了另一种办法。
办法2:Mark节点
我们的目的是标记节点10已经删除,也就是标记它的next字段。那么可以新造一个marker节点,使节点10的next指针指向该Marker节点。这样,当向节点10的后面插入节点20的时候,就可以在插入的同时判断节点10的next指针是否指向了一个Marker节点,这两个操作可以在一个CAS操作里面完成。

3.跳查表
解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题。因为跳查表就是多层链表叠起来的。
下面先看一下跳查表的数据结构(下面所用代码都引用自JDK 7,JDK 8中的代码略有差异,但不影响下面的原理分析)。
image.png
上图中的Node就是跳查表底层节点类型。所有的对都是由这个单向链表串起来的。
上面的Index层的节点:
image.png
上图中的node属性不存储实际数据,指向Node节点。
down属性:每个Index节点,必须有一个指针,指向其下一个Level对应的节点。
right属性:Index也组成单向链表。
整个ConcurrentSkipListMap就只需要记录顶层的head节点即可:

  1. public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
  2. // ...
  3. private transient Index<K,V> head;
  4. // ...
  5. }

image.png
下面详细分析如何从跳查表上查找、插入和删除元素
put实现分析
image.png

  1. /**
  2. * Main insertion method. Adds element if not present, or
  3. * replaces value if present and onlyIfAbsent is false.
  4. *
  5. * @param key the key
  6. * @param value the value that must be associated with key
  7. * @param onlyIfAbsent if should not insert if already present
  8. * @return the old value, or null if newly inserted
  9. */
  10. private V doPut(K key, V value, boolean onlyIfAbsent) {
  11. if (key == null)
  12. throw new NullPointerException();
  13. Comparator<? super K> cmp = comparator;
  14. for (;;) {
  15. Index<K,V> h; Node<K,V> b;
  16. VarHandle.acquireFence();
  17. int levels = 0; // number of levels descended
  18. if ((h = head) == null) { // try to initialize
  19. Node<K,V> base = new Node<K,V>(null, null, null);
  20. h = new Index<K,V>(base, null, null);
  21. b = (HEAD.compareAndSet(this, null, h)) ? base : null;
  22. }
  23. else {
  24. for (Index<K,V> q = h, r, d;;) { // count while descending
  25. while ((r = q.right) != null) {
  26. Node<K,V> p; K k;
  27. if ((p = r.node) == null || (k = p.key) == null ||
  28. p.val == null)
  29. RIGHT.compareAndSet(q, r, r.right);
  30. else if (cpr(cmp, key, k) > 0)
  31. q = r;
  32. else
  33. break;
  34. }
  35. if ((d = q.down) != null) {
  36. ++levels;
  37. q = d;
  38. }
  39. else {
  40. b = q.node;
  41. break;
  42. }
  43. }
  44. }
  45. if (b != null) {
  46. Node<K,V> z = null; // new node, if inserted
  47. for (;;) { // find insertion point
  48. Node<K,V> n, p; K k; V v; int c;
  49. if ((n = b.next) == null) {
  50. if (b.key == null) // if empty, type check key now
  51. cpr(cmp, key, key);
  52. c = -1;
  53. }
  54. else if ((k = n.key) == null)
  55. break; // can't append; restart
  56. else if ((v = n.val) == null) {
  57. unlinkNode(b, n);
  58. c = 1;
  59. }
  60. else if ((c = cpr(cmp, key, k)) > 0)
  61. b = n;
  62. else if (c == 0 &&
  63. (onlyIfAbsent || VAL.compareAndSet(n, v, value)))
  64. return v;
  65. if (c < 0 &&
  66. NEXT.compareAndSet(b, n,
  67. p = new Node<K,V>(key, value, n))) {
  68. z = p;
  69. break;
  70. }
  71. }
  72. if (z != null) {
  73. int lr = ThreadLocalRandom.nextSecondarySeed();
  74. if ((lr & 0x3) == 0) { // add indices with 1/4 prob
  75. int hr = ThreadLocalRandom.nextSecondarySeed();
  76. long rnd = ((long)hr << 32) | ((long)lr & 0xffffffffL);
  77. int skips = levels; // levels to descend before add
  78. Index<K,V> x = null;
  79. for (;;) { // create at most 62 indices
  80. x = new Index<K,V>(z, x, null);
  81. if (rnd >= 0L || --skips < 0)
  82. break;
  83. else
  84. rnd <<= 1;
  85. }
  86. if (addIndices(h, skips, x, cmp) && skips < 0 &&
  87. head == h) { // try to add new level
  88. Index<K,V> hx = new Index<K,V>(z, x, null);
  89. Index<K,V> nh = new Index<K,V>(h.node, h, hx);
  90. HEAD.compareAndSet(this, h, nh);
  91. }
  92. if (z.val == null) // deleted while adding indices
  93. findPredecessor(key, cmp); // clean
  94. }
  95. addCount(1L);
  96. return null;
  97. }
  98. }
  99. }
  100. }

在底层,节点按照从小到大的顺序排列,上面的index层间隔地串在一起,因为从小到大排列。查找的时候,从顶层index开始,自左往右、自上往下,形成图示的遍历曲线。假设要查找的元素是32,遍历过程如下:
先遍历第2层Index,发现在21的后面;
从21下降到第1层Index,从21往后遍历,发现在21和35之间;
从21下降到底层,从21往后遍历,最终发现在29和35之间。
在整个的查找过程中,范围不断缩小,最终定位到底层的两个元素之间
image.png
关于上面的put(…)方法,有一个关键点需要说明:在通过findPredecessor找到了待插入的元素在[b,n]之间之后,并不能马上插入。因为其他线程也在操作这个链表,b、n都有可能被删除,所以在插入之前执行了一系列的检查逻辑,而这也正是无锁链表的复杂之处。

remove(…)分析

  1. /**
  2. * Main deletion method. Locates node, nulls value, appends a
  3. * deletion marker, unlinks predecessor, removes associated index
  4. * nodes, and possibly reduces head index level.
  5. *
  6. * @param key the key
  7. * @param value if non-null, the value that must be
  8. * associated with key
  9. * @return the node, or null if not found
  10. */
  11. // 若找到了(key, value)就删除,并返回value;找不到就返回null
  12. final V doRemove(Object key, Object value) {
  13. if (key == null)
  14. throw new NullPointerException();
  15. Comparator<? super K> cmp = comparator;
  16. V result = null;
  17. Node<K,V> b;
  18. outer: while ((b = findPredecessor(key, cmp)) != null &&
  19. result == null) {
  20. for (;;) {
  21. Node<K,V> n; K k; V v; int c;
  22. if ((n = b.next) == null)
  23. break outer;
  24. else if ((k = n.key) == null)
  25. break;
  26. else if ((v = n.val) == null)
  27. unlinkNode(b, n);
  28. else if ((c = cpr(cmp, key, k)) > 0)
  29. b = n;
  30. else if (c < 0)
  31. break outer;
  32. else if (value != null && !value.equals(v))
  33. break outer;
  34. else if (VAL.compareAndSet(n, v, null)) {
  35. result = v;
  36. unlinkNode(b, n);
  37. break; // loop to clean up
  38. }
  39. }
  40. }
  41. if (result != null) {
  42. tryReduceLevel();
  43. addCount(-1L);
  44. }
  45. return result;
  46. }

上面的删除方法和插入方法的逻辑非常类似,因为无论是插入,还是删除,都要先找到元素的前驱,也就是定位到元素所在的区间[b,n]。在定位之后,执行下面几个步骤:
1. 如果发现b、n已经被删除了,则执行对应的删除清理逻辑;
2. 否则,如果没有找到待删除的(k, v),返回null;
3. 如果找到了待删除的元素,也就是节点n,则把n的value置为null,同时在n的后面加上Marker节点,同时检查是否需要降低Index的层次。

无论是插入、删除,还是查找,都有相似的逻辑,都需要先定位到元素位置[b,n],然后判断b、n是否已经被删除,如果是,则需要执行相应的删除清理逻辑。这也正是无锁链表复杂的地方。

ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装,每次插入数据前检验元素是否存在
image.png