一,使用

ConcurrentHashMap 是 J.U.C 包里面提供的一个线程安全并且高效的 HashMap,所以ConcurrentHashMap 在并发编程的场景中使用的频率比较高。

ConcurrentHashMap 是 Map 的派生类,所以 api 基本和 Hashmap 是类似,主要就是 put、get 这些方法,接下来基于 ConcurrentHashMap 的 put 和 get 这两个方法作为切入点来分析 ConcurrentHashMap 的源码实现。

二,源码

JDK 1.8 ConcurrentHashMap结构图..png

1.成员变量

  1. //散列表数组的最大限制
  2. private static final int MAXIMUM_CAPACITY = 1 << 30;
  3. //散列表默认值
  4. private static final int DEFAULT_CAPACITY = 16;
  5. static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  6. //并发级别:jdk7历史遗留问题,仅仅在初始化的时候使用到,并不是真正的代表并发级别
  7. private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  8. //负载因子,JDK1.8中 ConcurrentHashMap 是固定值
  9. private static final float LOAD_FACTOR = 0.75f;
  10. //树化阈值,指定桶位 链表长度达到8的话,有可能发生树化操作。
  11. static final int TREEIFY_THRESHOLD = 8;
  12. //红黑树转化为链表的阈值
  13. static final int UNTREEIFY_THRESHOLD = 6;
  14. //联合TREEIFY_THRESHOLD控制桶位是否树化,只有当table数组长度达到64且 某个桶位 中的链表长度达到8,才会真正树化
  15. static final int MIN_TREEIFY_CAPACITY = 64;
  16. //线程迁移数据最小步长,控制线程迁移任务最小区间一个值
  17. private static final int MIN_TRANSFER_STRIDE = 16;
  18. //计算扩容时候生成的一个 标识戳
  19. private static int RESIZE_STAMP_BITS = 16;
  20. //结果是65535 表示并发扩容最多线程数
  21. private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
  22. //扩容相关
  23. private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
  24. //当node节点hash=-1 表示当前节点已经被迁移了 ,fwd节点
  25. static final int MOVED = -1; // hash for forwarding nodes
  26. //node hash=-2 表示当前节点已经树化 且 当前节点为treebin对象 ,代理操作红黑树
  27. static final int TREEBIN = -2; // hash for roots of trees
  28. static final int RESERVED = -3; // hash for transient reservations
  29. //转化成二进制实际上是 31个 1 可以将一个负数通过位移运算得到一个正数
  30. static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
  31. //当前系统的cpu数量
  32. static final int NCPU = Runtime.getRuntime().availableProcessors();
  33. //为了兼容7版本的chp保存的,核心代码并没有使用到
  34. private static final ObjectStreamField[] serialPersistentFields = {
  35. new ObjectStreamField("segments", Segment[].class),
  36. new ObjectStreamField("segmentMask", Integer.TYPE),
  37. new ObjectStreamField("segmentShift", Integer.TYPE)
  38. };
  39. //散列表,长度一定是2次方数
  40. transient volatile Node<K,V>[] table;
  41. //扩容过程中,会将扩容中的新table 赋值给nextTable 保持引用,扩容结束之后,这里会被设置为Null
  42. private transient volatile Node<K,V>[] nextTable;
  43. //LongAdder 中的 baseCount 未发生竞争时 或者 当前LongAdder处于加锁状态时,增量累到到baseCount中
  44. private transient volatile long baseCount;
  45. /**
  46. * sizeCtl < 0
  47. * 1. -1 表示当前table正在初始化(有线程在创建table数组),当前线程需要自旋等待..
  48. * 2.表示当前table数组正在进行扩容 ,高16位表示:扩容的标识戳 低16位表示:(1 + nThread) 当前参与并发扩容的线程数量
  49. *
  50. * sizeCtl = 0,表示创建table数组时 使用DEFAULT_CAPACITY为大小
  51. *
  52. * sizeCtl > 0
  53. *
  54. * 1. 如果table未初始化,表示初始化大小
  55. * 2. 如果table已经初始化,表示下次扩容时的 触发条件(阈值)
  56. */
  57. private transient volatile int sizeCtl;
  58. /**
  59. *
  60. * 扩容过程中,记录当前进度。所有线程都需要从transferIndex中分配区间任务,去执行自己的任务。
  61. */
  62. private transient volatile int transferIndex;
  63. /**
  64. * LongAdder中的cellsBuzy 0表示当前LongAdder对象无锁状态,1表示当前LongAdder对象加锁状态
  65. */
  66. private transient volatile int cellsBusy;
  67. /**
  68. * LongAdder中的cells数组,当baseCount发生竞争后,会创建cells数组,
  69. * 线程会通过计算hash值 取到 自己的cell ,将增量累加到指定cell中
  70. * 总数 = sum(cells) + baseCount
  71. */
  72. private transient volatile CounterCell[] counterCells;
  73. // Unsafe mechanics
  74. private static final sun.misc.Unsafe U;
  75. /**表示sizeCtl属性在ConcurrentHashMap中内存偏移地址*/
  76. private static final long SIZECTL;
  77. /**表示transferIndex属性在ConcurrentHashMap中内存偏移地址*/
  78. private static final long TRANSFERINDEX;
  79. /**表示baseCount属性在ConcurrentHashMap中内存偏移地址*/
  80. private static final long BASECOUNT;
  81. /**表示cellsBusy属性在ConcurrentHashMap中内存偏移地址*/
  82. private static final long CELLSBUSY;
  83. /**表示cellValue属性在CounterCell中内存偏移地址*/
  84. private static final long CELLVALUE;
  85. /**表示数组第一个元素的偏移地址*/
  86. private static final long ABASE;
  87. private static final int ASHIFT;
  88. static {
  89. try {
  90. U = sun.misc.Unsafe.getUnsafe();
  91. Class<?> k = ConcurrentHashMap.class;
  92. SIZECTL = U.objectFieldOffset
  93. (k.getDeclaredField("sizeCtl"));
  94. TRANSFERINDEX = U.objectFieldOffset
  95. (k.getDeclaredField("transferIndex"));
  96. BASECOUNT = U.objectFieldOffset
  97. (k.getDeclaredField("baseCount"));
  98. CELLSBUSY = U.objectFieldOffset
  99. (k.getDeclaredField("cellsBusy"));
  100. Class<?> ck = CounterCell.class;
  101. CELLVALUE = U.objectFieldOffset
  102. (ck.getDeclaredField("value"));
  103. Class<?> ak = Node[].class;
  104. ABASE = U.arrayBaseOffset(ak);
  105. //表示数组单元所占用空间大小,scale 表示Node[]数组中每一个单元所占用空间大小
  106. int scale = U.arrayIndexScale(ak);
  107. //1 0000 & 0 1111 = 0
  108. if ((scale & (scale - 1)) != 0)
  109. throw new Error("data type scale not a power of two");
  110. //numberOfLeadingZeros() 这个方法是返回当前数值转换为二进制后,从高位到低位开始统计,看有多少个0连续在一块。
  111. //8 => 1000 numberOfLeadingZeros(8) = 28
  112. //4 => 100 numberOfLeadingZeros(4) = 29
  113. //ASHIFT = 31 - 29 = 2 ??
  114. //ABASE + (5 << ASHIFT)
  115. ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
  116. } catch (Exception e) {
  117. throw new Error(e);
  118. }
  119. }

2.基础方法

2.1 spread

高位运算

  1. static final int spread(int h) {
  2. return (h ^ (h >>> 16)) & HASH_BITS;
  3. }

2.2 tabAt

该方法获取对象中offset偏移地址对应的对象field的值。实际上这段代码的含义等价于tab[i],但是为什么不直接使用 tab[i]来计算呢?

getObjectVolatile,一旦看到 volatile 关键字,就表示可见性。因为对 volatile 写操作 happen-before 于 volatile 读操作,因此其他线程对 table 的修改均对 get 读取可见;

虽然 table 数组本身是增加了 volatile 属性,但是“volatile 的数组只针对数组的引用具有volatile 的语义,而不是它的元素”。 所以如果有其他线程对这个数组的元素进行写操作,那么当前线程来读的时候不一定能读到最新的值。出于性能考虑,Doug Lea 直接通过 Unsafe 类来对 table 进行操作。
image.png

  1. static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  2. return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
  3. }

2.3 casTabAt

