同步集合介绍

Java 的集合分为两部分,以 Collection 为根接口的线性集合和以 Map 为根接口的双列集合。在不考虑到线程安全问题的情况下,我们会将 Collection 集合分为两类,有序的 List 和无序的 Set,这两类都是在 JDK2 由 Josh Bloch 和 Neal Gafter 主导开发的工具类。但是 JDK5 中 又加入了由 Doug Lea 主导开发的 FIFO 集合。

Queue 的一大作用就是它提供了很多线程安全的集合,而且在并发场景下 Queue 的实现类在提供了同种功能的情况下性能比传统使用 synchronized 更加的出色。

Java并发-05-同步集合 - 图1

Wikipedia” In systems design, a fail-fast system is one which immediately reports at its interface any condition that is likely to indicate a failure. Fail-fast systems are usually designed to stop normal operation rather than attempt to continue a possibly flawed process. Such designs often check the system’s state at several points in an operation, so any failures can be detected early. The responsibility of a fail-fast module is detecting errors, then letting the next-highest level of the system handle them.

Fail-Fast 是一种错误检查机制,旨在于能够早期发现可能造成错误的操作。它的目的是让错误早期的暴露出来以至于能在早期被解决。

集合的 Fail-Fast 主要是使用在迭代器中。当迭代器被创建后,除了迭代器本身的方法可以改变集合的结构外,其他的因素如若改变了集合的结构,都被抛出 ConcurrentModificationException 异常。这不只是限于多线程环境,单线程环境也是可以造成 Fail-Fast 的。

单线程 Fail-Fast

  1. public class Test1 {
  2. public static void main(String[] args) {
  3. List<Integer> list = new ArrayList<>();
  4. for (int i = 0; i < 20; i++) {
  5. list.add(i);
  6. }
  7. Iterator<Integer> it = list.iterator();
  8. int temp = 0;
  9. while (it.hasNext()) {
  10. if (temp == 3) {
  11. temp++;
  12. list.remove(3);
  13. } else {
  14. temp++;
  15. System.out.print(temp + " ");
  16. System.out.println(it.next());
  17. }
  18. }
  19. }
  20. }
  21. // 1 0
  22. // 2 1
  23. // 3 2
  24. // 5 Exception in thread "main" java.util.ConcurrentModificationException
  25. // at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
  26. // at java.util.ArrayList$Itr.next(ArrayList.java:859)
  27. // at juc.c_025.Test1.main(Test1.java:24)

多线程 Fail-Fast

  1. public class FastFailTest {
  2. private static List<String> list = new ArrayList<>();
  3. public static void main(String[] args) {
  4. // 同时启动两个线程对list进行操作!
  5. new ThreadOne().start();
  6. new ThreadTwo().start();
  7. }
  8. private static void printAll() {
  9. Iterator iter = list.iterator();
  10. while (iter.hasNext()) {
  11. System.out.print(iter.next() + ", ");
  12. }
  13. System.out.println();
  14. }
  15. // 向list中依次添加 0,1,2,3,4,5,每添加一个数之后,就通过printAll() 遍历整个list
  16. private static class ThreadOne extends Thread {
  17. public void run() {
  18. int i = 0;
  19. while (i < 6) {
  20. list.add(String.valueOf(i));
  21. printAll();
  22. i++;
  23. }
  24. }
  25. }
  26. // 向list中依次添加 10,11,12,13,14,15,每添加一个数之后,就通过 printAll() 遍历整个list
  27. private static class ThreadTwo extends Thread {
  28. public void run() {
  29. int i = 10;
  30. while (i < 16) {
  31. list.add(String.valueOf(i));
  32. printAll();
  33. i++;
  34. }
  35. }
  36. }
  37. }

