简介
ConcurrentHashMap是个线程安全的HashMap。
JDK7的ConcurrentHashMap
JDK7里使用的是数组Segment[]来存储某个hash的值,在每个Segment里又维护一个HashEntry[]来存储key,value的值。在多线程并发访问同一个ConcurrentHashMap时,只会锁住一个Segment,这就是分段锁的概念。
HashTable
HashTable也是个线程安全的HashMap。它与ConcurrentHashMap的区别是:使用HashTable时会锁住整个HashTable;而ConcurrentHashMap每个Segment下的HashEntry又类似于一个小的HashTable。
HashTable由于使用时需要锁住整个表,所以性能上不如ConcurrentHashMap。但一方面为了兼容老版本,另一方面HashTable的迭代器是强一致性的,而ConcurrentHashMap的迭代器是弱一致性的,所以ConcurrentHashMap并不能完全替代HashTable。
put(K key,V value)插入值
/**
 * Maps the specified key to the specified value in this table.
 * An existing mapping for the key is overwritten.
 *
 * @return the previous value associated with {@code key}, or
 *         {@code null} if there was no mapping
 */
public V put(K key, V value) {
    // onlyIfAbsent == false: always replace an existing value.
    V previous = putVal(key, value, false);
    return previous;
}
如果碰到key重复的,默认是替换旧值。
/**
 * Core insertion routine backing put/putIfAbsent.
 * Spins until the key/value pair is installed: either via a lock-free CAS
 * into an empty bin, or under a per-bin synchronized lock for a non-empty bin.
 *
 * @param onlyIfAbsent if true, do not replace an existing value
 * @return the previous value for the key, or null if none
 * @throws NullPointerException if key or value is null
 */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // Neither key nor value may be null.
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); // compute the spread hash
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // lazily initialize the table
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // Bin at index (n - 1) & hash is read with volatile semantics
            // (tabAt), so the freshest head node is always observed.
            // If the bin is empty, try to CAS the new node in directly;
            // a CAS failure means another thread raced us — retry the loop.
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED) // MOVED (-1) means a resize is in progress
            tab = helpTransfer(tab, f);  // help with the resize, then retry
        else {
            V oldVal = null;
            synchronized (f) { // lock only this bin's head node, not the table
                if (tabAt(tab, i) == f) { // re-check head under the lock
                    // Linked-list bin (hash >= 0).
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // Same hash and equal key: replace the value.
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent) {
                                    e.val = value; // overwrite existing value
                                }
                                break;
                            }
                            // Reached the tail without a match:
                            // append the new node at the end of the list.
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // Tree bin: insert via the red-black tree.
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            // Key already present in the tree.
                            if (!onlyIfAbsent)
                                p.val = value; // overwrite existing value
                        }
                    }
                }
            }
            // Insertion/replacement happened in this bin.
            if (binCount != 0) {
                // Convert the list to a red-black tree once it reaches
                // TREEIFY_THRESHOLD (8) nodes.
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal; // replaced: return the old value, no count change
                break;
            }
        }
    }
    addCount(1L, binCount); // bump the element count, possibly triggering a resize
    return null;
}
这段代码可以看出:
JDK8中数据是存放在Node节点中的。
Node为空的时候会用CAS去插入新的节点。
节点不为空的时候,会锁这个Node,而不是锁住整个table
第二个到第七个新key插入时,会以链式插在单向链表的尾部。
如果链表长度>=8的时候。链表会转换成红黑树。
扩容的时候,其他线程可以协助扩容
spread(key.hashCode())计算扰动后的hash值,数组下标再由 (n - 1) & hash 得到。
/**
 * Spreads (XORs) the higher bits of a hash code into the lower bits and
 * forces the result to be non-negative by masking with HASH_BITS.
 * Folding in the high bits reduces collisions for tables that only use
 * the low-order bits as an index.
 */