cas设置当前节点为桶位的头节点

  1. static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
  2. Node<K,V> c, Node<K,V> v) {
  3. return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
  4. }

2.4 setTabAt

  1. static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
  2. U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
  3. }

2.5 resizeStamp

resizeStamp 用来生成一个和扩容有关的扩容戳,具体有什么作用呢?

  1. static final int resizeStamp(int n) {
  2. return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
  3. }

Integer.numberOfLeadingZeros 这个方法是返回无符号整数 n 最高位非 0 位前面的 0 的个数。

比如 10 的二进制是 0000 0000 0000 0000 0000 0000 0000 1010,那么这个方法返回的值就是 28。

根据 resizeStamp 的运算逻辑,我们来推演一下,假如 n=16,那么 resizeStamp(16)=32796转化为二进制是[0000 0000 0000 0000 1000 0000 0001 1100]

接着再来看,当第一个线程尝试进行扩容的时候,会执行下面这段代码:

  1. U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)

rs 左移 16 位,相当于原本的二进制低位变成了高位 1000 0000 0001 1100 0000 0000 00000000

然后再+2 =1000 0000 0001 1100 0000 0000 0000 0000+10=1000 0000 0001 1100 0000 00000000 0010

高 16 位代表扩容的标记、低 16 位代表并行扩容的线程数

这样来存储有什么好处呢?

1,首先在 CHM 中是支持并发扩容的,也就是说如果当前的数组需要进行扩容操作,可以由多个线程来共同负责

2,可以保证每次扩容都生成唯一的生成戳,每次新的扩容,都有一个不同的 n,这个生成戳就是根据 n 来计算出来的一个数字,n 不同,这个数字也不同

第一个线程尝试扩容的时候,为什么是+2

因为 1 表示初始化,2 表示一个线程在执行扩容,而且对 sizeCtl 的操作都是基于位运算的,所以不会关心它本身的数值是多少,只关心它在二进制上的数值,而 sc + 1 会在低 16 位上加 1。

2.6 tableSizeFor

经过多次位移返回大于等于c的最小的二次方数

  1. /**
  2. * Returns a power of two table size for the given desired capacity.
  3. * See Hackers Delight, sec 3.2
  4. * 返回>=c的最小的2的次方数
  5. * c=28
  6. * n=27 => 0b 11011
  7. * 11011 | 01101 => 11111
  8. * 11111 | 00111 => 11111
  9. * ....
  10. * => 11111 + 1 =100000 = 32
  11. */
  12. private static final int tableSizeFor(int c) {
  13. int n = c - 1;
  14. n |= n >>> 1;
  15. n |= n >>> 2;
  16. n |= n >>> 4;
  17. n |= n >>> 8;
  18. n |= n >>> 16;
  19. return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
  20. }

3. 构造方法

  1. public ConcurrentHashMap() {
  2. }
  3. public ConcurrentHashMap(int initialCapacity) {
  4. if (initialCapacity < 0)
  5. throw new IllegalArgumentException();
  6. //如果指定的容量超过允许的最大值,设置为最大值
  7. int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
  8. MAXIMUM_CAPACITY :
  9. tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
  10. /**
  11. * sizeCtl > 0
  12. * 当目前table未初始化时,sizeCtl表示初始化容量
  13. */
  14. this.sizeCtl = cap;
  15. }
  16. public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
  17. this.sizeCtl = DEFAULT_CAPACITY;
  18. putAll(m);
  19. }
  20. public ConcurrentHashMap(int initialCapacity, float loadFactor) {
  21. this(initialCapacity, loadFactor, 1);
  22. }
  23. public ConcurrentHashMap(int initialCapacity,
  24. float loadFactor, int concurrencyLevel) {
  25. //参数校验
  26. if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
  27. throw new IllegalArgumentException();
  28. //如果初始容量小于并发级别,那就设置初始容量为并发级别
  29. if (initialCapacity < concurrencyLevel)
  30. initialCapacity = concurrencyLevel;
  31. //16/0.75 +1 = 22
  32. long size = (long)(1.0 + (long)initialCapacity / loadFactor);
  33. // 22 - > 32
  34. int cap = (size >= (long)MAXIMUM_CAPACITY) ?
  35. MAXIMUM_CAPACITY : tableSizeFor((int)size);
  36. /**
  37. * sizeCtl > 0
  38. * 当目前table未初始化时,sizeCtl表示初始化容量
  39. */
  40. this.sizeCtl = cap;
  41. }

4.put

  1. public V put(K key, V value) {
  2. //如果key已经存在,是否覆盖,默认是false
  3. return putVal(key, value, false);
  4. }

5 putVal

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. //控制k 和 v 不能为null
  3. if (key == null || value == null) throw new NullPointerException();
  4. //通过spread方法,可以让高位也能参与进寻址运算。
  5. int hash = spread(key.hashCode());
  6. //binCount表示当前k-v 封装成node后插入到指定桶位后,在桶位中的所属链表的下标位置
  7. //0 表示当前桶位为null,node可以直接放着
  8. //2 表示当前桶位已经可能是红黑树
  9. int binCount = 0;
  10. //tab 引用map对象的table
  11. //自旋
  12. for (Node<K,V>[] tab = table;;) {
  13. //f 表示桶位的头结点
  14. //n 表示散列表数组的长度
  15. //i 表示key通过寻址计算后,得到的桶位下标
  16. //fh 表示桶位头结点的hash值
  17. Node<K,V> f; int n, i, fh;
  18. //CASE1:成立,表示当前map中的table尚未初始化..
  19. if (tab == null || (n = tab.length) == 0)
  20. //最终当前线程都会获取到最新的map.table引用。
  21. tab = initTable();
  22. //CASE2:i 表示key使用路由寻址算法得到 key对应 table数组的下标位置,tabAt 获取指定桶位的头结点 f
  23. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  24. //进入到CASE2代码块 前置条件 当前table数组i桶位是Null时。
  25. //使用CAS方式 设置 指定数组i桶位 为 new Node<K,V>(hash, key, value, null),并且期望值是null
  26. //cas操作成功 表示ok,直接break for循环即可
  27. //cas操作失败,表示在当前线程之前,有其它线程先你一步向指定i桶位设置值了。
  28. //当前线程只能再次自旋,去走其它逻辑。
  29. if (casTabAt(tab, i, null,
  30. new Node<K,V>(hash, key, value, null)))
  31. break; // no lock when adding to empty bin
  32. }
  33. //CASE3:前置条件,桶位的头结点一定不是null。
  34. //条件成立表示当前桶位的头结点 为 FWD结点,表示目前map正处于扩容过程中..
  35. else if ((fh = f.hash) == MOVED)
  36. //看到fwd节点后,当前节点有义务帮助当前map对象完成迁移数据的工作
  37. //帮助扩容
  38. tab = helpTransfer(tab, f);
  39. //CASE4:当前桶位 可能是 链表 也可能是 红黑树代理结点TreeBin
  40. else {
  41. //当插入key存在时,会将旧值赋值给oldVal,返回给put方法调用处..
  42. V oldVal = null;
  43. //使用sync 加锁“头节点”,理论上是“头结点”
  44. synchronized (f) {
  45. //为什么又要对比一下,看看当前桶位的头节点 是否为 之前获取的头结点?
  46. //为了避免其它线程将该桶位的头结点修改掉,导致当前线程从sync 加锁 就有问题了。之后所有操作都不用在做了。
  47. if (tabAt(tab, i) == f) {//条件成立,说明咱们 加锁 的对象没有问题,可以进来造了!
  48. //条件成立,说明当前桶位就是普通链表桶位。
  49. if (fh >= 0) {
  50. //1.当前插入key与链表当中所有元素的key都不一致时,当前的插入操作是追加到链表的末尾,binCount表示链表长度
  51. //2.当前插入key与链表当中的某个元素的key一致时,当前插入操作可能就是替换了。binCount表示冲突位置(binCount - 1)
  52. binCount = 1;
  53. //迭代循环当前桶位的链表,e是每次循环处理节点。
  54. for (Node<K,V> e = f;; ++binCount) {
  55. //当前循环节点 key
  56. K ek;
  57. //条件一:e.hash == hash 成立 表示循环的当前元素的hash值与插入节点的hash值一致,需要进一步判断
  58. //条件二:((ek = e.key) == key ||(ek != null && key.equals(ek)))
  59. // 成立:说明循环的当前节点与插入节点的key一致,发生冲突了
  60. if (e.hash == hash &&
  61. ((ek = e.key) == key ||
  62. (ek != null && key.equals(ek)))) {
  63. //将当前循环的元素的 值 赋值给oldVal
  64. oldVal = e.val;
  65. if (!onlyIfAbsent)
  66. e.val = value;
  67. break;
  68. }
  69. //当前元素 与 插入元素的key不一致 时,会走下面程序。
  70. //1.更新循环处理节点为 当前节点的下一个节点
  71. //2.判断下一个节点是否为null,如果是null,说明当前节点已经是队尾了,插入数据需要追加到队尾节点的后面。
  72. Node<K,V> pred = e;
  73. if ((e = e.next) == null) {
  74. pred.next = new Node<K,V>(hash, key,
  75. value, null);
  76. break;
  77. }
  78. }
  79. }
  80. //前置条件,该桶位一定不是链表
  81. //条件成立,表示当前桶位是 红黑树代理结点TreeBin
  82. else if (f instanceof TreeBin) {
  83. //p 表示红黑树中如果与你插入节点的key 有冲突节点的话 ,则putTreeVal 方法 会返回冲突节点的引用。
  84. Node<K,V> p;
  85. //强制设置binCount为2,因为binCount <= 1 时有其它含义,所以这里设置为了2
  86. binCount = 2;
  87. //条件一:成立,说明当前插入节点的key与红黑树中的某个节点的key一致,冲突了
  88. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  89. value)) != null) {
  90. //将冲突节点的值 赋值给 oldVal
  91. oldVal = p.val;
  92. if (!onlyIfAbsent)
  93. p.val = value;
  94. }
  95. }
  96. }
  97. }
  98. //说明当前桶位不为null,可能是红黑树 也可能是链表
  99. if (binCount != 0) {
  100. //如果binCount>=8 表示处理的桶位一定是链表
  101. if (binCount >= TREEIFY_THRESHOLD)
  102. //调用转化链表为红黑树的方法
  103. treeifyBin(tab, i);
  104. //说明当前线程插入的数据key,与原有k-v发生冲突,需要将原数据v返回给调用者。
  105. if (oldVal != null)
  106. return oldVal;
  107. break;
  108. }
  109. }
  110. }
  111. //1.统计当前table一共有多少数据
  112. //2.判断是否达到扩容阈值标准,触发扩容。
  113. addCount(1L, binCount);
  114. return null;
  115. }