Fail-Fast 基本原理

  1. public abstract class AbstractList<E> extends AbstractCollection<E> implements List<E>{
  2. // 这个标识着此集合被修改多少次
  3. protected transient int modCount = 0;
  4. private class Itr implements Iterator<E> {
  5. int cursor = 0;
  6. int lastRet = -1;
  7. // 初始化对象的时候将 modCount 赋值给 expectedModCount
  8. int expectedModCount = modCount;
  9. public boolean hasNext() {
  10. return cursor != size();
  11. }
  12. public E next() {
  13. checkForComodification();
  14. try {
  15. int i = cursor;
  16. E next = get(i);
  17. lastRet = i;
  18. cursor = i + 1;
  19. return next;
  20. } catch (IndexOutOfBoundsException e) {
  21. checkForComodification();
  22. throw new NoSuchElementException();
  23. }
  24. }
  25. final void checkForComodification() {
  26. // 如果迭代器里记录的 expectedModCount 和当前集合的 modCount 不一致,抛出异常
  27. if (modCount != expectedModCount)
  28. throw new ConcurrentModificationException();
  29. }
  30. }
  31. }

modCount 增加

  1. public boolean add(E e) {
  2. ensureCapacityInternal(size + 1); // Increments modCount!!
  3. elementData[size++] = e;
  4. return true;
  5. }
  6. private void ensureCapacityInternal(int minCapacity) {
  7. ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
  8. }
  9. private void ensureExplicitCapacity(int minCapacity) {
  10. modCount++;
  11. // overflow-conscious code
  12. if (minCapacity - elementData.length > 0)
  13. grow(minCapacity);
  14. }
  15. public E remove(int index) {
  16. rangeCheck(index);
  17. modCount++;
  18. E oldValue = elementData(index);
  19. int numMoved = size - index - 1;
  20. if (numMoved > 0)
  21. System.arraycopy(elementData, index+1, elementData, index, numMoved);
  22. elementData[--size] = null; // clear to let GC do its work
  23. return oldValue;
  24. }

CopyOnWriteArrayList

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问List的内部数据,毕竟读取操作是安全的。