static final int spread(int h) {
    int mixed = h ^ (h >>> 16); // fold high 16 bits into low 16 bits
    return mixed & HASH_BITS;   // clear the sign bit (reserved for control hashes)
}
initTable()初始化容器
/**
 * Lazily initializes the table.
 * Exactly one thread wins the CAS on sizeCtl (setting it to -1) and
 * allocates the array; losers spin, yielding until the table appears.
 *
 * @return the initialized table
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0) {
            // Another thread holds the initialization "lock";
            // yield the time slice and spin until it finishes.
            Thread.yield(); // lost initialization race; just spin
        }
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // CAS "lock": compare and swap sizeCtl to -1
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // default capacity is 16
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 0.75 * n, computed without floating point.
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc; // publish the resize threshold (and release the "lock")
            }
            break;
        }
    }
    return tab;
}
初始化的时候会通过CAS保证只有一个线程初始化table。其他线程CAS失败的话,会通过yield让出时间片,自旋等待初始化线程初始化table结束。
初始化会默认创建一个16个槽位的table。
并设置扩容阈值 sc = n - (n >>> 2),即槽位数的0.75(扩容因子)。
也可以在初始化ConcurrentHashMap的时候设置初始化槽位值
/**
 * Creates a new, empty map whose table can accommodate the given number
 * of elements without resizing. The requested capacity is padded by 50%
 * (initialCapacity * 1.5 + 1) and rounded up to a power of two, then
 * stashed in sizeCtl for initTable to pick up later.
 *
 * @throws IllegalArgumentException if initialCapacity is negative
 */
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap; // table not allocated yet; sizeCtl holds the initial capacity
}

/**
 * Returns the smallest power of two greater than or equal to c
 * (capped at MAXIMUM_CAPACITY). Smearing the highest set bit
 * rightwards fills every lower bit with 1, so n + 1 is a power of two.
 */
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
addCount(1L, binCount) 添加count,并判断是否需要扩容
/**
 * Adds x to the element count using the striped baseCount/CounterCell
 * scheme, then (if check >= 0) initiates or helps a resize when the
 * size reaches the sizeCtl threshold.
 *
 * @param x     the count delta
 * @param check bin count hint; if <= 1 after an uncontended cell update,
 *              the resize check is skipped
 */
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // Fast path: CAS baseCount directly. Take the slow path if the
    // counterCells array already exists, or the baseCount CAS fails.
    if ((as = counterCells) != null
        || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0       // counterCells not created yet
            ||(a = as[ThreadLocalRandom.getProbe() & m]) == null // this thread's cell slot is empty
            ||!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // cell CAS lost a race
            fullAddCount(x, uncontended); // full striped-counter path (may grow the cell array)
            return;
        }
        if (check <= 1) // low-contention, small bin: skip the resize check
            return;
        s = sumCount(); // compute the current size for the resize test below
    }
    // Check whether a resize is needed.
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null && // size reached threshold, table exists
               (n = tab.length) < MAXIMUM_CAPACITY) {                // and can still grow
            int rs = resizeStamp(n);
            // sc < 0: a resize is already running — try to help.
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || // stamp mismatch (sizeCtl changed) or resize finished
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null || // helper count maxed out, or transfer done
                    transferIndex <= 0)                                    // no transfer work left to claim
                    break;
                // CAS sizeCtl + 1 to register as one more resize helper.
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // Otherwise, try to be the thread that starts the resize:
            // sizeCtl becomes (stamp << RESIZE_STAMP_SHIFT) + 2.
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
ConcurrentHashMap为了防止并发,会维护一个baseCount 和一个CounterCell[]
第一次发生竞争计数的时候会去初始化CounterCell[]。
其他线程进来的时候会先去CAS修改baseCount+1;如果失败的话,会CAS修改CounterCell数组中的某一个元素让它+1;如果还是失败的话会去扩容这个CounterCell[]。
计算count之后,大于阈值的话,如果有其他线程在扩容的话,会尝试协助扩容
/**
 * Slow path for addCount, adapted from Striped64/LongAdder: spins to
 * (a) initialize the counterCells array, (b) install or CAS-update a
 * cell chosen by the thread's probe value, (c) double the cell array
 * under contention, or (d) fall back to CASing baseCount.
 * cellsBusy (0/1, CAS'd via CELLSBUSY) acts as a spinlock guarding
 * creation and resizing of the cell array.
 *
 * @param x              the count delta
 * @param wasUncontended false if the caller's cell CAS already failed
 */
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization of the probe
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        // counterCells has been created and published.
        if ((as = counterCells) != null && (n = as.length) > 0) {
            // The cell slot this thread hashes to is still empty.
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // acquire the spinlock
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;  // install the new cell
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;  // release the spinlock
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) // add x to this cell
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale — don't expand
            else if (!collide)
                collide = true;             // remember the collision; expand on the next failure
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1]; // double the cell array
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h); // rehash to a different cell
        }
        // counterCells not created yet: CAS cellsBusy so that exactly
        // one thread performs the initialization.
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2]; // initial array of length 2
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0; // release the CAS lock; volatile write ensures visibility
            }
            if (init)
                break;
        }
        // Everything above failed: fall back to CASing baseCount.
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}
并发的时候,第一个线程会去用CAS初始化长度为2的CounterCell[],并随机将值放入一个槽位。
其他线程自旋,并cas尝试更新baseCount
CounterCell初始化好之后,线程会随机cas更新某个槽位
发现某个槽位没有实例化,则cas锁去实例化这个槽位
这些CAS操作配合volatile修饰的cellsBusy变量,保证只有一个线程会去初始化/扩容CounterCell[]或实例化某个槽位,同时保证可见性。
sumCount()获取map的size
/**
 * Returns the current element count: baseCount plus the sum of every
 * non-null CounterCell value. The result is only a moving snapshot
 * under concurrent updates.
 */