6 initTable

数组初始化方法,这个方法比较简单,就是初始化一个合适大小的数组。

sizeCtl :这个标志是在 Node 数组初始化或者扩容的时候的一个控制位标识,负数代表正在进行初始化或者扩容操作。

-1 代表正在初始化

-N 代表有 N-1 个线程正在进行扩容操作,这里不是简单的理解成 n 个线程,sizeCtl 就是-N

0 标识 Node 数组还没有被初始化,正数代表初始化或者下一次扩容的大小

  1. /**
  2. * Initializes table, using the size recorded in sizeCtl.
  3. * * sizeCtl < 0
  4. * * 1. -1 表示当前table正在初始化(有线程在创建table数组),当前线程需要自旋等待..
  5. * * 2.表示当前table数组正在进行扩容 ,高16位表示:扩容的标识戳 低16位表示:(1 + nThread) 当前参与并发扩容的线程数量
  6. * *
  7. * * sizeCtl = 0,表示创建table数组时 使用DEFAULT_CAPACITY为大小
  8. * *
  9. * * sizeCtl > 0
  10. * *
  11. * * 1. 如果table未初始化,表示初始化大小
  12. * * 2. 如果table已经初始化,表示下次扩容时的 触发条件(阈值)
  13. */
  14. private final Node<K,V>[] initTable() {
  15. //tab 引用map.table
  16. //sc sizeCtl的临时值
  17. Node<K,V>[] tab; int sc;
  18. //自旋 条件:map.table 尚未初始化
  19. while ((tab = table) == null || tab.length == 0) {
  20. if ((sc = sizeCtl) < 0)
  21. //大概率就是-1,表示其它线程正在进行创建table的过程,当前线程没有竞争到初始化table的锁。
  22. Thread.yield(); // lost initialization race; just spin
  23. //1.sizeCtl = 0,表示创建table数组时 使用DEFAULT_CAPACITY为大小
  24. //2.如果table未初始化,表示初始化大小
  25. //3.如果table已经初始化,表示下次扩容时的 触发条件(阈值)
  26. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  27. try {
  28. //这里为什么又要判断呢? 防止其它线程已经初始化完毕了,然后当前线程再次初始化..导致丢失数据。
  29. //条件成立,说明其它线程都没有进入过这个if块,当前线程就是具备初始化table权利了。
  30. if ((tab = table) == null || tab.length == 0) {
  31. //sc大于0 创建table时 使用 sc为指定大小,否则使用 16 默认值.
  32. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  33. @SuppressWarnings("unchecked")
  34. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  35. //最终赋值给 map.table
  36. table = tab = nt;
  37. //n >>> 2 => 等于 1/4 n n - (1/4)n = 3/4 n => 0.75 * n
  38. //sc 0.75 n 表示下一次扩容时的触发条件。
  39. sc = n - (n >>> 2);
  40. }
  41. } finally {
  42. //1.如果当前线程是第一次创建map.table的线程话,sc表示的是 下一次扩容的阈值
  43. //2.表示当前线程 并不是第一次创建map.table的线程,当前线程进入到else if 块 时,将
  44. //sizeCtl 设置为了-1 ,那么这时需要将其修改为 进入时的值。
  45. sizeCtl = sc;
  46. }
  47. break;
  48. }
  49. }
  50. return tab;
  51. }

