JDK1.7
整个ConcurrentHashMap由一个个Segment组成,Segment代表部分或者一段的意思,所以很多地方都会将其描述为分段锁。Segment通过继承ReentrantLock来进行加锁,所以每次需要加锁的操作锁住的是一个Segment,这样只要保证Segment是线程安全的,也就实现了全局的线程安全。
每个Segment就是一个哈希数组,对应每个槽位中是一个个的链表。
concurrencyLevel:并行级别、并发数、Segment数。默认是16,也就是说ConcurrentHashMap有16个Segment,所以理论上,最多可以同时支持16个线程并发写,只要它们的操作分布在不同的Segment上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
初始化
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0;int ssize = 1;// 计算并行级别,保持并行级别是2的n次方while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}// 使用默认值,currencyLevel为16,sshift为4// 那么segmentShift=28,segmentMask=15this.segmentShift = 32 - sshift;this.segmentMask = ssize - 1;if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;// initialCapacity设置整个map的初始大小// 这里根据initialCapacity计算Segment数组中每个位置可以分到的大小// 如initialCapacity=64,那么每个Segment可以分到4个int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;// 默认MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的// 因为这样的话,对于具体的槽上,// 插入一个元素不至于扩容,插入第二个的时候才会扩容int cap = MIN_SEGMENT_TABLE_CAPACITY;while (cap < c)cap <<= 1;// 创建segment数组,并创建数组的第一个元素segment[0]Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];// 往数组写入segment[0]UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments = ss;}
初始化完成,我们得到了一个Segment数组。
就当是new ConcurrentHashMap()无参构造函数进行初始化的,那么初始化完成以后:
- Segment数组长度为16,不可以扩容;
- Segment[i]的默认大小为2,负载因子是0.75,得出初始阈值为1.5,也就是以后插入第一个元素不会触发扩容。插入第二个会进行第一次扩容;
- 这里初始化了 segment[0],其他位置还是 null;
- 当前移位数 segmentShift = 28,掩码 segmentMask = 15。
put过程
流程如下:
(1)计算key的hash值,再计算Segment数组下标,找到相应的Segment。
(2)如果该Segment未初始化,则采用循环CAS操作的方式进行初始化。
(3)执行插入操作:
- 通过tryLock获取该Segment的独占锁,失败则通过scanAndLockForPut获取锁。
- 计算数组下标,使用头插法插入节点,如果节点为空则执行初始化。
- 如果添加节点后该Segment超过阈值,那么对这个Segment进行扩容。
(4)Segment扩容:
- 创建新数组,扩容两倍。
- 进行数据迁移,将原数组i处的链表移到新数组i或者i+oldCap的位置。
将新数组赋值给原数组。
public V put(K key, V value) {Segment<K, V> s;if (value == null) {throw new NullPointerException();}// 1. 计算key的hash值int hash = hash(key);// 2. 根据hash值找到Segment数组中的位置j// hash是32位,无符号右移segmentShift(28)位,剩下高4位// 然后和segmentMask(15)做一次与操作,也就是说j是hash值的高位,也就是槽的数组下标int j = (hash >>> segmentShift) & segmentMask;// 初始化只初始化了segment[0],其他位置还是null// ensureSegment(j)对segment[j]进行初始化if ((s = (Segment<K, V> UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)s = ensureSegment(j);// 3. 插入到槽s中return s.put(key, hash, value, false);}
第一层皮很简单,根据hash值很快就能找到相应Segment,之后就是Segment内部的put操作了。
Segment内部是由数组+链表组成的。final V put(K key, int hash, V value, boolean onlyIfAbsent) {// 先通过tryLock获取该segment的独占锁,node初始化为空// 如果失败则通过scanAndLockForPut获取锁,node在scanAndLockForPut中初始化HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);V oldValue;try {// segment内部的数组HashEntry<K,V>[] tab = table;// 计算放置的数组下标int index = (tab.length - 1) & hash;// 获取到该数组相应位置的链表头节点HashEntry<K,V> first = entryAt(tab, index);// 遍历链表for (HashEntry<K,V> e = first;;) {if (e != null) {K k;// 存在相同的keyif ((k = e.key) == key || (e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {// 覆盖旧值e.value = value;++modCount;}break;}// 迭代e = e.next;}else {// 如果node不为空,将其设置为链表头节点if (node != null)node.setNext(first);// 如果node为空,初始化并将其设置为链表头节点elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;// 如果超过了该segment的阈值,那么这个segment需要进行扩容if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);// 没有达到阈值,将node放到数组tab的index位置,// 其实就是将新的节点设置成原链表的头节点elsesetEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {// 解锁unlock();}return oldValue;}
插槽操作通过独占锁的保护保证线程安全,先找到数组相应位置的链表头节点,为空则初始化并设置头节点,超过segment的阈值则需要扩容。
其中的还有几步关键操作:
(1)初始化槽
ConcurrentHashMap初始化的时候会初始化第一个槽segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。这里需要考虑并发,对与每个线程先检查该槽是否被初始化,然后通过循环CAS操作直到成功为止。private Segment<K,V> ensureSegment(int k) {final Segment<K,V>[] ss = this.segments;long u = (k << SSHIFT) + SBASE; // raw offsetSegment<K,V> seg;if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {// 这里使用当前segment[0]处的数组长度和负载因子来初始化segment[k]// 使用当前是因为segment[0]可能已经扩容了Segment<K,V> proto = ss[0];int cap = proto.table.length;float lf = proto.loadFactor;int threshold = (int)(cap * lf);// 初始化segment[k]内部的数组HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];// 再次检查该槽是否被其他线程初始化if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);// while+CAS设值成功后退出while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))break;}}}return seg;}
(2)获取写锁
这个方法有两个出口,一个是tryLock成功,循环终止;另一个是通过MAX_SCAN_RETRIES控制自旋次数,失败则进入lock()方法,阻塞等待,直到成功拿到独占锁。private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {// 获取数组相应位置的头节点HashEntry<K,V> first = entryForHash(this, hash);HashEntry<K,V> e = first;HashEntry<K,V> node = null;int retries = -1; // negative while locating node// 循环获取锁while (!tryLock()) {HashEntry<K,V> f; // to recheck first belowif (retries < 0) {if (e == null) {// 链表为空或者tryLock失败if (node == null) // 初始化node节点node = new HashEntry<K,V>(hash, key, value, null);retries = 0;}else if (key.equals(e.key))retries = 0;else// 迭代e = e.next;}// 如果重试次数如果超过MAX_SCAN_RETRIE(单核1多核64)// 那么放弃抢锁,进入到阻塞队列等待锁// lock()是阻塞方法,直到获取锁后返回else if (++retries > MAX_SCAN_RETRIES) {lock();break;}// 如果有新的元素进入链表并成为新的头节点// 那么重新获取头节点else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {e = first = f; // re-traverse if entry changedretries = -1;}}return node;}
(3)扩容
segment数组不能扩容,扩容时segment数组某个位置内部的数组HashEntry[]进行扩容,扩容后容量加倍。这个方法不需要考虑并发,因为被调用的时候已经持有该segment的独占锁。 // node是扩容时需要添加的新节点private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;// 扩容两倍int newCapacity = oldCapacity << 1;threshold = (int)(newCapacity * loadFactor);// 创建新数组HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];// 新的掩码,如从16扩容到32,那么sizeMask为31,对应二进制 ‘000...00011111’int sizeMask = newCapacity - 1;// 遍历数组// 将原数组i处的链表拆分到新数组位置i和i+oldCap两个位置for (int i = 0; i < oldCapacity ; i++) {// e是链表的头节点HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;// 计算放置的数组下标int idx = e.hash & sizeMask;// 该位置处为独狼if (next == null)newTable[idx] = e;else { // Reuse consecutive sequence at same slot// e是链表表头HashEntry<K,V> lastRun = e;// idx 是当前链表的头结点 e 的新位置int lastIdx = idx;// 下面这个for循环会找到一个lastRun节点,这个节点之后的所有元素是将要放到一起的for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}// 将lastRun及其之后的所有节点组成的这个链表放到lastIdx这个位置newTable[lastIdx] = lastRun;// 下面的操作是处理lastRun之前的节点,// 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}// 将新节点放到新数组两个链表之一的头部int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;// 赋值给原数组table = newTable;}
put操作的线程安全性:
- 初始化槽,使用while循环+CAS操作来初始化Segment中的数组。
- 头插法。所以,这个时候,get操作在链表遍历的过程以及该到了中间,是不会影响的。另一个并发问题是get操作在put之后,需要保证刚刚插入表头的节点被读取,这个依赖于setEntryAt方法中使用的UNSAFE.putOrderedObject。
- 扩容操作。扩容是新创建了数组,然后进行数据迁移,最后将newTable赋值给table。所以,如果get操作此时也在进行,那么也没关系,如果get先行,那么就是在旧的table进行查询;如果put先行,那么volatile可以保证可见性。
get过程
get过程比较简单:
(1)计算hash值
(2)根据hash值找到相应的Segment
(3)找到Segment相应位置的链表进行遍历
public V get(Object key) {Segment<K,V> s; // manually integrate access methods to reduce overheadHashEntry<K,V>[] tab;// 1. 计算hash值int h = hash(key);long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;// 2. 根据hash值找到对应的segmentif ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {// 3. 找到segment内部素组相应位置的链表,遍历for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k)))return e.value;}}return null;}
JDK1.8

put过程
流程如下:
(1)计算key的hash值。
(2)通过无限循环+CAS插入节点。
(3)如果数组为空,进行数组初始化。
(4)如果数组非空,则计算hash值对应下标,得到相应位置的第一节点。
- 如果第一节点为空,用一次CAS操作将这个新节点放在该位置即可;
- 如果CAS操作失败,说明存在并发操作,进入下一轮循环。
(6)如果hash值为MOVED,说明在进行扩容操作,然后帮助数据迁移。
(7)如果第一节点不为空,对第一节点加synchonized锁。
- 如果hash值大于0,说明是链表。直接遍历链表,若存在相同key则进行值覆盖,然后break。
- 如果节点类型为树节点,则调用红黑树的插值方法插入新节点。
(8)判断链表长度是否超过扩容阈值8并且数组长度大于64,是则进行树化操作。
(9)判断是否产生了值覆盖,是则返回旧值,否则返回null。
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 计算hash值int hash = spread(key.hashCode());// 用于记录相应链表的长度int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 如果数组为空,则进行数组初始化if (tab == null || (n = tab.length) == 0)tab = initTable();// 如果数组不为空,则找到hash值对应的数组下标,得到第一个节点// 如果第一个节点为空else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 用一次CAS操作将这个新值放在该位置即可// 如果CAS失败,说明存在并发操作,进入下一轮循环if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}// 如果hash值为MOVED,说明在扩容else if ((fh = f.hash) == MOVED)// 帮助数据迁移tab = helpTransfer(tab, f);// 如果第一个节点不为空else {V oldVal = null;// 对第一个节点加锁synchronized (f) {if (tabAt(tab, i) == f) {// hash值大于0,说明是链表if (fh >= 0) {// 记录链表长度binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;// 发现相等的key则进行值覆盖,然后breakif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}// 到达链表最末端,将新值放到链表的最后面Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 第一个节点类型位红黑树节点else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;// 调用红黑树的插值方法插入新节点if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 链表长度超过扩容阈值8,进行树化操作if (binCount >= TREEIFY_THRESHOLD)// 如果当前数组长度<64,进行数组扩容,不会树化treeifyBin(tab, i);// 存在值覆盖,返回旧值if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;}
其中的还有几步关键操作:
(1)数据初始化
初始化一个合适大小的数组,然后设置sizeCtl。
并发问题是通过对sizeCtl进行CAS操作来控制的。
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 已经被其他线程初始化if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin// 通过CAS操作将sizeCtl设置为-1,代表抢锁成功else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {// DEFAULT_CAPACITY默认初始容量是16int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 将这个数组赋值给table,table是volatile的table = tab = nt;// 如果n为16的话,那么这里sc=12// 其实就是0.75 * nsc = n - (n >>> 2);}} finally {// 设置sizeCtl为scsizeCtl = sc;}break;}}return tab;}
(2)链表转红黑树
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {// MIN_TREEIFY_CAPACITY 为 64// 所以,如果数组长度小于64的时候,其实也就是32或者16或者更小的时候,会进行数组扩容if ((n = tab.length) < MIN_TREEIFY_CAPACITY)tryPresize(n << 1);// b是头节点else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {// 加锁synchronized (b) {if (tabAt(tab, index) == b) {// 遍历链表,建立一颗红黑树TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}// 将红黑树设置到数组相应位置中setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}}
(3)扩容
// 这里的size已经加倍private final void tryPresize(int size) {// 取size的1.5倍,再加1,再往上取最近的2的n次方int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;// 这个if分支和之前说的初始化数组的代码基本上是一样的if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {int rs = resizeStamp(n);if (sc < 0) {Node<K,V>[] nt;if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// 用CAS将sizeCtl加1,然后执行transferif (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 调用transfer方法进行数据迁移else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}}
(4)数据迁移
方法很长,主要是将原来的tab元素迁移到新的nextTab数组中
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// stride在单核下直接等于n,多核模式下为 (n >>> 3) /NCPU,最小值是16if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}
