https://juejin.cn/post/6937266785854947358?utm_source=gold_browser_extension

CAS + synchronized 来保证并发安全性。

Node节点

ConcurrentHashMap - 图1
其中的 val next 都用了 volatile 修饰,保证了可见性。

初始化

  1. public ConcurrentHashMap(int initialCapacity) {
  2. if (initialCapacity < 0)
  3. throw new IllegalArgumentException();
  4. int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
  5. MAXIMUM_CAPACITY :
  6. tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
  7. this.sizeCtl = cap;
  8. }
  9. // 找到1.5倍初始容量+1最近的2的n次方的值
  10. private static final int tableSizeFor(int c) {
  11. int n = c - 1;
  12. n |= n >>> 1;
  13. n |= n >>> 2;
  14. n |= n >>> 4;
  15. n |= n >>> 8;
  16. n |= n >>> 16;
  17. return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
  18. }

put

  1. sizeCtl
  2. 默认为0,用来控制table的初始化和扩容操作
  3. -1 代表table正在初始化
  4. -N 表示有取-N对应的二进制的低16位数值为M,此时有M-1个线程进行扩容。
  5. 其余情况:
  6. 1、如果table未初始化,表示table需要初始化的大小。
  7. 2、如果table初始化完成,表示table的容量,默认是table大小的0.75
  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. if (key == null || value == null) throw new NullPointerException();
  3. int hash = spread(key.hashCode());
  4. int binCount = 0;
  5. for (Node<K,V>[] tab = table;;) {
  6. Node<K,V> f; int n, i, fh;
  7. if (tab == null || (n = tab.length) == 0)
  8. tab = initTable();
  9. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  10. if (casTabAt(tab, i, null,
  11. new Node<K,V>(hash, key, value, null)))
  12. break; // no lock when adding to empty bin
  13. }
  14. else if ((fh = f.hash) == MOVED)
  15. tab = helpTransfer(tab, f);
  16. else {
  17. V oldVal = null;
  18. synchronized (f) {
  19. if (tabAt(tab, i) == f) {
  20. if (fh >= 0) {
  21. binCount = 1;
  22. for (Node<K,V> e = f;; ++binCount) {
  23. K ek;
  24. if (e.hash == hash &&
  25. ((ek = e.key) == key ||
  26. (ek != null && key.equals(ek)))) {
  27. oldVal = e.val;
  28. if (!onlyIfAbsent)
  29. e.val = value;
  30. break;
  31. }
  32. Node<K,V> pred = e;
  33. if ((e = e.next) == null) {
  34. pred.next = new Node<K,V>(hash, key,
  35. value, null);
  36. break;
  37. }
  38. }
  39. }
  40. else if (f instanceof TreeBin) {
  41. Node<K,V> p;
  42. binCount = 2;
  43. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  44. value)) != null) {
  45. oldVal = p.val;
  46. if (!onlyIfAbsent)
  47. p.val = value;
  48. }
  49. }
  50. }
  51. }
  52. if (binCount != 0) {
  53. if (binCount >= TREEIFY_THRESHOLD)
  54. treeifyBin(tab, i);
  55. if (oldVal != null)
  56. return oldVal;
  57. break;
  58. }
  59. }
  60. }
  61. addCount(1L, binCount);
  62. return null;
  63. }
  1. 数组为空,初始化数组
  2. 如果hash&(capacity-1)对应的位置如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
  3. 倘若当前map正在扩容f.hash == MOVED=-1, 则跟其他线程一起进行扩容
  4. 出现hash冲突,则采用synchronized关键字。倘若当前hash对应的节点是链表的头节点,遍历链表,若找到对应的node节点,则修改node节点的val,否则在链表末尾添加node节点;倘若当前节点是红黑树的根节点,在树结构上遍历元素,更新或增加节点。


helpTransfer 在扩容时候使用

  1. final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  2. Node<K,V>[] nextTab; int sc;
  3. if (tab != null && (f instanceof ForwardingNode) &&
  4. (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  5. int rs = resizeStamp(tab.length);
  6. while (nextTab == nextTable && table == tab &&
  7. (sc = sizeCtl) < 0) {
  8. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  9. sc == rs + MAX_RESIZERS || transferIndex <= 0)
  10. break;
  11. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  12. transfer(tab, nextTab);
  13. break;
  14. }
  15. }
  16. return nextTab;
  17. }
  18. return table;
  19. }

initTable 初始化数组时候使用

  1. private final Node<K,V>[] initTable() {
  2. Node<K,V>[] tab; int sc;
  3. while ((tab = table) == null || tab.length == 0) {
  4. if ((sc = sizeCtl) < 0)
  5. Thread.yield(); // lost initialization race; just spin
  6. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  7. try {
  8. if ((tab = table) == null || tab.length == 0) {
  9. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  10. @SuppressWarnings("unchecked")
  11. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  12. table = tab = nt;
  13. sc = n - (n >>> 2);
  14. }
  15. } finally {
  16. sizeCtl = sc;
  17. }
  18. break;
  19. }
  20. }
  21. return tab;
  22. }
  1. 如果ssizeCtl<0,说明有别的数组进行初始化操作
  2. 使用CAS修改sizectl的值为-1,表示本线程正在进行初始化
  3. 如果sizectl>0,则说明构造函数里面传入了初试容量,按这个值进行初始化容量
  4. 按默认16进行初始化
  5. 修改sizectl使其成为0.75倍的容量大小
  6. 返回数组

hash

  1. static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash 31个1
  2. static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}
  3. int index = (n - 1) & hash // n为bucket的个数
  4. static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  5. return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
  6. }

采用Unsafe.getObjectVolatie()来获取,而不是直接用table[index]的原因跟ConcurrentHashMap的弱一致性有关。在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

get

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {
  6. if ((eh = e.hash) == h) {
  7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  8. return e.val;
  9. }
  10. else if (eh < 0)
  11. return (p = e.find(h, key)) != null ? p.val : null;
  12. while ((e = e.next) != null) {
  13. if (e.hash == h &&
  14. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  15. return e.val;
  16. }
  17. }
  18. return null;
  19. }
  • 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
  • 如果是红黑树那就按照树的方式获取值。
  • 就不满足那就按照链表的方式遍历获取值。