7 addCount

  1. private final void addCount(long x, int check) {
  2. //as 表示 LongAdder.cells
  3. //b 表示LongAdder.base
  4. //s 表示当前map.table中元素的数量
  5. CounterCell[] as; long b, s;
  6. //条件一:true->表示cells已经初始化了,当前线程应该去使用hash寻址找到合适的cell 去累加数据
  7. // false->表示当前线程应该将数据累加到 base
  8. //条件二:false->表示写base成功,数据累加到base中了,当前竞争不激烈,不需要创建cells
  9. // true->表示写base失败,与其他线程在base上发生了竞争,当前线程应该去尝试创建cells。
  10. if ((as = counterCells) != null ||
  11. !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
  12. //有几种情况进入到if块中?
  13. //1.true->表示cells已经初始化了,当前线程应该去使用hash寻址找到合适的cell 去累加数据
  14. //2.true->表示写base失败,与其他线程在base上发生了竞争,当前线程应该去尝试创建cells。
  15. //a 表示当前线程hash寻址命中的cell
  16. CounterCell a;
  17. //v 表示当前线程写cell时的期望值
  18. long v;
  19. //m 表示当前cells数组的长度
  20. int m;
  21. //true -> 未竞争 false->发生竞争
  22. boolean uncontended = true;
  23. //条件一:as == null || (m = as.length - 1) < 0
  24. //true-> 表示当前线程是通过 写base竞争失败 然后进入的if块,就需要调用fullAddCount方法去扩容 或者 重试.. LongAdder.longAccumulate
  25. //条件二:a = as[ThreadLocalRandom.getProbe() & m]) == null 前置条件:cells已经初始化了
  26. //true->表示当前线程命中的cell表格是个空,需要当前线程进入fullAddCount方法去初始化 cell,放入当前位置.
  27. //条件三:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)
  28. // false->取反得到false,表示当前线程使用cas方式更新当前命中的cell成功
  29. // true->取反得到true,表示当前线程使用cas方式更新当前命中的cell失败,需要进入fullAddCount进行重试 或者 扩容 cells。
  30. if (as == null || (m = as.length - 1) < 0 ||
  31. (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
  32. !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
  33. ) {
  34. fullAddCount(x, uncontended);
  35. //考虑到fullAddCount里面的事情比较累,就让当前线程 不参与到 扩容相关的逻辑了,直接返回到调用点。
  36. return;
  37. }
  38. if (check <= 1)
  39. return;
  40. //获取当前散列表元素个数,这是一个期望值
  41. s = sumCount();
  42. }
  43. //表示一定是一个put操作调用的addCount
  44. if (check >= 0) {
  45. //tab 表示map.table
  46. //nt 表示map.nextTable
  47. //n 表示map.table数组的长度
  48. //sc 表示sizeCtl的临时值
  49. Node<K,V>[] tab, nt; int n, sc;
  50. /**
  51. * sizeCtl < 0
  52. * 1. -1 表示当前table正在初始化(有线程在创建table数组),当前线程需要自旋等待..
  53. * 2.表示当前table数组正在进行扩容 ,高16位表示:扩容的标识戳 低16位表示:(1 + nThread) 当前参与并发扩容的线程数量
  54. *
  55. * sizeCtl = 0,表示创建table数组时 使用DEFAULT_CAPACITY为大小
  56. *
  57. * sizeCtl > 0
  58. *
  59. * 1. 如果table未初始化,表示初始化大小
  60. * 2. 如果table已经初始化,表示下次扩容时的 触发条件(阈值)
  61. */
  62. //自旋
  63. //条件一:s >= (long)(sc = sizeCtl)
  64. // true-> 1.当前sizeCtl为一个负数 表示正在扩容中..
  65. // 2.当前sizeCtl是一个正数,表示扩容阈值
  66. // false-> 表示当前table尚未达到扩容条件
  67. //条件二:(tab = table) != null
  68. // 恒成立 true
  69. //条件三:(n = tab.length) < MAXIMUM_CAPACITY
  70. // true->当前table长度小于最大值限制,则可以进行扩容。
  71. while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
  72. (n = tab.length) < MAXIMUM_CAPACITY) {
  73. //扩容批次唯一标识戳
  74. //16 -> 32 扩容 标识为:1000 0000 0001 1011
  75. int rs = resizeStamp(n);
  76. //条件成立:表示当前table正在扩容
  77. // 当前线程理论上应该协助table完成扩容
  78. if (sc < 0) {
  79. //条件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
  80. // true->说明当前线程获取到的扩容唯一标识戳 非 本批次扩容
  81. // false->说明当前线程获取到的扩容唯一标识戳 是 本批次扩容
  82. //条件二: JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs << 16 ) + 1
  83. // true-> 表示扩容完毕,当前线程不需要再参与进来了
  84. // false->扩容还在进行中,当前线程可以参与
  85. //条件三:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs<<16) + MAX_RESIZERS
  86. // true-> 表示当前参与并发扩容的线程达到了最大值 65535 - 1
  87. // false->表示当前线程可以参与进来
  88. //条件四:(nt = nextTable) == null
  89. // true->表示本次扩容结束
  90. // false->扩容正在进行中
  91. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  92. sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
  93. transferIndex <= 0)
  94. break;
  95. //前置条件:当前table正在执行扩容中.. 当前线程有机会参与进扩容。
  96. //条件成立:说明当前线程成功参与到扩容任务中,并且将sc低16位值加1,表示多了一个线程参与工作
  97. //条件失败:1.当前有很多线程都在此处尝试修改sizeCtl,有其它一个线程修改成功了,导致你的sc期望值与内存中的值不一致 修改失败
  98. // 2.transfer 任务内部的线程也修改了sizeCtl。
  99. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  100. //协助扩容线程,持有nextTable参数
  101. transfer(tab, nt);
  102. }
  103. //1000 0000 0001 1011 0000 0000 0000 0000 +2 => 1000 0000 0001 1011 0000 0000 0000 0010
  104. //条件成立,说明当前线程是触发扩容的第一个线程,在transfer方法需要做一些扩容准备工作
  105. else if (U.compareAndSwapInt(this, SIZECTL, sc,
  106. (rs << RESIZE_STAMP_SHIFT) + 2))
  107. //触发扩容条件的线程 不持有nextTable
  108. transfer(tab, null);
  109. s = sumCount();
  110. }
  111. }
  112. }

8. transfer

ConcurrentHashMap 支持并发扩容,实现方式是,把 Node 数组进行拆分,让每个线程处理自己的区域,假设 table 数组总长度是 64,默认情况下,那么每个线程可以分到 16 个 bucket。然后每个线程处理的范围,按照倒序来做迁移。

通过 for 自循环处理每个槽位中的链表元素,默认 advace 为真,通过 CAS 设置 transferIndex属性值,并初始化 i 和 bound 值,i 指当前处理的槽位序号,bound 指需要处理的槽位边界,先处理槽位 31 的节点; (bound,i) =(16,31) 从 31 的位置往前推动。

每存在一个线程执行完扩容操作,就通过 cas 执行 sc-1。

接着判断(sc-2) !=resizeStamp(n) << RESIZE_STAMP_SHIFT ; 如果相等,表示当前为整个扩容操作的 最后一个线程,那么意味着整个扩容操作就结束了;如果不相等,说明还得继续。

这么做的目的,一方面是防止不同扩容之间出现相同的 sizeCtl,另外一方面,还可以避免sizeCtl 的 ABA 问题导致的扩容重叠的情况。

扩容图解
JDK1.8 ConcurrentHashMap并发扩容..png
lastRun机制
LastRun机制..png
判断是否需要扩容,也就是当更新后的键值对总数 baseCount >= 阈值 sizeCtl 时,进行rehash,这里面会有两个逻辑。

  1. 如果当前正在处于扩容阶段,则当前线程会加入并且协助扩容。
  2. 如果当前没有在扩容,则直接触发扩容操作。

扩容操作的核心在于数据的转移,在单线程环境下数据的转移很简单,无非就是把旧数组中的数据迁移到新的数组。但是这在多线程环境下,在扩容的时候其他线程也可能正在添加元素,这时又触发了扩容怎么办?可能大家想到的第一个解决方案是加互斥锁,把转移过程锁住,虽然是可行的解决方案,但是会带来较大的性能开销。因为互斥锁会导致所有访问临界区的线程陷入到阻塞状态,持有锁的线程耗时越长,其他竞争线程就会一直被阻塞,导致吞吐量较低。而且还可能导致死锁。

而 ConcurrentHashMap 并没有直接加锁,而是采用 CAS 实现无锁的并发同步策略,最精华的部分是它可以利用多线程来进行协同扩容。

它把 Node 数组当作多个线程之间共享的任务队列,然后通过维护一个指针来划分每个线程锁负责的区间,每个线程通过区间逆向遍历来实现扩容,一个已经迁移完的bucket会被替换为一个ForwardingNode节点,标记当前bucket已经被其他线程迁移完了。接下来分析一下它的源码实现。

fwd:这个类是个标识类,用于指向新表用的,其他线程遇到这个类会主动跳过这个类,因为这个类要么就是扩容迁移正在进行,要么就是已经完成扩容迁移,也就是这个类要保证线程安全,再进行操作。

advance:这个变量是用于提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识。

