// 初始化数组
// 一个线程在put时如果发现tab是空的,则需要进行初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
/*
其中一个线程进入else if 后 会创建 table
其他线程都在if中相互转让cpu然后再次循环,再次进入if
当进入else if 的线程成功初始化后
其他线程会陆续进入else if 或 进入while的条件判断
判断后发现tab已经不为null,此时会退出循环。
*/
while ((tab = table) == null || tab.length == 0) {
// sizeCtl默认等于0,如果为-1表示有其他线程正在进行初始化,本线程不竞争CPU
// yield表示放弃CPU,线程重新进入就绪状态,重新竞争CPU,如果竞争不到就等,如果竞争到了又继续循环
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 通过cas将sizeCtl改为-1,如果改成功了则进行后续操作
// 如果没有成功,则表示有其他线程在进行初始化或已经把数组初始化好了
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 当前线程将sizeCtl改为-1后,再一次判断数组是否为空
// 会不会存在一个线程进入到此处之后,数组不为空了?
if ((tab = table) == null || tab.length == 0) {
// 如果在构造ConcurrentHashMap时指定了数组初始容量,那么sizeCtl就为初始化容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 如果n为16,那么就是16-4=12
// sc = 3*n/4 = 0.75n, 初始化完成后sizeCtl的数字表示扩容的阈值
sc = n - (n >>> 2);
}
} finally {
// 此时sc为阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}
1. ConcurrentHashMap底层结构
1.1. JDK1.7
底层一个Segments数组,存储一个Segments对象,一个Segments中储存一个Entry数组,存储的每个Entry对象又是一个链表头结点。
get():
1、第一次哈希 找到 对应的Segment段,
调用Segment中的get方法
2、再次哈希找到对应的链表,
3、最后在链表中查找。
// 外部类方法
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash); // 第一次hash 确定段的位置
}
//以下方法是在Segment对象中的方法;
//确定段之后在段中再次hash,找出所属链表的头结点。
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
put():
1、首先确定段的位置,
调用Segment中的put方法:
2、加锁
3、检查当前Segment数组中包含的HashEntry节点的个数,如果超过阈值就重新hash
4、然后再次hash确定放的链表。
5、在对应的链表中查找是否相同节点,如果有直接覆盖,如果没有将其放置链表尾部
当size超过阈值会进行rehash扩容。
1.2. JDK1.8
put方法:
1、检查Key或者Value是否为null,
2、得到Kye的hash值
3、如果Node数组是空的,此时才初始化 initTable(),
4、如果找的对应的下标的位置为空,直接new一个Node节点并放入, break;
5、
6、如果对应头结点不为空, 进入同步代码块
判断此头结点的hash值,是否大于零,大于零则说明是链表的头结点在链表中寻找,
如果有相同hash值并且key相同,就直接覆盖,返回旧值 结束
如果没有则就直接放置在链表的尾部
此头节点的Hash值小于零,则说明此节点是红黑二叉树的根节点
调用树的添加元素方法
判断当前数组是否要转变为红黑树
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());// 得到 hash 值
int binCount = 0; // 用于记录相应链表的长度
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组"空",进行数组初始化
if (tab == null || (n = tab.length) == 0)
// 初始化数组
tab = initTable();
// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组该位置为空,
// 用一次 CAS 操作将新new出来的 Node节点放入数组i下标位置
// 如果 CAS 失败,那就是有并发操作,进到下一个循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
tab = helpTransfer(tab, f);
else { // 到这里就是说,f 是该位置的头结点,而且不为空
V oldVal = null;
// 获取链表头结点监视器对象
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将这个新值放到链表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount != 0 说明上面在做链表操作
if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值: 8
if (binCount >= TREEIFY_THRESHOLD)
// 如果当前数组的长度小于 64,那么会进行数组扩容,而不是转换为红黑树
treeifyBin(tab, i); // 如果超过64,会转成红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
//
addCount(1L, binCount);
return null;
}
get方法
https://blog.csdn.net/qq_41884976/article/details/89532816
。。。待补充
2. 面试题
2.1. ConcurrentHashMap是如何实现线程安全的
分段锁 对整个桶数组进行了分割分段(Segment),每一把锁只锁容器其中一部分数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。
使用的是优化的synchronized 关键字同步代码块 和 cas操作了维护并发。
2.2. 和Hashtable的线程安全机制有什么联系
Hashtable是用 synchronized方法 和 synchronized代码块实现线程安全的。
JDK1.7 和 JDK1.8版本都用到了 synchronized代码块。
2.3. ConcurrentHashMap 和 Hashtable 的效率
ConcurrentHashMap JDK 1.7:使用分段锁,一个线程占用一个segment,其他线程可以操作其他segment。
ConcurrentHashMap JDK 1.8:put和get不用二次哈希,一把锁只能锁住一个链表或一棵红黑树,提高了并发效率。
Hashtable:使用synchronized方法锁,如果一个线程抢到锁,其他线程都要等待该锁释放,效率很低。