1、ConcurrentHashMap-底层数据结构
1、ConcurrentHashMap-底层数据结构: 1-1、jdk1.7: 采用Segment + HashEntry的方式进行实现,采取分段锁来保证安全性。 1-1-1、Segment 的结构和 HashMap 类似,是一个数组和链表结构,继承了ReentrantLock类,是 ReentrantLock 重入锁,每个Segment 对应一把锁。 1-1-2、Segment的优点:如果多个线程访问不同的Segment,实际是没有冲突的 ,这与jdk8是类似的(jdk8是把锁加在每个链表/红黑树的头结点对象上,jdk7是把锁加在每个Segment对象上) 1-1-3、缺点:Segment 数组默认大小为16,这个容量初始化之后就不能改变了,并且不是懒惰初始化的。 1-1-4、HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,一个 Segment 里包含一个 HashEntry 数组。 1-1-5、源码: final Segment<K,V>[] segments; static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; } 1-2、jdk1.8: 摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作。 1-2-1、源码: transient volatile Node<K,V>[] table;
1-1、ConcurrentHashMap-底层数据结构-jdk1.7-示意图
1-2、ConcurrentHashMap-底层数据结构-jdk1.8-示意图
1-3、ConcurrentHashMap-底层数据结构-jdk1.8-基本属性
1、ConcurrentHashMap-jdk1.8-基本属性: //node数组最大容量:1 << 30 = 1*2^30=1073741824 private static final int MAXIMUM_CAPACITY = 1 << 30; //默认初始值,必须是2的幂数 private static final int DEFAULT_CAPACITY = 16; //数组可能最大值,需要与toArray()相关方法关联 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //并发级别,遗留下来的,为兼容以前的版本 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //负载因子 private static final float LOAD_FACTOR = 0.75f; //链表转红黑树阀值,> 8 链表转换为红黑树 static final int TREEIFY_THRESHOLD = 8; //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo)) static final int UNTREEIFY_THRESHOLD = 6; //转化为红黑树的表的最小容量 static final int MIN_TREEIFY_CAPACITY = 64; //每次进行转移的最小值 private static final int MIN_TRANSFER_STRIDE = 16; //生成sizeCtl所使用的bit位数 private static int RESIZE_STAMP_BITS = 16; //2^15-1,help resize的最大线程数 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //32-16=16,sizeCtl中记录size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //forwarding nodes的hash值 static final int MOVED = -1; //树根节点的hash值 static final int TREEBIN = -2; //ReservationNode的hash值 static final int RESERVED = -3; //可用处理器数量 static final int NCPU = Runtime.getRuntime().availableProcessors(); //存放node的数组 transient volatile Node<K,V>[] table; /*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义 *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容 *当为0时:代表当时的table还没有被初始化 *当为正数时:表示初始化或者下一次进行扩容的大小 */ private transient volatile int sizeCtl; // 下一个表,过渡表,用于在扩容时将table表中的结点转移到nextTable中。 private transient volatile Node<K,V>[] nextTable; // 基本计数 private transient volatile long baseCount; // 扩容下另一个表的索引 private transient volatile int transferIndex; // 旋转锁 private transient volatile int cellsBusy; // counterCell表 private transient volatile CounterCell[] counterCells;
1-4、ConcurrentHashMap-底层数据结构-jdk1.8-关键类
1、ConcurrentHashMap-jdk1.8-关键类: 1-1、Node,静态内部类,保存 key,value 及 key 的 hash 值的数据结构。 //它对value和next属性设置了volatile同步锁(与JDK7的Segment相同),它不允许调用setValue方法直接改变Node的value域,它增加了find方法辅助map.get()方法。 static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * Virtualized support for map.get(); overridden in subclasses. */ Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } } 1-2、TreeNode,静态内部类, /** 当链表长度过长的时候,会转换为TreeNode。它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类。 */ static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; //TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问 TreeNode(int hash, K key, V val, Node<K,V> next,TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } Node<K,V> find(int h, Object k) { return findTreeNode(h, k, null); } /** * Returns the TreeNode (or null if not found) for the given key * starting at given root. */ final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) { if (k != null) { TreeNode<K,V> p = this; do { int ph, dir; K pk; TreeNode<K,V> q; TreeNode<K,V> pl = p.left, pr = p.right; if ((ph = p.hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if (pl == null) p = pr; else if (pr == null) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) p = (dir < 0) ? pl : pr; else if ((q = pr.findTreeNode(h, k, kc)) != null) return q; else p = pl; } while (p != null); } return null; } } 1-3、TreeBin类,静态内部类, /** 并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点. 在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象. 这个类还带有了读写锁。 */ static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock /** * Tie-breaking utility for ordering insertions when equal * hashCodes and non-comparable. We don't require a total * order, just a consistent insertion rule to maintain * equivalence across rebalancings. Tie-breaking further than * necessary simplifies testing a bit. */ static int tieBreakOrder(Object a, Object b) { int d; if (a == null || b == null || (d = a.getClass().getName(). compareTo(b.getClass().getName())) == 0) d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1); return d; } /** * Creates bin with initial set of nodes headed by b. */ TreeBin(TreeNode<K,V> b) { //指定了它的hash值为TREEBIN常量 super(TREEBIN, null, null, null); this.first = b; TreeNode<K,V> r = null; for (TreeNode<K,V> x = b, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root); } //........ } 1-4、ForwardingNode类,静态内部类, /** 一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1。 find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。 */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } } 1-5、Traverser类 Traverser类主要用于遍历操作,其子类有BaseIterator、KeySpliterator、ValueSpliterator、EntrySpliterator四个类,BaseIterator用于遍历操作。KeySplitertor、ValueSpliterator、EntrySpliterator则用于键、值、键值对的划分。 1-6、CollectionView类 CollectionView抽象类主要定义了视图操作,其子类KeySetView、ValueSetView、EntrySetView分别表示键视图、值视图、键值对视图。对视图均可以进行操作。 1-7、Segment类 Segment类在JDK1.8中与之前的版本的JDK作用存在很大的差别,JDK1.8下,其在普通的ConcurrentHashMap操作中已经没有失效,其在序列化与反序列化的时候会发挥作用。 1-8、CounterCell CounterCell类主要用于对baseCount的计数。
2、ConcurrentHashMap-Unsafe|CAS
1、Unsafe|CAS: 1-1、ConcurrentHashMap类中,U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。 1-2、ConcurrentHashMap类中,unsafe静态块,控制了一些属性的修改工作,比如最常用的SIZECTL 。 1-3、ConcurrentHashMap类中,添加了Unsafe实例,主要用于反射获取对象相应的字段。
3、ConcurrentHashMap-构造方法
1、ConcurrentHashMap-构造方法: //无参构造函数用于创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射。 /** concurrencyLevel: 1.表示并发级别,这个值用来确定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。 2.比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16; 3.也就是Segment的个数不能超过规定的最大Segment的个数,默认值为static final int MAX_SEGMENTS = 1 << 16;,如果超过这个值,设置为这个值。 */ public ConcurrentHashMap() { } //构造函数用于创建一个带有指定初始容量、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射。 public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) // 初始容量小于0,抛出异常 throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));// 找到最接近该容量的2的幂次方数 // 初始化 this.sizeCtl = cap; } //该构造函数用于创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (1) 的新的空映射。 public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } //该构造函数用于创建一个带有指定初始容量、加载因子和并发级别的新的空映射。 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; } //该构造函数用于构造一个与给定映射具有相同映射关系的新映射。 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); }2、构造函数-小结: 2-1、构造函数而言,会根据输入的initialCapacity的大小来确定一个最小的且大于等于initialCapacity大小的2的n次幂,如initialCapacity为15,则sizeCtl为16,若initialCapacity为16,则sizeCtl为16。若initialCapacity大小超过了允许的最大值,则sizeCtl为最大值。 2-2、值得注意的是,构造函数中的concurrencyLevel参数已经在JDK1.8中的意义发生了很大的变化,其并不代表所允许的并发数,其只是用来确定sizeCtl大小,在JDK1.8中的并发控制都是针对具体的桶而言,即有多少个桶就可以允许多少个并发数。
4、ConcurrentHashMap-添加元素
1、添加元素:.put()方法,采用 CAS + synchronized 实现并发插入或更新操作。2、源代码: //.put()方法,底层调用了.putVal()进行数据的插入 public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //键或值为空,抛出异常 if (key == null || value == null) throw new NullPointerException(); // 键的hash值经过计算获得hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { // 无限循环,何时插入成功 何时跳出 Node<K,V> f; int n, i, fh; // 表为空或者表的长度为0 if (tab == null || (n = tab.length) == 0) tab = initTable();// 初始化表 // 表不为空并且表的长度大于0,并且该桶不为空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 比较并且交换值,如tab的第i项为空则用新生成的node替换 if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 该结点的hash值为MOVED, 为表连接点时,需要进行整合表的操作 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 进行结点的转移(在扩容的过程中) else { V oldVal = null; // 加锁同步 synchronized (f) { // 找到table表下标为i的节点 if (tabAt(tab, i) == f) { // 该table表中该结点的hash值大于0 if (fh >= 0) { // binCount赋值为1 binCount = 1; // 无限循环 for (Node<K,V> e = f;; ++binCount) { K ek; // 结点的hash值相等并且key也相等 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 保存该结点的val值 oldVal = e.val; // 进行判断 if (!onlyIfAbsent) // 将指定的value保存至节点,即进行了节点值的更新 e.val = value; break; } // 保存当前节点 Node<K,V> pred = e; // 当前结点的下一个结点为空,即为最后一个结点 if ((e = e.next) == null) { // 新生一个结点并且赋值给next域 pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 节点为红黑树结点类型 else if (f instanceof TreeBin) { Node<K,V> p; // binCount赋值为2 binCount = 2; // 将hash、key、value放入红黑树 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 保存结点的val oldVal = p.val; if (!onlyIfAbsent) // 赋值结点value值 p.val = value; } } } } // binCount不为0 if (binCount != 0) { // 如果binCount大于等于转化为红黑树的阈值 if (binCount >= TREEIFY_THRESHOLD) // 进行转化 treeifyBin(tab, i); if (oldVal != null) // 旧值不为空 // 返回旧值 return oldVal; break; } } } // 增加binCount的数量 addCount(1L, binCount); return null; } 2-1、.putVal()方法,执行流程: 2-1-1、判断存储的key、value是否为空,若为空,则抛出异常,否则,进入步骤2-1-2。 2-1-2、计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤2-1-3。 2-1-3、根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤2-1-4。 2-1-4、若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤2-1-5。 2-1-5、对桶中的第一个结点(即table表中的结点)进行加锁,对该桶进行遍历,桶中的结点的hash值与key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值替换该结点的value值),若遍历完桶仍没有找到hash值与key值和指定的hash值与key值相等的结点,则直接新生一个结点并赋值为之前最后一个结点的下一个结点。进入步骤2-1-6。 2-1-6、若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。 2-2、初始化表.initTable()方法源码分析: //对于table的大小,会根据sizeCtl的值进行设置,如果没有设置szieCtl的值,那么默认生成的table大小为16,否则,会根据sizeCtl的大小设置table大小。 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 表为空或者表的长度为0 // 如果这个值〈0,表示其他线程正在进行初始化,就放弃这个操作。进行线程让步等待 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 比较sizeCtl的值与sc是否相等,相等则用-1替换,防止其他线程进入。 try { if ((tab = table) == null || tab.length == 0) { // table表为空或者大小为0 // sc的值是否大于0,若是,则n为sc,否则,n为默认初始容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") // 新生结点数组 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 赋值给table table = tab = nt; // sc为n * 3/4==>n-(n/2)/2=3/4*n=0.75*n sc = n - (n >>> 2); } } finally { // 设置sizeCtl的值,初始化数组后,将sizeCtl的值改为0.75*n。 sizeCtl = sc; } break; } } // 返回table表 return tab; } 2-3、.tabAt()方法源码分析:原子操作。 //原子操作,用于对指定位置的节点进行操作。 //获得在i位置上的Node节点,即返回table数组中下标为i的结点 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { //getObjectVolatile的第二项参数为下标为i的偏移地址。 //U.getObjectVolatile(),直接获取指定内存的数据,保证了每次拿到数据都是最新的。 return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } 2-4、.casTabAt()方法源码分析:原子操作。 /** 利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少 在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改 因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果.有点类似于SVN */ //用于比较table数组下标为i的结点是否为c,若为c,则用v交换操作。否则,不进行交换操作。 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } 2-5、.setTabAt()方法源码分析:原子操作 //利用volatile方法设置节点位置的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); } 2-6、.helpTransfer()方法源码分析: //用于在扩容时将table表中的结点转移到nextTable中。 //是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的transfer方法可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // table表不为空并且结点类型使ForwardingNode类型,并且结点的nextTable不为空 int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 条件判断 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) // break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 比较并交换 // 将table的结点转移到nextTab中,具体见扩容源码章节 transfer(tab, nextTab); break; } } return nextTab; } return table; } 2-7、.putTreeVal()方法源码分析: //该方法用于将指定的hash、key、value值添加到红黑树中,若已经添加了,则返回null,否则返回该结点。 final TreeNode<K,V> putTreeVal(int h, K k, V v) { Class<?> kc = null; boolean searched = false; for (TreeNode<K,V> p = root;;) { int dir, ph; K pk; if (p == null) { first = root = new TreeNode<K,V>(h, k, v, null, null); break; } else if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) { if (!searched) { TreeNode<K,V> q, ch; searched = true; if (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) || ((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null)) return q; } dir = tieBreakOrder(k, pk); } TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { TreeNode<K,V> x, f = first; first = x = new TreeNode<K,V>(h, k, v, f, xp); if (f != null) f.prev = x; if (dir <= 0) xp.left = x; else xp.right = x; if (!xp.red) x.red = true; else { lockRoot(); try { root = balanceInsertion(root, x); } finally { unlockRoot(); } } break; } } assert checkInvariants(root); return null; } 2-8、.treeifyBin()方法源码分析: //用于将桶中的数据结构转化为红黑树,其中,值得注意的是,当table的长度未达到阈值时,会进行一次扩容操作,该操作会使得触发treeifyBin操作的某个桶中的所有元素进行一次重新分配,这样可以避免某个桶中的结点数量太大。 private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { // 表不为空 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // table表的长度小于最小的长度 // 进行扩容,调整某个桶中结点数量过多的问题(由于某个桶中结点数量超出了阈值,则触发treeifyBin) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 桶中存在结点并且结点的hash值大于等于0 synchronized (b) { // 对桶中第一个结点进行加锁 if (tabAt(tab, index) == b) { // 第一个结点没有变化 TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { // 遍历桶中所有结点 // 新生一个TreeNode结点 TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) // 该结点前驱为空 // 设置p为头结点 hd = p; else // 尾节点的next域赋值为p tl.next = p; // 尾节点赋值为p tl = p; } // 设置table表中下标为index的值为hd setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } } 2-9、.addCount()方法源码分析: //该方法主要完成binCount的值加1的操作。 private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // counterCells不为空或者比较交换失败 CounterCell a; long v; int m; // 无竞争标识 boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //....... }
5、ConcurrentHashMap-获取元素
1、获取元素:.get()方法2、源代码: //get函数根据key的hash值来计算在哪个桶中,再遍历桶,查找元素,若找到则返回该结点,否则,返回null。 public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 计算key的hash值 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空 if ((eh = e.hash) == h) { // 表中的元素的hash值与key的hash值相等 if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 键相等 // 返回值 return e.val; } else if (eh < 0) // 结点hash值小于0 // 在桶(链表/红黑树)中查找 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { // 对于结点hash值大于0的情况 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
6、ConcurrentHashMap-删除元素
1、删除元素:.remove()方法,内部调用了.replaceNode()方法2、源代码: public V remove(Object key) { return replaceNode(key, null, null); } //对remove函数提供支持,remove函数底层是调用的replaceNode函数实现结点的删除。 final V replaceNode(Object key, V value, Object cv) { // 计算key的hash值 int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { // 无限循环 Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) // table表为空或者表长度为0或者key所对应的桶为空 // 跳出循环 break; else if ((fh = f.hash) == MOVED) // 桶中第一个结点的hash值为MOVED // 转移 tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { // 加锁同步 if (tabAt(tab, i) == f) { // 桶中的第一个结点没有发生变化 if (fh >= 0) { // 结点hash值大于0 validated = true; for (Node<K,V> e = f, pred = null;;) { // 无限循环 K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 结点的hash值与指定的hash值相等,并且key也相等 V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { // cv为空或者与结点value相等或者不为空并且相等 // 保存该结点的val值 oldVal = ev; if (value != null) // value为null // 设置结点value值 e.val = value; else if (pred != null) // 前驱不为空 // 前驱的后继为e的后继,即删除了e结点 pred.next = e.next; else // 设置table表中下标为index的值为e.next setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { // 为红黑树结点类型 validated = true; // 类型转化 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { // 根节点不为空并且存在与指定hash和key相等的结点 // 保存p结点的value V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { // cv为空或者与结点value相等或者不为空并且相等 oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) // 移除p结点 setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) // baseCount值减一 addCount(-1L, -1); return oldVal; } break; } } } return null; }
7、ConcurrentHashMap-扩容
1、扩容操作:.tranfer()方法-它支持多线程进行扩容操作,而并没有加锁。2、源代码: /** * 将每个bin中的节点移动和/或复制到新表中。 * 它支持多线程进行扩容操作,而并没有加锁。 */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位 boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //这个while循环体的作用就是在控制i-- 通过i--可以依次遍历原hash表中的节点 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍 return; } //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //如果遍历到的节点为空 则放入ForwardingNode指针 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心:遍历到ForwardingNode节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //节点上锁 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //如果fh>=0 证明这是一个Node节点 if (fh >= 0) { int runBit = fh & n; //以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列 Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //在nextTable的i位置上插入一个链表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一个链表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode节点 表示已经处理过该节点 setTabAt(tab, i, fwd); //设置advance为true 返回到上面的while循环中 就可以执行i--操作 advance = true; } //对TreeBin对象进行处理 与上面的过程类似 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; //构造正序和反序两个链表 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果扩容后已经不再需要tree的结构 反向转换为链表结构 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; //在nextTable的i位置上插入一个链表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一个链表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode节点 表示已经处理过该节点 setTabAt(tab, i, fwd); //设置advance为true 返回到上面的while循环中 就可以执行i--操作 advance = true; } } } } } }3、扩容分析-小结: 3-1、整个扩容操作分为两个部分: 3-1-1、是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的,这个地方在后面会有提到; 3-1-2、是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。 3-2、扩容-单线程:遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素。 3-2-1、如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点; 3-2-2、如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上。 3-2-3、如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上。 3-2-4、遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。 3-3、扩容-多线程: 3-3-1、如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。 3-3-2、多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。
7-1、ConcurrentHashMap-扩容-多线程-逻辑图

https://www.cnblogs.com/williamjie/p/9099861.htmlhttps://www.cnblogs.com/banjinbaijiu/p/9147434.html