finishing:这个变量用于提示扩容是否结束用的。

  1. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  2. //n 表示扩容之前table数组的长度
  3. //stride 表示分配给线程任务的步长
  4. int n = tab.length, stride;
  5. // stride 固定为 16
  6. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  7. stride = MIN_TRANSFER_STRIDE; // subdivide range
  8. //条件成立:表示当前线程为触发本次扩容的线程,需要做一些扩容准备工作
  9. //条件不成立:表示当前线程是协助扩容的线程..
  10. if (nextTab == null) { // initiating
  11. try {
  12. //创建了一个比扩容之前大一倍的table
  13. @SuppressWarnings("unchecked")
  14. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  15. nextTab = nt;
  16. } catch (Throwable ex) { // try to cope with OOME
  17. sizeCtl = Integer.MAX_VALUE;
  18. return;
  19. }
  20. //赋值给对象属性 nextTable ,方便协助扩容线程 拿到新表
  21. nextTable = nextTab;
  22. //记录迁移数据整体位置的一个标记。index计数是从1开始计算的。
  23. transferIndex = n;
  24. }
  25. //表示新数组的长度
  26. int nextn = nextTab.length;
  27. //fwd 节点,当某个桶位数据处理完毕后,将此桶位设置为fwd节点,其它写线程 或读线程看到后,会有不同逻辑。
  28. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  29. //推进标记
  30. boolean advance = true;
  31. //完成标记
  32. boolean finishing = false; // to ensure sweep before committing nextTab
  33. //i 表示分配给当前线程任务,执行到的桶位
  34. //bound 表示分配给当前线程任务的下界限制
  35. int i = 0, bound = 0;
  36. //自旋
  37. for (;;) {
  38. //f 桶位的头结点
  39. //fh 头结点的hash
  40. Node<K,V> f; int fh;
  41. /**
  42. * 1.给当前线程分配任务区间
  43. * 2.维护当前线程任务进度(i 表示当前处理的桶位)
  44. * 3.维护map对象全局范围内的进度
  45. */
  46. while (advance) {
  47. //分配任务的开始下标
  48. //分配任务的结束下标
  49. int nextIndex, nextBound;
  50. //CASE1:
  51. //条件一:--i >= bound
  52. //成立:表示当前线程的任务尚未完成,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个 桶位.
  53. //不成立:表示当前线程任务已完成 或 者未分配
  54. if (--i >= bound || finishing)
  55. advance = false;
  56. //CASE2:
  57. //前置条件:当前线程任务已完成 或 者未分配
  58. //条件成立:表示对象全局范围内的桶位都分配完毕了,没有区间可分配了,设置当前线程的i变量为-1 跳出循环后,执行退出迁移任务相关的程序
  59. //条件不成立:表示对象全局范围内的桶位尚未分配完毕,还有区间可分配
  60. else if ((nextIndex = transferIndex) <= 0) {
  61. i = -1;
  62. advance = false;
  63. }
  64. //CASE3:
  65. //前置条件:1、当前线程需要分配任务区间 2.全局范围内还有桶位尚未迁移
  66. //条件成立:说明给当前线程分配任务成功
  67. //条件失败:说明分配给当前线程失败,应该是和其它线程发生了竞争吧
  68. else if (U.compareAndSwapInt
  69. (this, TRANSFERINDEX, nextIndex,
  70. nextBound = (nextIndex > stride ?
  71. nextIndex - stride : 0))) {
  72. bound = nextBound;
  73. i = nextIndex - 1;
  74. advance = false;
  75. }
  76. }
  77. //CASE1:
  78. //条件一:i < 0
  79. //成立:表示当前线程未分配到任务
  80. if (i < 0 || i >= n || i + n >= nextn) {
  81. //保存sizeCtl 的变量
  82. int sc;
  83. if (finishing) {
  84. nextTable = null;
  85. table = nextTab;
  86. sizeCtl = (n << 1) - (n >>> 1);
  87. return;
  88. }
  89. //条件成立:说明设置sizeCtl 低16位 -1 成功,当前线程可以正常退出
  90. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  91. //1000 0000 0001 1011 0000 0000 0000 0000
  92. //条件成立:说明当前线程不是最后一个退出transfer任务的线程
  93. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  94. //正常退出
  95. return;
  96. finishing = advance = true;
  97. i = n; // recheck before commit
  98. }
  99. }
  100. //前置条件:【CASE2~CASE4】 当前线程任务尚未处理完,正在进行中
  101. //CASE2:
  102. //条件成立:说明当前桶位未存放数据,只需要将此处设置为fwd节点即可。
  103. else if ((f = tabAt(tab, i)) == null)
  104. advance = casTabAt(tab, i, null, fwd);
  105. //CASE3:
  106. //条件成立:说明当前桶位已经迁移过了,当前线程不用再处理了,直接再次更新当前线程任务索引,再次处理下一个桶位 或者 其它操作
  107. else if ((fh = f.hash) == MOVED)
  108. advance = true; // already processed
  109. //CASE4:
  110. //前置条件:当前桶位有数据,而且node节点 不是 fwd节点,说明这些数据需要迁移。
  111. else {
  112. //sync 加锁当前桶位的头结点
  113. synchronized (f) {
  114. //防止在你加锁头对象之前,当前桶位的头对象被其它写线程修改过,导致你目前加锁对象错误...
  115. if (tabAt(tab, i) == f) {
  116. //ln 表示低位链表引用
  117. //hn 表示高位链表引用
  118. Node<K,V> ln, hn;
  119. //条件成立:表示当前桶位是链表桶位
  120. if (fh >= 0) {
  121. //lastRun
  122. //可以获取出 当前链表 末尾连续高位不变的 node
  123. int runBit = fh & n;
  124. Node<K,V> lastRun = f;
  125. for (Node<K,V> p = f.next; p != null; p = p.next) {
  126. int b = p.hash & n;
  127. if (b != runBit) {
  128. runBit = b;
  129. lastRun = p;
  130. }
  131. }
  132. //条件成立:说明lastRun引用的链表为 低位链表,那么就让 ln 指向 低位链表
  133. if (runBit == 0) {
  134. ln = lastRun;
  135. hn = null;
  136. }
  137. //否则,说明lastRun引用的链表为 高位链表,就让 hn 指向 高位链表
  138. else {
  139. hn = lastRun;
  140. ln = null;
  141. }
  142. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  143. int ph = p.hash; K pk = p.key; V pv = p.val;
  144. if ((ph & n) == 0)
  145. ln = new Node<K,V>(ph, pk, pv, ln);
  146. else
  147. hn = new Node<K,V>(ph, pk, pv, hn);
  148. }
  149. setTabAt(nextTab, i, ln);
  150. setTabAt(nextTab, i + n, hn);
  151. setTabAt(tab, i, fwd);
  152. advance = true;
  153. }
  154. //条件成立:表示当前桶位是 红黑树 代理结点TreeBin
  155. else if (f instanceof TreeBin) {
  156. //转换头结点为 treeBin引用 t
  157. TreeBin<K,V> t = (TreeBin<K,V>)f;
  158. //低位双向链表 lo 指向低位链表的头 loTail 指向低位链表的尾巴
  159. TreeNode<K,V> lo = null, loTail = null;
  160. //高位双向链表 lo 指向高位链表的头 loTail 指向高位链表的尾巴
  161. TreeNode<K,V> hi = null, hiTail = null;
  162. //lc 表示低位链表元素数量
  163. //hc 表示高位链表元素数量
  164. int lc = 0, hc = 0;
  165. //迭代TreeBin中的双向链表,从头结点 至 尾节点
  166. for (Node<K,V> e = t.first; e != null; e = e.next) {
  167. // h 表示循环处理当前元素的 hash
  168. int h = e.hash;
  169. //使用当前节点 构建出来的 新的 TreeNode
  170. TreeNode<K,V> p = new TreeNode<K,V>
  171. (h, e.key, e.val, null, null);
  172. //条件成立:表示当前循环节点 属于低位链 节点
  173. if ((h & n) == 0) {
  174. //条件成立:说明当前低位链表 还没有数据
  175. if ((p.prev = loTail) == null)
  176. lo = p;
  177. //说明 低位链表已经有数据了,此时当前元素 追加到 低位链表的末尾就行了
  178. else
  179. loTail.next = p;
  180. //将低位链表尾指针指向 p 节点
  181. loTail = p;
  182. ++lc;
  183. }
  184. //当前节点 属于 高位链 节点
  185. else {
  186. if ((p.prev = hiTail) == null)
  187. hi = p;
  188. else
  189. hiTail.next = p;
  190. hiTail = p;
  191. ++hc;
  192. }
  193. }
  194. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  195. (hc != 0) ? new TreeBin<K,V>(lo) : t;
  196. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  197. (lc != 0) ? new TreeBin<K,V>(hi) : t;
  198. setTabAt(nextTab, i, ln);
  199. setTabAt(nextTab, i + n, hn);
  200. setTabAt(tab, i, fwd);
  201. advance = true;
  202. }
  203. }
  204. }
  205. }
  206. }
  207. }

链表迁移原理

1)高低位原理分析

ConcurrentHashMap 在做链表迁移时,会用高低位来实现,这里有两个问题要分析一下

1,如何实现高低位链表的区分

假如有这样一个队列

1.jpg

第 14 个槽位插入新节点之后,链表元素个数已经达到了 8,且数组长度为 16,优先通过扩容来缓解链表过长的问题

假如当前线程正在处理槽位为 14 的节点,它是一个链表结构,在代码中,首先定义两个变量节点 ln 和 hn,实际就是 lowNode 和 HighNode,分别保存 hash 值的第 x 位为 0 和不等于0 的节点

通过 fn&n 可以把这个链表中的元素分为两类,A 类是 hash 值的第 X 位为 0,B 类是 hash 值的第 x 位为不等于 0(至于为什么要这么区分,稍后分析),并且通过 lastRun 记录最后要处理的节点。最终要达到的目的是,A 类的链表保持位置不动,B 类的链表为 14+16(扩容增加的长度)=30

