1.7中的ConcurrentHashMap整体上可以看作是一个双层数组的结构。外层是Segment数组,每个Segment内是一个HashEntry数组,类似于一个HashMap,数组的每个Segment在操作时都相对独立。Segment在初始化后不允许扩容,HashEntry允许扩容。在加锁的过程中只会对某个Segment加锁而不是整个数组。
构造方法
//无参构造器会用默认参数调用该构造器//默认参数为initialCapacity=16,loadFactor=0.75f,concurrencyLevel=16public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0;//Segment数组的长度int ssize = 1;//Segment数组的长度要为2的n次幂且大于等于concurrencyLevelwhile (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}this.segmentShift = 32 - sshift;this.segmentMask = ssize - 1;if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;//c用来计算Segment内数组的大小int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;//cap为真正的Segment内的数组大小 需要调整为2的n次幂int cap = MIN_SEGMENT_TABLE_CAPACITY;while (cap < c)cap <<= 1;//创建0位置的SegmentSegment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);//创建Segment数组Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];//将s0赋值给Segment[0]UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments = ss;}
Put方法
public V put(K key, V value) {Segment<K,V> s;if (value == null)//不允许空值throw new NullPointerException();int hash = hash(key);//j是找到的Segment数组索引int j = (hash >>> segmentShift) & segmentMask;//判断插入的位置是否为null,为null则创建一个Segmentif ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegments = ensureSegment(j);return s.put(key, hash, value, false);}private Segment<K,V> ensureSegment(int k) {final Segment<K,V>[] ss = this.segments;long u = (k << SSHIFT) + SBASE; // raw offsetSegment<K,V> seg;//为保证在当前位置只能有一个Segment被创建,这里用UNSAFE,if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {Segment<K,V> proto = ss[0]; // use segment 0 as prototype//从Segment[0]获取参数 HashEntry数组的大小和加载因子int cap = proto.table.length;float lf = proto.loadFactor;//计算扩容阈值int threshold = (int)(cap * lf);HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];//再次检测当前位置是否已经有其他线程创造的Segmentif ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheckSegment<K,V> s = new Segment<K,V>(lf, threshold, tab);//这里用cas保证写入成功while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))break;}}}return seg;}final V put(K key, int hash, V value, boolean onlyIfAbsent) {//tryLock 尝试获取锁,即使获取失败,后面的代码也会继续执行//Lock 获取锁,若获取失败,则进入阻塞状态//此时若尝试获取锁失败,则会引入自旋锁,在scanAndLockForPut实现HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;//是HashEntry链表插入,和hashMap差不多try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else {if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);elsesetEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {unlock();}return oldValue;}private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {//获取要插入的Segment的第一个节点HashEntry<K,V> first = entryForHash(this, hash);HashEntry<K,V> e = first;HashEntry<K,V> node = null;//while循环为一直尝试获取锁 使用tryLock可以在未获取锁的时候进行//一些其它操作提高效率//retries变量起到引导作用 判断当前应该进行什么操作int retries = -1; // negative while locating nodewhile (!tryLock()) {HashEntry<K,V> f; // to recheck first below//若retries小于0 则进行遍历链表操作if (retries < 0) {if (e == null) {//若遍历到表尾则创建HashEntryif (node == null) // speculatively create nodenode = new HashEntry<K,V>(hash, key, value, null);retries = 0;}else if (key.equals(e.key))//找到key相同的HashEntry 将retries置为0retries = 0;elsee = e.next;}//此时为遍历链表的操作已经结束还未获取到锁 不能一直处于循环状态占用CPU内存//每进行一次循环会让retries变量+1 当retries大于64时 调用Lock方法阻塞else if (++retries > MAX_SCAN_RETRIES) {lock();break;}////判断在循环等待的过程中 链表结构是否发生了变化即头结点发生改变//若改变则将retries置为-1 重新进行遍历链表操作//为了避免在判断头结点是否改变的过程中 释放的锁被其它线程抢走 只在retries为奇数时判断else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {e = first = f; // re-traverse if entry changedretries = -1;}}return node;}