CopyOnWriteArrayList 类的所有可变操作(add,set等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的 ArrayList,所谓CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。

读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

  1. private transient volatile Object[] array;
  2. public E get(int index) {
  3. return get(getArray(), index);
  4. }
  5. private E get(Object[] a, int index) {
  6. return (E) a[index];
  7. }
  8. final Object[] getArray() {
  9. return array;
  10. }

CopyOnWriteArrayList 写入操作 add() 方法在添加集合的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来。

  1. private transient volatile Object[] array;
  2. public boolean add(E e) {
  3. final ReentrantLock lock = this.lock;
  4. lock.lock(); // 加锁
  5. try {
  6. Object[] elements = getArray();
  7. int len = elements.length;
  8. Object[] newElements = Arrays.copyOf(elements, len + 1);// 拷贝新数组
  9. newElements[len] = e;
  10. setArray(newElements);
  11. return true;
  12. } finally {
  13. lock.unlock(); // 释放锁
  14. }
  15. }
  16. final void setArray(Object[] a) {
  17. array = a;
  18. }

CopyOnWriteArrayList 测试

  1. // CopyOnWriteArrayList 写入速度很慢,适用于 读远大于写的情况
  2. public class T02_CopyOnWriteList {
  3. public static void main(String[] args) {
  4. List<String> lists =
  5. new CopyOnWriteArrayList<>();
  6. //new Vector<>();
  7. Random r = new Random();
  8. Thread[] ths = new Thread[100];
  9. for (int i = 0; i < ths.length; i++) {
  10. Runnable task = () -> {
  11. for (int i1 = 0; i1 < 1000; i1++)
  12. lists.add("a" + r.nextInt(10000));
  13. };
  14. ths[i] = new Thread(task);
  15. }
  16. runAndComputeTime(ths);
  17. System.out.println(lists.size());
  18. }
  19. static void runAndComputeTime(Thread[] ths) {
  20. long s1 = System.currentTimeMillis();
  21. Arrays.asList(ths).forEach(t -> t.start());
  22. Arrays.asList(ths).forEach(t -> {
  23. try {
  24. t.join();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. });
  29. long s2 = System.currentTimeMillis();
  30. System.out.println(s2 - s1);
  31. }
  32. }

ConcurrentSkipListXXX

跳表

对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低。跳表是一种采用空间换时间策略的能进行快速查找的数据结构。它们都可以对元素进行快速的查找。和平衡树相比,平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。

跳表的本质是同时维护了多个链表,并且链表是分层的,

Java并发-05-同步集合 - 图2Java并发-05-同步集合 - 图3
使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListXXX。

JDK 实现

JDK 一共提供了两类跳表:ConcurrentSkipListMap和ConcurrentSkipListSet。我们使用 Map 进行测试。

  1. public class TestConcurrentSkipListMap {
  2. static final Random r = new Random(System.currentTimeMillis());
  3. public static void main(String[] args) {
  4. ConcurrentSkipListMap map =
  5. new ConcurrentSkipListMap(Comparator.comparingInt(o -> (int) o));
  6. // ConcurrentSkipListMap map =
  7. // new ConcurrentSkipListMap(Comparator.comparingInt(o -> (int) o).reversed());
  8. Thread []threads = new Thread[100];
  9. for (int j = 0; j < 100; j++) {
  10. threads[j] = new Thread(() -> {
  11. for (int i = 0; i < 1000; i++)
  12. map.put(r.nextInt(100000),"");
  13. });
  14. threads[j].start();
  15. }
  16. Arrays.stream(threads).forEach(item -> {
  17. try {
  18. item.join();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. });
  23. map.forEach((o, o2) -> System.out.print(o + ", "));
  24. }
  25. }

BlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;当队列为空时,尝试获取取一个元素也会阻塞。ArrayBlockingQueue 默认情况下不保证线程访问队列的公平性,但可以指定。

  1. public class T06_ArrayBlockingQueue {
  2. static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
  3. static Random r = new Random();
  4. public static void main(String[] args) throws InterruptedException {
  5. for (int i = 0; i < 10; i++) {
  6. strs.put("a" + i);
  7. }
  8. strs.put("aaa"); // 满了就会等待,程序阻塞
  9. strs.add("aaa"); // 满了就抛异常
  10. strs.offer("aaa"); // 满了就返回 false
  11. strs.offer("aaa", 1, TimeUnit.SECONDS); // 指定时间内成功返回 true,否则返回 false
  12. System.out.println(strs);
  13. }
  14. }

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,具有 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 时,会指定其大小。默认大小为 Integer.MAX_VALUE。LinkedBlockingQueue 只能是不公平状态。

相关构造方法:

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);
  3. }
  4. public LinkedBlockingQueue(int capacity) {
  5. if (capacity <= 0) throw new IllegalArgumentException();
  6. this.capacity = capacity;
  7. last = head = new Node<E>(null);
  8. }

测试:

  1. public class T05_LinkedBlockingQueue {
  2. static BlockingQueue<String> strs = new LinkedBlockingQueue<>(10);
  3. static Random r = new Random();
  4. public static void main(String[] args) {
  5. new Thread(() -> {
  6. for (int i = 0; i < 100; i++) {
  7. try {
  8. strs.put("a" + i); // 如果满了,就会等待
  9. TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }, "p1").start();
  15. for (int i = 0; i < 5; i++) {
  16. new Thread(() -> {
  17. for (;;) {
  18. try {
  19. System.out.println(Thread.currentThread().getName() +
  20. " take -" + strs.take()); // 如果空了,就会等待
  21. TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }, "c" + i).start();
  27. }
  28. }
  29. }

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,所以自定义类可以通过实现 Comparable 方法来指定元素排序规则。初始化时也可以通过构造器参数 Comparator 来指定排序规则。(很明显,不是 FIFO 了)

PriorityBlockingQueue 并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

  1. public class T10_PriorityBlockingQueue {
  2. static BlockingQueue<String> strs = new PriorityBlockingQueue<>(10);
  3. static Random r = new Random();
  4. public static void main(String[] args) {
  5. new Thread(() -> {
  6. for (int i = 0; i < 100; i++) {
  7. try {
  8. strs.put("a" + i);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }, "p1").start();
  14. for (int i = 0; i < 5; i++) {
  15. new Thread(() -> {
  16. for (;;) {
  17. try {
  18. System.out.println(Thread.currentThread().getName() +
  19. " take -" + strs.take()); // 如果空了,就会等待
  20. TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }, "c" + i).start();
  26. }
  27. }
  28. }
  29. // 和 LinkedBlockingQueue 的区别就是,输出是自然排序的

SynchronousQueue

SynchronousQueue是一个比较特殊的阻塞队列,它具有以下几个特点:

  1. 一个调用插入方法的线程必须等待另一个线程调用取出方法;
  2. 队列没有容量Capacity(或者说容量为0),事实上队列中并不存储元素,它只是提供两个线程进行信息交换的场所;
  3. 于以上原因,队列在很多场合表现的像一个空队列。不能对元素进行迭代,不能peek元素,poll会返回null;
  4. 队列中不允许存入null元素。
  5. SynchronousQueue如同ArrayedBlockingQueue一样,支持“公平”策略,在构造函数中可以传入false或true表示是否支持该策略。
  1. public class T08_SynchronusQueue {
  2. public static void main(String[] args) throws InterruptedException {
  3. BlockingQueue<String> strs = new SynchronousQueue<>();
  4. new Thread(() -> {
  5. try {
  6. TimeUnit.SECONDS.sleep(10);
  7. System.out.println(strs.take());
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }).start();
  12. strs.put("aaa"); // 阻塞,等待消费者消费
  13. System.out.println(strs.size());
  14. }
  15. }

TransferQueue

TransferQueue提供了一个场所,生产者线程使用transfer方法传入一些对象并阻塞,直至这些对象被消费者线程全部取出。前面介绍的SynchronousQueue很像一个容量为0的TransferQueue。

  1. public class TransferQueueExam {
  2. public static void main(String[] args) {
  3. TransferQueue<String> queue = new LinkedTransferQueue<>();
  4. new Thread(new Producer("Producer1", queue)).start();
  5. new Thread(new Producer("Producer2", queue)).start();
  6. new Thread(new Consumer("Consumer1", queue)).start();
  7. new Thread(new Consumer("Consumer2", queue)).start();
  8. }
  9. static class Producer implements Runnable {
  10. private final String name;
  11. private final TransferQueue<String> queue;
  12. Producer(String name, TransferQueue<String> queue) {
  13. this.name = name;
  14. this.queue = queue;
  15. }
  16. @Override
  17. public void run() {
  18. System.out.println(name + " begin transfer objects");
  19. try {
  20. for (int i = 0; i < 10; i++) {
  21. queue.transfer(name + ": Product" + i);
  22. System.out.println(name + " transfer " + " Product" + i);
  23. }
  24. System.out.println("after transformation");
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. System.out.println(name + " is over");
  29. }
  30. }
  31. static class Consumer implements Runnable {
  32. private final String name;
  33. private final TransferQueue<String> queue;
  34. private static Random rand = new Random(System.currentTimeMillis());
  35. Consumer(String name, TransferQueue<String> queue) {
  36. this.name = name;
  37. this.queue = queue;
  38. }
  39. @Override
  40. public void run() {
  41. try {
  42. for (int i = 0; i < 5; i++) {
  43. String str = queue.take();
  44. System.out.println(name + " take " + str);
  45. TimeUnit.SECONDS.sleep(100);
  46. }
  47. System.out.println(name + " is over");
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. }
  53. }

和SynchronizedQueue的区别

SynchronizedQueue 可以认为是一个一对一的模型,TransferQueue 是一个多对多的模型。

  1. public class T09_SynchronusQueue { // 容量为0
  2. public static void main(String[] args) throws InterruptedException {
  3. BlockingQueue<String> strs = new SynchronousQueue<>();
  4. new Thread(() -> {
  5. try {
  6. TimeUnit.SECONDS.sleep(1);
  7. System.out.println(strs.take());
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }).start();
  12. new Thread(() -> {
  13. try {
  14. strs.put("bbb");
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }).start();
  19. new Thread(() -> {
  20. try {
  21. strs.put("aaa");
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }).start();
  26. System.out.println(strs.size());
  27. }
  28. }
  29. // 只会输出aaa,bbb会被覆盖。对换两个提供者线程,只会输出bbb,aaa会被覆盖。

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。

ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列。

Java并发-05-同步集合 - 图4

  1. private static class Node<E> {
  2. volatile E item;
  3. volatile Node<E> next;
  4. // 设置Node中的数据域item
  5. Node(E item) {
  6. UNSAFE.putObject(this, itemOffset, item);
  7. }
  8. // 更改Node中的指针域next
  9. boolean casItem(E cmp, E val) {
  10. return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  11. }
  12. // 设置Node中的指针域next
  13. void lazySetNext(Node<E> val) {
  14. UNSAFE.putOrderedObject(this, nextOffset, val);
  15. }
  16. // 更新Node中的指针域next
  17. boolean casNext(Node<E> cmp, Node<E> val) {
  18. return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  19. }
  20. private static final sun.misc.Unsafe UNSAFE;
  21. private static final long itemOffset;
  22. private static final long nextOffset;
  23. static {
  24. try {
  25. UNSAFE = sun.misc.Unsafe.getUnsafe();
  26. Class<?> k = Node.class;
  27. itemOffset = UNSAFE.objectFieldOffset
  28. (k.getDeclaredField("item"));
  29. nextOffset = UNSAFE.objectFieldOffset
  30. (k.getDeclaredField("next"));
  31. } catch (Exception e) {
  32. throw new Error(e);
  33. }
  34. }
  35. }

ConcurrentLinkedQueue 类有下面两个构造方法:

  1. // 默认构造方法,head节点存储的元素为空,tail节点等于head节点
  2. public ConcurrentLinkedQueue() {
  3. head = tail = new Node<E>(null);
  4. }
  5. // 根据其他集合来创建队列
  6. public ConcurrentLinkedQueue(Collection<? extends E> c) {
  7. Node<E> h = null, t = null;
  8. // 遍历节点
  9. for (E e : c) {
  10. // 若节点为null,则直接抛出 NullPointerException 异常
  11. checkNotNull(e);
  12. Node<E> newNode = new Node<E>(e);
  13. if (h == null)
  14. h = t = newNode;
  15. else {
  16. t.lazySetNext(newNode);
  17. t = newNode;
  18. }
  19. }
  20. if (h == null)
  21. h = t = new Node<E>(null);
  22. head = h;
  23. tail = t;
  24. }

可以看出没有元素的情况下 head 节点存储的元素为空,tail 节点等于 head 节点。

入队操作

由于 tail 和 head 都是 volatile 变量,对它们的读操作会远远高于写操作带来的消耗。为了减少 CAS 的操作,插入结点的时候并不会每次都将 tail 结点更新为最新插入的结点。而是更新两个结点之后才会hop一下。

Java并发-05-同步集合 - 图5
public boolean offer(E e) {
checkNotNull(e);
// 创建入队节点
final Node newNode = new Node(e);

  1. // 循环 CAS 直到入队成功
  2. // 1、根据tail节点定位出尾节点(last node);
  3. // 2、将新节点置为尾节点的下一个节点;
  4. // 3、casTail更新尾节点
  5. for (Node<E> t = tail, p = t;;) {
  6. // p用来表示队列的尾节点,初始情况下等于 tail 节点
  7. // q是p的next节点
  8. Node<E> q = p.next;
  9. // 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null
  10. // 如果p是尾节点
  11. if (q == null) {
  12. // 设置p节点的下一个节点为新节点,设置成功则casNext返回true;
  13. // 否则返回false,说明有其他线程更新过尾节点,此时就会回到14行再次循环
  14. if (p.casNext(null, newNode)) {
  15. // 如果p != t,则将入队节点设置成tail节点,
  16. // 更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
  17. if (p != t) // 此时一次更新两个结点
  18. casTail(t, newNode); // Failure is OK
  19. return true;
  20. }
  21. }
  22. // 多线程操作时候,由于 poll 时候会把旧的head变为自引用,然后将head的next设置为新的head
  23. // 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
  24. else if (p == q)
  25. // 此时队列已经丢了,我们需要从定位队列。如果 tail 更新到了新队列就使用 tail
  26. // 如果tail没有更新到新队列,使用新队列的 head 进行遍历
  27. p = (t != (t = tail)) ? t : head;
  28. // 寻找尾节点
  29. else
  30. // Check for tail updates after two hops.
  31. p = (p != t && t != (t = tail)) ? t : q;
  32. }

}

  1. ### 出队操作
  2. 并不是每次出队时都更新 head 节点,当 head 节点里有元素时,直接弹出head节点里的元素,而不会更新 head 节点。只有当 head 节点里没有元素时,出队操作才会更新 head 节点。这种做法通过减少使用 CAS 更新 head 节点的消耗,从而提高出队效率。
  3. <div align="center"><img width="80%" src="http://blogfileqiniu.isjinhao.site/e030ae8e-b768-43a7-a632-2d9df6d5daff" /></div>
  4. ```java
  5. public E poll() {
  6. restartFromHead:
  7. for (;;) {
  8. for (Node<E> h = head, p = h, q;;) {
  9. E item = p.item;
  10. // head 里有元素的时候,直接替换为空并出队
  11. if (item != null && p.casItem(item, null)) {
  12. if (p != h) // 一次跳两个结点
  13. updateHead(h, ((q = p.next) != null) ? q : p);
  14. return item;
  15. }
  16. // 此时说明只有一个头结点
  17. else if ((q = p.next) == null) {
  18. updateHead(h, p);
  19. return null;
  20. }
  21. // 此时需要重定位head
  22. else if (p == q)
  23. continue restartFromHead;
  24. else
  25. p = q; // p 向前移动
  26. }
  27. }
  28. }
  29. final void updateHead(Node<E> h, Node<E> p) {
  30. // head 指向 p
  31. if (h != p && casHead(h, p))
  32. // h 的 next 指向自己
  33. h.lazySetNext(h);
  34. }

ConcurrentHashMap

重要成员属性的介绍

  1. // The array of bins. Lazily initialized upon first insertion.
  2. // Size is always a power of two. Accessed directly by iterators.
  3. transient volatile Node<K,V>[] table;
  4. // The next table to use; non-null only while resizing.
  5. private transient volatile Node<K,V>[] nextTable;
  6. // Base counter value, used mainly when there is no contention,
  7. // but also as a fallback during table initialization races. Updated via CAS.
  8. private transient volatile long baseCount;
  9. // Table initialization and resizing control. When negative, the
  10. // table is being initialized or resized:
  11. // 1. -1 for initialization, else
  12. // 2. -(1 + the number of active resizing threads).
  13. // Otherwise, when table is null, holds the initial table size to use upon
  14. // creation, or 0 for default. After initialization, holds the
  15. // next element count value upon which to resize the table.
  16. private transient volatile int sizeCtl;
  17. static class Node<K,V> implements Map.Entry<K,V> {
  18. final int hash;
  19. final K key;
  20. volatile V val; // volatile
  21. volatile Node<K,V> next; // volatile
  22. }

增加结点

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }

putVal 的方法比较多,我们分两个部分进行分析。先看初始化。

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. if (key == null || value == null) throw new NullPointerException();
  3. int hash = spread(key.hashCode());
  4. int binCount = 0;
  5. for (Node<K,V>[] tab = table;;) {
  6. Node<K,V> f; int n, i, fh;
  7. if (tab == null || (n = tab.length) == 0)
  8. tab = initTable();

initTable()

  1. private final Node<K,V>[] initTable() {
  2. Node<K,V>[] tab; int sc;
  3. while ((tab = table) == null || tab.length == 0) {
  4. if ((sc = sizeCtl) < 0)
  5. Thread.yield(); // lost initialization race; just spin
  6. // 正在初始化则 sc 为-1。(Object o, long offset, int expected, int x);
  7. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  8. try {
  9. // 再次判断表是否为空
  10. if ((tab = table) == null || tab.length == 0) {
  11. // 由于 sizeCtl 和 sc 进行了Swap。sc 大于0则使用 sc
  12. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  13. @SuppressWarnings("unchecked")
  14. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  15. table = tab = nt;
  16. // 等效于 sc *= 0.75
  17. sc = n - (n >>> 2);
  18. }
  19. } finally {
  20. sizeCtl = sc;
  21. }
  22. break;
  23. }
  24. }
  25. return tab;
  26. }

当前桶的第一个结点

  1. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  2. // expect 为 null 时,设置 Node
  3. if (casTabAt(tab, i, null,
  4. new Node<K,V>(hash, key, value, null)))
  5. break; // no lock when adding to empty bin
  6. }

检测到桶结点是 ForwardingNode 类型,协助扩容

  1. // f.hash 并不是 hashCode()。是一种结点状态
  2. else if ((fh = f.hash) == MOVED)
  3. tab = helpTransfer(tab, f); // 后面单独分析

插入结点

  1. else {
  2. V oldVal = null;
  3. // f 是桶的第一个结点
  4. synchronized (f) {
  5. if (tabAt(tab, i) == f) {
  6. // 正常的按链表插入
  7. if (fh >= 0) {
  8. binCount = 1;
  9. for (Node<K,V> e = f;; ++binCount) {
  10. K ek;
  11. // 如果是 equal() 的结点
  12. if (e.hash == hash && ((ek = e.key) == key ||
  13. (ek != null && key.equals(ek)))) {
  14. oldVal = e.val;
  15. if (!onlyIfAbsent) // 是否仅当不存在此结点才插入
  16. e.val = value;
  17. break;
  18. }
  19. // 非 equal() 的结点
  20. Node<K,V> pred = e;
  21. if ((e = e.next) == null) {
  22. pred.next = new Node<K,V>(hash, key, value, null);
  23. break;
  24. }
  25. }
  26. }
  27. // 向红黑树中添加元素,TreeBin 结点的 hash 值为 TREEBIN(-2)
  28. else if (f instanceof TreeBin) {
  29. Node<K,V> p;
  30. binCount = 2;
  31. // 如果新增一个结点,返回 null,如果已存在此结点返回旧结点
  32. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
  33. oldVal = p.val;
  34. if (!onlyIfAbsent)
  35. p.val = value;
  36. }
  37. }
  38. }
  39. }
  40. // binCount == 0 说明向红黑树中添加或修改一个节点成功
  41. // binCount == 0 说明 put 操作将一个新节点添加成为某个桶的首节点
  42. // binCount > 2 说明向一个链表中插入结点
  43. // TREEIFY_THRESHOLD 最小为2
  44. if (binCount != 0) {
  45. if (binCount >= TREEIFY_THRESHOLD)
  46. treeifyBin(tab, i);
  47. if (oldVal != null)
  48. return oldVal;
  49. break;
  50. }
  51. }
  52. }
  53. // CAS 更新baseCount,并判断是否需要扩容
  54. addCount(1L, binCount);
  55. // 程序走到这一步说明此次 put 操作是一个添加操作,否则早就 return 返回了
  56. return null;
  57. }

看完 pulVal() 的大体逻辑之后,我们再来看看被跳过的并发时协助扩容的场景。

ForwardingNode

  1. static final class ForwardingNode<K,V> extends Node<K,V> {
  2. final Node<K,V>[] nextTable;
  3. ForwardingNode(Node<K,V>[] tab) {
  4. // 状态为 MOVED,与上面的代码块对应
  5. super(MOVED, null, null, null);
  6. this.nextTable = tab;
  7. }
  8. }

这个节点内部保存了 nextTable 的引用,它指向一张 hash 表。在扩容操作中,我们需要对每个桶中的结点进行分离和转移,如果某个桶结点中所有节点都已经迁移完成了(已经被转移到新表 nextTable 中了),那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移。

ForwardingNode 继承自 Node 结点,并且它唯一的构造函数将构建一个键,值 next 都为 null 的结点,反正它就是个标识,无需那些属性。但是 hash 值却为 MOVED。

所以,我们在 putVal 方法中遍历整个 hash 表的桶结点,如果遇到 hash 值等于 MOVED,说明已经有线程正在扩容 rehash 操作,整体上还未完成,不过我们要插入的桶的位置已经完成了所有节点的迁移。

由于检测到当前哈希表正在扩容,于是让当前线程去协助扩容。

helpTransfer()

  1. final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  2. Node<K,V>[] nextTab; int sc;
  3. // 旧表不为空 && 结点是 ForwardingNode 类型 && 新表不为空
  4. if (tab != null && (f instanceof ForwardingNode) &&
  5. (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  6. // 默认情况下:RESIZE_STAMP_SHIFT 和 RESIZE_STAMP_BITS 都是 16
  7. // rs 的后16位表示
  8. int rs = resizeStamp(tab.length);
  9. while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
  10. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  11. sc == rs + MAX_RESIZERS || transferIndex <= 0)
  12. break;
  13. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  14. transfer(tab, nextTab);
  15. break;
  16. }
  17. }
  18. return nextTab;
  19. }
  20. return table;
  21. }

transfer()

  1. // 第一部分
  2. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  3. int n = tab.length, stride;
  4. // 计算单个线程允许处理的最少table桶首节点个数,不能小于 16
  5. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  6. stride = MIN_TRANSFER_STRIDE;
  7. // 刚开始扩容,初始化 nextTab
  8. //
  9. if (nextTab == null) {
  10. try {
  11. @SuppressWarnings("unchecked")
  12. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 扩大二倍
  13. nextTab = nt;
  14. } catch (Throwable ex) {
  15. sizeCtl = Integer.MAX_VALUE;
  16. return;
  17. }
  18. nextTable = nextTab;
  19. // transferIndex 指向最后一个桶,方便从后向前遍历
  20. transferIndex = n;
  21. }
  22. int nextn = nextTab.length;
  23. // 定义 ForwardingNode 用于标记迁移完成的桶
  24. // 当旧表数组槽为空或元素复制完后,旧表的数组槽放入该ForwardingNode<K,V>结点,
  25. // 其他线程若读到该结点,则会在新表中读,若发生写则会帮助扩容
  26. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

这部分代码还是比较简单的,主要完成的是对单个线程能处理的最少桶结点个数的计算和一些属性的初始化操作。

  1. // 第二部分,并发扩容控制的核心
  2. // advance开始为true时会进入下面的while循环申请一段数组槽中元素赋值的任务,
  3. // 完成一个旧数组槽的复制后也会置为true
  4. boolean advance = true;
  5. // 是否完成所有复制的标志位
  6. boolean finishing = false;
  7. // i 指向当前桶,bound 指向当前线程需要处理的桶结点的区间下限
  8. for (int i = 0, bound = 0;;) {
  9. Node<K,V> f; int fh;
  10. // 这个 while 循环的目的就是通过 --i 遍历当前线程所分配到的桶结点
  11. // 一个桶一个桶的处理
  12. while (advance) {
  13. int nextIndex, nextBound;
  14. if (--i >= bound || finishing)
  15. advance = false;
  16. // transferIndex <= 0 说明已经没有需要迁移的桶了
  17. else if ((nextIndex = transferIndex) <= 0) {
  18. i = -1;
  19. advance = false;
  20. }
  21. // 更新 transferIndex
  22. // 为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)
  23. else if (U.compareAndSwapInt(this,
  24. TRANSFERINDEX,
  25. nextIndex,
  26. nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
  27. bound = nextBound;
  28. i = nextIndex - 1;
  29. advance = false;
  30. }
  31. }
  32. // 当前线程所有任务完成
  33. if (i < 0 || i >= n || i + n >= nextn) {
  34. int sc;
  35. if (finishing) {
  36. nextTable = null;
  37. table = nextTab;
  38. sizeCtl = (n << 1) - (n >>> 1);
  39. return;
  40. }
  41. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  42. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  43. return;
  44. finishing = advance = true;
  45. i = n;
  46. }
  47. }
  48. // 待迁移桶为空,那么在此位置 CAS 添加 ForwardingNode 结点标识该桶已经被处理过了
  49. else if ((f = tabAt(tab, i)) == null)
  50. advance = casTabAt(tab, i, null, fwd);
  51. // 如果扫描到 ForwardingNode,说明此桶已经被处理过了,跳过即可
  52. else if ((fh = f.hash) == MOVED)
  53. advance = true;