把 14 槽位的链表单独伶出来,用蓝色表示 fn&n=0 的节点,假如链表的分类是这样

1.jpeg

  1. for (Node<K,V> p = f.next; p != null; p = p.next) {
  2. int b = p.hash & n;
  3. if (b != runBit) {
  4. runBit = b;
  5. lastRun = p;
  6. }
  7. }

通过上面这段代码遍历,会记录 runBit 以及 lastRun,按照上面这个结构,那么 runBit 应该是蓝色节点,lastRun 应该是第 6 个节点接着,再通过这段代码进行遍历,生成 ln 链以及 hn 链

  1. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  2. int ph = p.hash; K pk = p.key; V pv = p.val;
  3. if ((ph & n) == 0)
  4. ln = new Node<K,V>(ph, pk, pv, ln);
  5. else
  6. hn = new Node<K,V>(ph, pk, pv, hn);
  7. }

1.jpeg

接着,通过 CAS 操作,把 hn 链放在 i+n 也就是 14+16 的位置,ln 链保持原来的位置不动。并且设置当前节点为 fwd,表示已经被当前线程迁移完了。

  1. setTabAt(nextTab, i, ln);
  2. setTabAt(nextTab, i + n, hn);
  3. setTabAt(tab, i, fwd);

迁移完成以后的数据分布如下

1.jpg

2)为什么要做高低位的划分

要想了解这么设计的目的,我们需要从 ConcurrentHashMap 的根据下标获取对象的算法来看,在 putVal 方法中 1018 行:

  1. (f = tabAt(tab, i = (n - 1) & hash)) == null

通过(n-1) & hash 来获得在 table 中的数组下标来获取节点数据,【&运算是二进制运算符,1& 1=1,其他都为 0】。

链表迁移原理..png

9.helpTransfer

如果对应的节点存在,判断这个节点的 hash 是不是等于 MOVED(-1),说明当前节点是ForwardingNode 节点,意味着有其他线程正在进行扩容,那么当前现在直接帮助它进行扩容,因此调用 helpTransfer方法。

  1. final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  2. //nextTab 引用的是 fwd.nextTable == map.nextTable 理论上是这样。
  3. //sc 保存map.sizeCtl
  4. Node<K,V>[] nextTab; int sc;
  5. //条件一:tab != null 恒成立 true
  6. //条件二:(f instanceof ForwardingNode) 恒成立 true
  7. //条件三:((ForwardingNode<K,V>)f).nextTable) != null 恒成立 true
  8. if (tab != null && (f instanceof ForwardingNode) &&
  9. (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  10. //拿当前标的长度 获取 扩容标识戳 假设 16 -> 32 扩容:1000 0000 0001 1011
  11. int rs = resizeStamp(tab.length);
  12. //条件一:nextTab == nextTable
  13. //成立:表示当前扩容正在进行中
  14. //不成立:1.nextTable被设置为Null 了,扩容完毕后,会被设为Null
  15. // 2.再次出发扩容了...咱们拿到的nextTab 也已经过期了...
  16. //条件二:table == tab
  17. //成立:说明 扩容正在进行中,还未完成
  18. //不成立:说明扩容已经结束了,扩容结束之后,最后退出的线程 会设置 nextTable 为 table
  19. //条件三:(sc = sizeCtl) < 0
  20. //成立:说明扩容正在进行中
  21. //不成立:说明sizeCtl当前是一个大于0的数,此时代表下次扩容的阈值,当前扩容已经结束。
  22. while (nextTab == nextTable && table == tab &&
  23. (sc = sizeCtl) < 0) {
  24. //条件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
  25. // true->说明当前线程获取到的扩容唯一标识戳 非 本批次扩容
  26. // false->说明当前线程获取到的扩容唯一标识戳 是 本批次扩容
  27. //条件二: JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs << 16 ) + 1
  28. // true-> 表示扩容完毕,当前线程不需要再参与进来了
  29. // false->扩容还在进行中,当前线程可以参与
  30. //条件三:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs<<16) + MAX_RESIZERS
  31. // true-> 表示当前参与并发扩容的线程达到了最大值 65535 - 1
  32. // false->表示当前线程可以参与进来
  33. //条件四:transferIndex <= 0
  34. // true->说明map对象全局范围内的任务已经分配完了,当前线程进去也没活干..
  35. // false->还有任务可以分配。
  36. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  37. sc == rs + MAX_RESIZERS || transferIndex <= 0)
  38. break;
  39. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  40. transfer(tab, nextTab);
  41. break;
  42. }
  43. }
  44. return nextTab;
  45. }
  46. return table;
  47. }

