先了解以下 Hashtable,故所周知,Hashtable 是线程安全的
public synchronized V put(K key, V value);public synchronized V get(Object key);public synchronized V remove(Object key);
所以在多线程操作下不会出现问题,那么问题来了,由于是锁当前的对象,那么效率就会变得很低,有没有一种既能保证效率,又能线程安全的 HashMap 呢?就是 ConcurrentHashMap 了
分段锁
ConcurrentHashMap 使用了分段锁的设计思路,它使用了 Segment[] 表示数组,而 Segment 中又定义了了一个 HashEntry[] 来表示一段小的数组,HashEntry 和 HashMap 中的 Entry 是一样的,这样就实现了分段的概念。
ConcurrentHashMap 的默认构造函数如下:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel);
- initialCapacity:表示 HashEntry 的数量,默认 16
- loadFactor:加载因子,默认 0.75
- concurrencyLevel:并发等级,默认 16,表示有多少个 Segment
如果 initialCapacity = 32,concurrencyLevel = 16,那么每个 Segment 中的 HashEntry[] 的长度就为 2 ,
即:每个 Segment 中 HashEntry 的数量 = initialCapacity / concurrencyLevel
**
如果是按默认 initialCapacity = 16,concurrencyLevel = 16,的话,HashEntry[] 就等于 2,这是默认的,也就是说 HashEntry[] 最小等于 2
扩容
先扩容 Segment 中的 HashEntry[]
UNSAFE
private static sun.misc.Unsafe UNSAFE;private static int i;static {try {Field field = Unsafe.class.getDeclaredField("theUnsafe");field.setAccessible(true);UNFAFE = (Unsafe) field.get(null);} catch(Exception ex) {ex.printStackTrace();}}i = UNSAFE.objectFieldOffset(Person.class.getDeclaredField("i"));UNSAFE.compareAndSwapInt(person, I_OFFSET, person.i, person.i++);UNSAFE.getIntVolatile(person, I_OFFSET);
源码解析
// initialCapacity表示ConcurrentHashMap初始化的数组的容量// loadFactor表示加载因子// concurrencyLevel表示并发数,意思是一个ConcurrentHashMap对象支持几个线程同时操作public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {// loadFactor不能大于0,initialCapacity不能小于0,concurrencyLevel不能小于或等于0if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();// 并发数最大为1 << 16if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching arguments// 找到一个大于等于concurrencyLevel的2的幂次方数// 如果concurrencyLevel为17,那么ssize=32, sshift=5int sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}// 以下两个参数与计算segment数组下标有关系// segmentShift表示hash值要右移的位数,int类型为32位,一个hash值右移了segmentShift位后// 就表示只剩下了hash值的高sshift位// segmentMask就是用来进行&操作的this.segmentShift = 32 - sshift;this.segmentMask = ssize - 1;// 数组容量限制if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;// 数组容量除以ssize,表示指定的每个segment的容量,然后向上取整int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;// segment的容量最小为2,然后取大于等于c的2的幂次方数int cap = MIN_SEGMENT_TABLE_CAPACITY;while (cap < c)cap <<= 1;// create segments and segments[0]// 创建s0,并把它设置到segments数组中的第0个位置// Segment就是一个小型的HashMap,所以也有加载因子,阈值,Entry数组Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];// 通过UNSAFE将ss也就是segments数组的第0个位置设置为s0;UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments = ss;}
public V put(K key, V value) {Segment<K,V> s;// value不能为空if (value == null)throw new NullPointerException();// 根据key计算hash值int hash = hash(key);// 根据hash值计算segments数组的下标,hash值只保留高sshift位,可是为什么要右移?int j = (hash >>> segmentShift) & segmentMask;// 判断segments数组的第j个位置是否为null,如果为null则生成一个segment对象if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegments = ensureSegment(j);// 有了segment对象之后,就向segment对象中设置key,valuereturn s.put(key, hash, value, false);}
// 确保segments数组的第k个位置是否为null,如果为null则在该位置生成一个segment对象private Segment<K,V> ensureSegment(int k) {final Segment<K,V>[] ss = this.segments;long u = (k << SSHIFT) + SBASE; // raw offsetSegment<K,V> seg;// 判断segments数组的第k个位置是否为null,如果为null则生成一个segment对象if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {// 以segments数组的中的0个位置的segment对象作为原型,接下来将以原型作为参照来生成新的segment对象Segment<K,V> proto = ss[0]; // use segment 0 as prototype// 通过从原型中获取参数int cap = proto.table.length;float lf = proto.loadFactor;int threshold = (int)(cap * lf);HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];// 再次获取segments数组中的第k个位置的segment对象是否为null// 如果为null则先生成一个新的Segment对象// 然后使用CAS来将新的Segment对象设置到segments数组中去if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck//Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))break;}}}return seg;}
// 如果第一次tryLock的时候就成功了获取到锁了,那么node为null// 如果第一次tryLock的如果没有成功,那么先判断hash值对应的数组下标位置是否有值,// 如果没值则会生成一个HashEntry对象,next属性为null并赋值给node// 如果有值则先遍历当前链表,找到key相同的元素,如果没有相同的key,最终那么也会生成一个HashEntry对象,并赋值给node// 以上步骤相当于寻找当前Segment对象中是否存在key相同的元素,这里是基于效率来考虑:// 反正要加锁,如果一开始没加上,就先不阻塞,先去遍历看是否要生成一个HashEntry对象,如果要则先生成// 先完成了以上步骤之后,再重试加锁:// 最大重试次数跟cpu核数相关,先理解为有多次重试// 在重试的过程中,偶数次重试的时候会去检查一下链表头元素,因为如果链表头元素发生了改变就表示当前位置// 有元素添加进来了,则重试次数重新开始计数// 当超过了最大重试次数之后就开始加锁(加不到则阻塞)// 总结,该方法的目的是使用自旋的方式去加锁,自旋的次数是不确定的,自旋分为遍历链表慢速自旋和快速自旋,自旋规则是这样的:// 如果当前位置上没有元素,则生成新node,然后快速自旋MAX_SCAN_RETRIES次// 如果当前位置上有元素,则遍历链表进行慢速自旋,如果发现链表上有key相同的,则快速自旋MAX_SCAN_RETRIES次// 如果当前位置上有元素,则遍历链表进行慢速自旋,遍历完尾结点之后,则快速自旋MAX_SCAN_RETRIES次// 在快速自旋的过程中,如果发现当前位置上的头结点发生了改变,则重新进入自旋的过程中,为什么要这样呢?// 自旋的时候遍历链表的目的就是看要不要生成一个新的node,那既然都已经生成了,为什么链表头发生了变化还要重新开始自旋呢?// 只能理解为,链表发生了变化就要重新开始自旋,也就是延长自旋时间,尽量不直接使用lock加锁,因为使用lock加锁后就只能等其他线程释放后继续往下走了private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {// 根据key的hash值找到Segment对象内部的Entry数组中对应位置的元素,也就是链表的头结点HashEntry<K,V> first = entryForHash(this, hash);HashEntry<K,V> e = first;HashEntry<K,V> node = null;int retries = -1; // negative while locating nodewhile (!tryLock()) {HashEntry<K,V> f; // to recheck first belowif (retries < 0) {if (e == null) {if (node == null) // speculatively create nodenode = new HashEntry<K,V>(hash, key, value, null);retries = 0;}else if (key.equals(e.key))retries = 0;elsee = e.next;}else if (++retries > MAX_SCAN_RETRIES) {lock();break;}else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {e = first = f; // re-traverse if entry changedretries = -1;}}return node;}
// 这是Segment对象中的put方法final V put(K key, int hash, V value, boolean onlyIfAbsent) {// 先尝试获取一下锁,如果获取到了则node为null,如果第一次没有获取到,则开始加锁// node表示新的hashentry对象HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;// 链表头结点HashEntry<K,V> first = entryAt(tab, index);// 从链表头结点开始遍历for (HashEntry<K,V> e = first;;) {// 如果当前遍历到的entry对象的key不为空,则比较key,hash值是否相等,等等则覆盖// 如果不相等则e = e.nextif (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else {// 如果尾结点都遍历完了// 设置node的next属性if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);// ConcurrentHash的元素数量+1,如果超过了阈值则扩容// 如果没有超过则将node结点设置到数组上,相当于下移int c = count + 1;if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);elsesetEntryAt(tab, index, node); // 将node添加到数组上,向下移动++modCount;count = c;oldValue = null;break;}}} finally {// 解锁unlock();}return oldValue;}
// Segment内扩容private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;// 数组大小翻倍扩容int newCapacity = oldCapacity << 1;threshold = (int)(newCapacity * loadFactor);HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];int sizeMask = newCapacity - 1;// 遍历HashEntry老数组上的每个链表for (int i = 0; i < oldCapacity ; i++) {HashEntry<K,V> e = oldTable[i]; // 链表头结点if (e != null) {HashEntry<K,V> next = e.next;// 链表头结点的在新数组上的下表int idx = e.hash & sizeMask;// 如果链表上只有一个结点,那么直接将当前entry移动到新数组上if (next == null) // Single node on listnewTable[idx] = e;else { // Reuse consecutive sequence at same slotHashEntry<K,V> lastRun = e;int lastIdx = idx;// 从链表的第二个节点开始遍历链表,找到当前链表上和链表头节点新位置相同的最后一个结点for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;// 当前遍历到的entry的新下标不等于上一个entry的新下标,则记录一下// 如果链表的最后3个节点的新下标都一样的话,那么lastRun就是倒数第三个Entryif (k != lastIdx) {lastIdx = k;lastRun = last;}}// 将链表上的最后一个节点(不准确,参照上面的说法)转移到新数组上// 可能会把一个子链表转移过去newTable[lastIdx] = lastRun;// Clone remaining nodes// 从链表头部开始遍历,知道lastRun结束,一个个entry进行转移for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;// 头插法HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}// 把这个新node加到新数组上int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;table = newTable;}