final long sumCount() {
    CounterCell[] cells = counterCells;
    long total = baseCount;
    if (cells != null) {
        for (CounterCell cell : cells) {
            if (cell != null)
                total += cell.value;
        }
    }
    return total;
}
计算size的时候,会将baseCount的值加上遍历所有CounterCell累加得到的值。
resizeStamp(int n)生成与当前容量n绑定的扩容标记(stamp),写入sizeCtl高位用于校验本轮扩容;sizeCtl低位才记录参与扩容的线程数。
/**
 * Returns the stamp bits for a resize of a table of size n.
 * The leading-zero count uniquely identifies n (a power of two), and
 * the OR'd marker bit guarantees the stamp is negative when shifted
 * left by RESIZE_STAMP_SHIFT into sizeCtl.
 */
static final int resizeStamp(int n) {
    int sizeId = Integer.numberOfLeadingZeros(n); // unique per power-of-two size
    int signMarker = 1 << (RESIZE_STAMP_BITS - 1);
    return sizeId | signMarker;
}
helpTransfer(Node[] tab, Node f)协助扩容
/**
 * Helps transfer bins if a resize is in progress.
 * Called when a put/remove hits a ForwardingNode: registers this thread
 * as a resize helper (sizeCtl + 1 via CAS) and joins the transfer.
 *
 * @return the next table if help was possible, else the current table
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        // Keep trying while the same resize is still running.
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // Bail out if the stamp no longer matches, the resize has
            // ended, the helper count is maxed, or no work remains.
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // CAS sizeCtl + 1: register as an additional resize helper.
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
transfer(tab, nextTab)扩容或协助扩容的真正方法
/**
 * Moves and/or copies the nodes in each bin to the new table.
 * Work is split into "strides" of at least MIN_TRANSFER_STRIDE bins;
 * each participating thread repeatedly claims a stride by CASing
 * transferIndex downwards and migrates those bins, installing a
 * ForwardingNode in each processed slot of the old table.
 *
 * @param tab     the current table
 * @param nextTab the new (doubled) table, or null if this thread initiates
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // Stride = (length / 8) / NCPU; if that is below 16, use 16.
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // The initiating thread creates the new, doubled table.
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE; // allocation failed: give up on resizing
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // migration proceeds from the tail (index n-1) downwards
    }
    int nextn = nextTab.length;
    // Placeholder installed in migrated slots; other threads that see a
    // ForwardingNode skip the bin (or help, via helpTransfer).
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // advance == true means this thread may move to the next index (i--);
    // it is set false until the current bin has been fully processed.
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) { // iterate, migrating one bin at a time
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // If --i is still >= bound, there is more work in the stride
            // this thread already claimed; stop advancing and process bin i.
            // (On first entry this test fails, falling through to claim a
            // stride via transferIndex below.)
            if (--i >= bound || finishing)
                advance = false; // don't advance until bin i is actually processed
            // transferIndex <= 0: no strides left. Set i = -1 so the
            // completion check below runs and this thread can exit.
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // CAS transferIndex down by one stride, claiming the range
            // [nextBound, nextIndex - 1] for this thread.
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;    // lowest index of this thread's claimed range
                i = nextIndex - 1;    // highest index of this thread's claimed range
                advance = false;      // process bin i before advancing further
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                // Commit: publish the new table and set the next
                // threshold to 1.5 * n == 0.75 * (2 * n).
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // Deregister this helper (sizeCtl - 1). The last thread out
            // (sc - 2 == stamp << RESIZE_STAMP_SHIFT) rechecks the whole
            // table once before committing.
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            // Empty bin: just install the forwarding node.
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) { // lock this bin while migrating it
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn; // low-index and high-index sublists
                    if (fh >= 0) { // linked-list bin
                        // hash & n decides whether a node stays at index i
                        // (bit == 0, "low") or moves to i + n (bit == 1, "high").
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p; // head of the longest reusable tail run
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // Re-link the nodes before lastRun, head-inserting
                        // each into its low or high sublist.
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln); // head-insert into low list
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn); // head-insert into high list
                        }
                        setTabAt(nextTab, i, ln);     // low list keeps index i
                        setTabAt(nextTab, i + n, hn); // high list moves to i + n
                        setTabAt(tab, i, fwd);        // mark the old bin as moved
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { // red-black tree bin
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        // Split the tree's node list into low and high parts.
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // Untreeify a part that shrank to <= UNTREEIFY_THRESHOLD
                        // nodes; reuse the original bin if the other part is empty.
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
扩容前会细分一个线程扩容范围将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
并创建一个原来table两倍大的容器,从原来容器的尾部开始向前迁移。
扩容某个槽位会先synchronized锁住槽位,防止并发。
对于链表会先定义高低位两条链表,遍历找到lastRun节点;再用 p.hash & n 判断节点落在低位还是高位,然后再次遍历,采用头插法将节点分到高低两条链表。
低位的链表放在新table的原下标i处,高位的链表放在新table中原下标+n(即 i+n)的位置。
红黑树也同样按hash分出高低位两部分,如果拆分后某部分长度小于等于6(UNTREEIFY_THRESHOLD),则恢复成链式结构。
get(k key)获取值
/**
 * Returns the value mapped to the key, or null if absent.
 * Entirely lock-free: relies on the volatile read in tabAt for the bin
 * head, checks it directly, delegates to Node.find for special nodes
 * (negative hash, e.g. a ForwardingNode during a resize or a TreeBin),
 * and otherwise walks the linked list.
 */
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode()); // compute the spread hash
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) { // bin at (n - 1) & h is non-empty
        if ((eh = e.hash) == h) {
            // Head node matches: return its value directly.
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0) // negative hash: special node (e.g. ForwardingNode during resize)
            // Delegate the lookup to the node's own find (which searches
            // the next table for a ForwardingNode).
            return (p = e.find(h, key)) != null ? p.val : null;
        // Plain list bin: walk the chain looking for the key.
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