10.get

  1. public V get(Object key) {
  2. //tab 引用map.table
  3. //e 当前元素
  4. //p 目标节点
  5. //n table数组长度
  6. //eh 当前元素hash
  7. //ek 当前元素key
  8. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  9. //扰动运算后得到 更散列的hash值
  10. int h = spread(key.hashCode());
  11. //条件一:(tab = table) != null
  12. //true->表示已经put过数据,并且map内部的table也已经初始化完毕
  13. //false->表示创建完map后,并没有put过数据,map内部的table是延迟初始化的,只有第一次写数据时会触发创建逻辑。
  14. //条件二:(n = tab.length) > 0 true->表示table已经初始化
  15. //条件三:(e = tabAt(tab, (n - 1) & h)) != null
  16. //true->当前key寻址的桶位 有值
  17. //false->当前key寻址的桶位中是null,是null直接返回null
  18. if ((tab = table) != null && (n = tab.length) > 0 &&
  19. (e = tabAt(tab, (n - 1) & h)) != null) {
  20. //前置条件:当前桶位有数据
  21. //对比头结点hash与查询key的hash是否一致
  22. //条件成立:说明头结点与查询Key的hash值 完全一致
  23. if ((eh = e.hash) == h) {
  24. //完全比对 查询key 和 头结点的key
  25. //条件成立:说明头结点就是查询数据
  26. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  27. return e.val;
  28. }
  29. //条件成立:
  30. //1.-1 fwd 说明当前table正在扩容,且当前查询的这个桶位的数据 已经被迁移走了
  31. //2.-2 TreeBin节点,需要使用TreeBin 提供的find 方法查询。
  32. else if (eh < 0)
  33. return (p = e.find(h, key)) != null ? p.val : null;
  34. //当前桶位已经形成链表的这种情况
  35. while ((e = e.next) != null) {
  36. if (e.hash == h &&
  37. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  38. return e.val;
  39. }
  40. }
  41. return null;
  42. }

11.remove

  1. public V remove(Object key) {
  2. return replaceNode(key, null, null);
  3. }

12.replaceNode

  1. final V replaceNode(Object key, V value, Object cv) {
  2. //计算key经过扰动运算后的hash
  3. int hash = spread(key.hashCode());
  4. //自旋
  5. for (Node<K,V>[] tab = table;;) {
  6. //f表示桶位头结点
  7. //n表示当前table数组长度
  8. //i表示hash命中桶位下标
  9. //fh表示桶位头结点 hash
  10. Node<K,V> f; int n, i, fh;
  11. //CASE1:
  12. //条件一:tab == null true->表示当前map.table尚未初始化.. false->已经初始化
  13. //条件二:(n = tab.length) == 0 true->表示当前map.table尚未初始化.. false->已经初始化
  14. //条件三:(f = tabAt(tab, i = (n - 1) & hash)) == null true -> 表示命中桶位中为null,直接break, 会返回
  15. if (tab == null || (n = tab.length) == 0 ||
  16. (f = tabAt(tab, i = (n - 1) & hash)) == null)
  17. break;
  18. //CASE2:
  19. //前置条件CASE2 ~ CASE3:当前桶位不是null
  20. //条件成立:说明当前table正在扩容中,当前是个写操作,所以当前线程需要协助table完成扩容。
  21. else if ((fh = f.hash) == MOVED)
  22. tab = helpTransfer(tab, f);
  23. //CASE3:
  24. //前置条件CASE2 ~ CASE3:当前桶位不是null
  25. //当前桶位 可能是 "链表" 也可能 是 "红黑树" TreeBin
  26. else {
  27. //保留替换之前的数据引用
  28. V oldVal = null;
  29. //校验标记
  30. boolean validated = false;
  31. //加锁当前桶位 头结点,加锁成功之后会进入 代码块。
  32. synchronized (f) {
  33. //判断sync加锁是否为当前桶位 头节点,防止其它线程,在当前线程加锁成功之前,修改过 桶位 的头结点。
  34. //条件成立:当前桶位头结点 仍然为f,其它线程没修改过。
  35. if (tabAt(tab, i) == f) {
  36. //条件成立:说明桶位 为 链表 或者 单个 node
  37. if (fh >= 0) {
  38. validated = true;
  39. //e 表示当前循环处理元素
  40. //pred 表示当前循环节点的上一个节点
  41. Node<K,V> e = f, pred = null;
  42. for (;;) {
  43. //当前节点key
  44. K ek;
  45. //条件一:e.hash == hash true->说明当前节点的hash与查找节点hash一致
  46. //条件二:((ek = e.key) == key || (ek != null && key.equals(ek)))
  47. //if 条件成立,说明key 与查询的key完全一致。
  48. if (e.hash == hash &&
  49. ((ek = e.key) == key ||
  50. (ek != null && key.equals(ek)))) {
  51. //当前节点的value
  52. V ev = e.val;
  53. //条件一:cv == null true->替换的值为null 那么就是一个删除操作
  54. //条件二:cv == ev || (ev != null && cv.equals(ev)) 那么是一个替换操作
  55. if (cv == null || cv == ev ||
  56. (ev != null && cv.equals(ev))) {
  57. //删除 或者 替换
  58. //将当前节点的值 赋值给 oldVal 后续返回会用到
  59. oldVal = ev;
  60. //条件成立:说明当前是一个替换操作
  61. if (value != null)
  62. //直接替换
  63. e.val = value;
  64. //条件成立:说明当前节点非头结点
  65. else if (pred != null)
  66. //当前节点的上一个节点,指向当前节点的下一个节点。
  67. pred.next = e.next;
  68. else
  69. //说明当前节点即为 头结点,只需要将 桶位设置为头结点的下一个节点。
  70. setTabAt(tab, i, e.next);
  71. }
  72. break;
  73. }
  74. pred = e;
  75. if ((e = e.next) == null)
  76. break;
  77. }
  78. }
  79. //条件成立:TreeBin节点。
  80. else if (f instanceof TreeBin) {
  81. validated = true;
  82. //转换为实际类型 TreeBin t
  83. TreeBin<K,V> t = (TreeBin<K,V>)f;
  84. //r 表示 红黑树 根节点
  85. //p 表示 红黑树中查找到对应key 一致的node
  86. TreeNode<K,V> r, p;
  87. //条件一:(r = t.root) != null 理论上是成立
  88. //条件二:TreeNode.findTreeNode 以当前节点为入口,向下查找key(包括本身节点)
  89. // true->说明查找到相应key 对应的node节点。会赋值给p
  90. if ((r = t.root) != null &&
  91. (p = r.findTreeNode(hash, key, null)) != null) {
  92. //保存p.val 到pv
  93. V pv = p.val;
  94. //条件一:cv == null 成立:不必对value,就做替换或者删除操作
  95. //条件二:cv == pv ||(pv != null && cv.equals(pv)) 成立:说明“对比值”与当前p节点的值 一致
  96. if (cv == null || cv == pv ||
  97. (pv != null && cv.equals(pv))) {
  98. //替换或者删除操作
  99. oldVal = pv;
  100. //条件成立:替换操作
  101. if (value != null)
  102. p.val = value;
  103. //删除操作
  104. else if (t.removeTreeNode(p))
  105. //这里没做判断,直接搞了...很疑惑
  106. setTabAt(tab, i, untreeify(t.first));
  107. }
  108. }
  109. }
  110. }
  111. }
  112. //当其他线程修改过桶位 头结点时,当前线程 sync 头结点 锁错对象时,validated 为false,会进入下次for 自旋
  113. if (validated) {
  114. if (oldVal != null) {
  115. //替换的值 为null,说明当前是一次删除操作,oldVal !=null 成立,说明删除成功,更新当前元素个数计数器。
  116. if (value == null)
  117. addCount(-1L, -1);
  118. return oldVal;
  119. }
  120. break;
  121. }
  122. }
  123. }
  124. return null;
  125. }

13.TreeBin

13.1 属性

  1. //红黑树 根节点
  2. TreeNode<K,V> root;
  3. //链表的头节点
  4. volatile TreeNode<K,V> first;
  5. //等待者线程(当前lockState是读锁状态)
  6. volatile Thread waiter;
  7. /**
  8. * 1.写锁状态 写是独占状态,以散列表来看,真正进入到TreeBin中的写线程 同一时刻 只有一个线程。 1
  9. * 2.读锁状态 读锁是共享,同一时刻可以有多个线程 同时进入到 TreeBin对象中获取数据。 每一个线程 都会给 lockStat + 4
  10. * 3.等待者状态(写线程在等待),当TreeBin中有读线程目前正在读取数据时,写线程无法修改数据,那么就将lockState的最低2位 设置为 0b 10
  11. */
  12. volatile int lockState;
  13. // values for lockState
  14. static final int WRITER = 1; // set while holding write lock
  15. static final int WAITER = 2; // set when waiting for write lock
  16. static final int READER = 4; // increment value for setting read lock

13.2 构造器

  1. TreeBin(TreeNode<K,V> b) {
  2. //设置节点hash为-2 表示此节点是TreeBin节点
  3. super(TREEBIN, null, null, null);
  4. //使用first 引用 treeNode链表
  5. this.first = b;
  6. //r 红黑树的根节点引用
  7. TreeNode<K,V> r = null;
  8. //x表示遍历的当前节点
  9. for (TreeNode<K,V> x = b, next; x != null; x = next) {
  10. next = (TreeNode<K,V>)x.next;
  11. //强制设置当前插入节点的左右子树为null
  12. x.left = x.right = null;
  13. //条件成立:说明当前红黑树 是一个空树,那么设置插入元素 为根节点
  14. if (r == null) {
  15. //根节点的父节点 一定为 null
  16. x.parent = null;
  17. //颜色改为黑色
  18. x.red = false;
  19. //让r引用x所指向的对象。
  20. r = x;
  21. }
  22. else {
  23. //非第一次循环,都会来带else分支,此时红黑树已经有数据了
  24. //k 表示 插入节点的key
  25. K k = x.key;
  26. //h 表示 插入节点的hash
  27. int h = x.hash;
  28. //kc 表示 插入节点key的class类型
  29. Class<?> kc = null;
  30. //p 表示 为查找插入节点的父节点的一个临时节点
  31. TreeNode<K,V> p = r;
  32. for (;;) {
  33. //dir (-1, 1)
  34. //-1 表示插入节点的hash值大于 当前p节点的hash
  35. //1 表示插入节点的hash值 小于 当前p节点的hash
  36. //ph p表示 为查找插入节点的父节点的一个临时节点的hash
  37. int dir, ph;
  38. //临时节点 key
  39. K pk = p.key;
  40. //插入节点的hash值 小于 当前节点
  41. if ((ph = p.hash) > h)
  42. //插入节点可能需要插入到当前节点的左子节点 或者 继续在左子树上查找
  43. dir = -1;
  44. //插入节点的hash值 大于 当前节点
  45. else if (ph < h)
  46. //插入节点可能需要插入到当前节点的右子节点 或者 继续在右子树上查找
  47. dir = 1;
  48. //如果执行到 CASE3,说明当前插入节点的hash 与 当前节点的hash一致,会在case3 做出最终排序。最终
  49. //拿到的dir 一定不是0,(-1, 1)
  50. else if ((kc == null &&
  51. (kc = comparableClassFor(k)) == null) ||
  52. (dir = compareComparables(kc, k, pk)) == 0)
  53. dir = tieBreakOrder(k, pk);
  54. //xp 想要表示的是 插入节点的 父节点
  55. TreeNode<K,V> xp = p;
  56. //条件成立:说明当前p节点 即为插入节点的父节点
  57. //条件不成立:说明p节点 底下还有层次,需要将p指向 p的左子节点 或者 右子节点,表示继续向下搜索。
  58. if ((p = (dir <= 0) ? p.left : p.right) == null) {
  59. //设置插入节点的父节点 为 当前节点
  60. x.parent = xp;
  61. //小于P节点,需要插入到P节点的左子节点
  62. if (dir <= 0)
  63. xp.left = x;
  64. //大于P节点,需要插入到P节点的右子节点
  65. else
  66. xp.right = x;
  67. //插入节点后,红黑树性质 可能会被破坏,所以需要调用 平衡方法
  68. r = balanceInsertion(r, x);
  69. break;
  70. }
  71. }
  72. }
  73. }
  74. //将r 赋值给 TreeBin对象的 root引用。
  75. this.root = r;
  76. assert checkInvariants(root);
  77. }

13.3 putTreeVal

  1. final TreeNode<K,V> putTreeVal(int h, K k, V v) {
  2. Class<?> kc = null;
  3. boolean searched = false;
  4. for (TreeNode<K,V> p = root;;) {
  5. int dir, ph; K pk;
  6. if (p == null) {
  7. first = root = new TreeNode<K,V>(h, k, v, null, null);
  8. break;
  9. }
  10. else if ((ph = p.hash) > h)
  11. dir = -1;
  12. else if (ph < h)
  13. dir = 1;
  14. else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
  15. return p;
  16. else if ((kc == null &&
  17. (kc = comparableClassFor(k)) == null) ||
  18. (dir = compareComparables(kc, k, pk)) == 0) {
  19. if (!searched) {
  20. TreeNode<K,V> q, ch;
  21. searched = true;
  22. if (((ch = p.left) != null &&
  23. (q = ch.findTreeNode(h, k, kc)) != null) ||
  24. ((ch = p.right) != null &&
  25. (q = ch.findTreeNode(h, k, kc)) != null))
  26. return q;
  27. }
  28. dir = tieBreakOrder(k, pk);
  29. }
  30. TreeNode<K,V> xp = p;
  31. if ((p = (dir <= 0) ? p.left : p.right) == null) {
  32. //当前循环节点xp 即为 x 节点的爸爸
  33. //x 表示插入节点
  34. //f 老的头结点
  35. TreeNode<K,V> x, f = first;
  36. first = x = new TreeNode<K,V>(h, k, v, f, xp);
  37. //条件成立:说明链表有数据
  38. if (f != null)
  39. //设置老的头结点的前置引用为 当前的头结点。
  40. f.prev = x;
  41. if (dir <= 0)
  42. xp.left = x;
  43. else
  44. xp.right = x;
  45. if (!xp.red)
  46. x.red = true;
  47. else {
  48. //表示 当前新插入节点后,新插入节点 与 父节点 形成 “红红相连”
  49. lockRoot();
  50. try {
  51. //平衡红黑树,使其再次符合规范。
  52. root = balanceInsertion(root, x);
  53. } finally {
  54. unlockRoot();
  55. }
  56. }
  57. break;
  58. }
  59. }
  60. assert checkInvariants(root);
  61. return null;
  62. }

13.4 find

  1. final Node<K,V> find(int h, Object k) {
  2. if (k != null) {
  3. //e 表示循环迭代的当前节点 迭代的是first引用的链表
  4. for (Node<K,V> e = first; e != null; ) {
  5. //s 保存的是lock临时状态
  6. //ek 链表当前节点 的key
  7. int s; K ek;
  8. //(WAITER|WRITER) => 0010 | 0001 => 0011
  9. //lockState & 0011 != 0 条件成立:说明当前TreeBin 有等待者线程 或者 目前有写操作线程正在加锁
  10. if (((s = lockState) & (WAITER|WRITER)) != 0) {
  11. if (e.hash == h &&
  12. ((ek = e.key) == k || (ek != null && k.equals(ek))))
  13. return e;
  14. e = e.next;
  15. }
  16. //前置条件:当前TreeBin中 等待者线程 或者 写线程 都没有
  17. //条件成立:说明添加读锁成功
  18. else if (U.compareAndSwapInt(this, LOCKSTATE, s,
  19. s + READER)) {
  20. TreeNode<K,V> r, p;
  21. try {
  22. //查询操作
  23. p = ((r = root) == null ? null :
  24. r.findTreeNode(h, k, null));
  25. } finally {
  26. //w 表示等待者线程
  27. Thread w;
  28. //U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER)
  29. //1.当前线程查询红黑树结束,释放当前线程的读锁 就是让 lockstate 值 - 4
  30. //(READER|WAITER) = 0110 => 表示当前只有一个线程在读,且“有一个线程在等待”
  31. //当前读线程为 TreeBin中的最后一个读线程。
  32. //2.(w = waiter) != null 说明有一个写线程在等待读操作全部结束。
  33. if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
  34. (READER|WAITER) && (w = waiter) != null)
  35. //使用unpark 让 写线程 恢复运行状态。
  36. LockSupport.unpark(w);
  37. }
  38. return p;
  39. }
  40. }
  41. }
  42. return null;
  43. }

三,总结

在java8中,ConcurrentHashMap使用数组+链表+红黑树的组合方式,利用cas和synchronized保证并发写的安全。

引入红黑树的原因:链表查询的时间复杂度为On,但是红黑树的查询时间复杂度为O(log(n)),所以在节点比较多的情况下,使用红黑树可以大大提升性能。

链式桶是一个由node节点组成的链表。树状桶是一颗由TreeNode节点组成的红黑树。输的根节点为TreeBin类型。

当链表长度大于8整个hash表长度大于64的时候,就会转化为TreeBin。TreeBin作为根节点,其实就是红黑树对象。在ConcurrentHashMap的table数组中,存放的就是TreeBin对象,而不是TreeNoe对象。

数组table是懒加载的,只有第一次添加元素的时候才会初始化,所以initTable()存在线程安全问题。

重要的属性就是sizeCtl,用来控制table的初始化和扩容操作的过程:

  • -1代表table正在初始化,其他线程直接join等待。
  • -N代表有N-1个线程正在进行扩容操作,严格来说,当其为负数的时候,只用到了低16位,如果低16位为M,此时有M-1个线程进行扩容。
  • 大于0有两种情况:如果table没有初始化,她就表示table初始化的大小,如果table初始化完了,就表示table的容量,默认是table大小的四分之三。

Transfer()扩容

table数据转移到nextTable。扩容操作的核心在于数据的转移,把旧数组中的数据迁移到新的数组。ConcurrentHashMap精华的部分是它可以利用多线程来进行协同扩容,简单来说,它把table数组当作多个线程之间共享的任务队列,然后通过维护一个指针来划分每个线程所负责的区间,每个线程通过区间逆向遍历来实现扩容,一个已经迁移完的 Bucket会被替换为一个Forwarding节点,标记当前Bucket已经被其他线程迁移完了。

helpTransfer()帮助扩容

ConcurrentHashMap并发添加元素时,如果正在扩容,其他线程会帮助扩容,也就是多线程扩容。

第一次添加元素时,默认初始长度为16,当往table中继续添加元素时,通过Hash值跟数组长度取余来决定放在数组的哪个Bucket位置,如果出现放在同一个位置,就优先以链表的形式存放,在同一个位置的个数达到了8个以上,如果数组的长度还小于64,就会扩容数组。如果数组的长度大于等于64,就会将该节点的链表转换成树。

通过扩容数组的方式来把这些节点分散开。然后将这些元素复制到扩容后的新数组中,同一个Bucket中的元素通过Hash值的数组长度位来重新确定位置,可能还是放在原来的位置,也可能放到新的位置。而且,在扩容完成之后,如果之前某个节点是树,但是现在该节点的“Key-Value对”数又小于等于6个,就会将该树转为链表。

put()

JDK1.8在使用CAS自旋完成桶的设置时,使用synchronized内置锁保证桶内并发操作的线程安全。尽管对同一个Map操作的线程争用会非常激烈,但是在同一个桶内的线程争用通常不会很激烈,所以使用CAS自旋、synchronized不会降低ConcurrentHashMap的性能。为什么不用ReentrantLock显式锁呢?如果为每 个桶都创建一个ReentrantLock实 例,就会带来大量的内存消耗,反过来,使用CAS自旋、synchronized,内存消耗的增加更小。

get()

get()通过UnSafe的getObjectVolatile()来读取数组中的元素。为什么要这样做?虽然HashEntry数组的引用是volatile类型,但是数组内元素的 用不是volatile类型,因此多线程对 数组元素的修改是不安全的,可能会在数组中读取到尚未构造完成的元素对象。get()方法通过UnSafe的getObjectVolatile方法来保证元素的读取安全,调用getObjectVolatile()去读取数组元素需要先获得元素在数组中的偏移量,在这里,get()方法根据哈希码计算出偏移量为u,然后通过偏移量u来尝试读取数值。