功能介绍
基本信息
包路径:java.util.concurrent
说明:ConcurrentHashMap,是Java并发包中自JDK1.5后提供的一个线程安全且高效的HashMap实现,可以用来替代HashTable。直接实现了ConcurrentMap接口,同时继承了AbstractMap抽象类。
它沿用了与它同时期的HashMap版本的思想,(jdk1.8)底层依然由“数组”+链表+红黑树的方式。但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。
特点:
- 线程安全【JDK1.7之前使用分段锁实现,JDK1.8开始使用CAS算法实现】
-
ConcurrentMap是如何保证线程安全的?
我们知道,HashTable通过synchronized同步锁保证线程安全,同一时间只要有一个线程操作某个数据,就会锁定整个哈希表,其他线程就无法对HashTable进行任何操作,只能等待该线程执行完毕或释放锁,那这种方式其实是很不友好且效率很低的。
分段锁
于是ConcurrentHashMap在JDK1.7使用了“分段锁”这种机制,线程访问数据时锁定一段范围的数据,这样在容器内就会存在多个锁定区间的锁(类似数据库的“间隙锁”),每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。
在ConcurrentHashMap中,Segment是一个类,实际上每一个Segment都是一个HashEntry<K,V>[] table
, table中的每一个元素本质上都是一个HashEntry的单向队列。
每一个Segment都拥有一个锁,当进行写操作时,只需要锁定一个Segment,而其它Segment中的数据是可以访问的。本质上Segment类就是一个小的Hashmap,里面table数组存储了各个节点的数据。
由于Segment继承ReentrantLock,所以在put时通过ReentrantLock的tryLock()
方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式(自旋就是一个循环获取锁的过程)继续调用tryLock()
方法去获取锁,超过指定次数就挂起,等待唤醒。CAS + synchronized
JDK1.8版本则做了2点修改
将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构.(与HashMap的变化基本一致)
- 取消segments字段,直接采用
transient volatile HashEntry<K,V>[] table
保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,并发控制使用synchronized和CAS来操作。使用synchronized和CAS来操作,synchronized只锁定当前链表或红黑树的首节点,这样只要哈希不冲突(不操作同一位置元素),就不会产生并发,效率又提升很多。
jdk8 后的put的流程(一个死循环流程):
- 判断 key 和 value 是否为空如果为空就抛出异常
- 对key 进行 hash,查找对应的 key 在table中的位置
- 如果位置为空,那么通过cas操作将值加入到 map 中
- 如果 cas 成功那么put 成功 退出循环
- 如果 cas 失败说明有其他线程已经将该位置抢占,进入第四步
- 检查对table表中对应的节点
table[i]
的 hash 值是否等于 MOVE(-1)- 如果是的话那么代表hashmap正在扩容,帮助其扩容
- 如果不是对当前节点的首节点加锁,然后判断其是红黑树还是链表,将对应的值加入其中因为已经加锁所以不存在线程安全问题。
jdk8 后的get 流程:
ConcurrentHashMap 的搜索方法比较简单
- 根据key在Map中找出其对应的value,如果不存在key,则返回null
- 其中key不允许为null,否则抛异常
- 对于节点可能在链表或树上的情况,需要分别去查找
源码解析
ConcurrentHashMap
基本属性
```java // node数组最大容量:2^30=1073741824 private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认数组容量,必须是2的幂 private static final int DEFAULT_CAPACITY = 16;
// 虚拟机限制的最大数组长度 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 并发级别,遗留下来的,为兼容以前的版本 private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子,该变量代表了当前哈希表的允许存放元素占哈希表大小的最大比例,当达到最大比例时会触发哈希表扩容 private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值,即链表上的元素数> 8 链表才有可能转换为红黑树 static final int TREEIFY_THRESHOLD = 8;
// 树转链表阀值,当树上的节点数小于等于6时,重新转换成链表 static final int UNTREEIFY_THRESHOLD = 6;
// 树形化的阈值,只有哈希数组长度>= 64的时候,才会进行树形化,否则进行扩容 static final int MIN_TREEIFY_CAPACITY = 64;
// 扩容线程每次最少要迁移16个hash桶 // 扩容操作中,transfer这个步骤是允许多线程的 // 这个常量表示一个线程执行transfer时,最少要对连续的16个hash桶进行transfer private static final int MIN_TRANSFER_STRIDE = 16;
// 用于生成每次扩容都唯一的生成戳的数,最小是6。 private static int RESIZE_STAMP_BITS = 16;
// 最大的扩容线程的数量 // 2^15-1,help resize的最大线程数 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 移位量,把生成戳移位后保存在sizeCtl中当做扩容线程计数的基数,相反方向移位后能够反解出生成戳 // 32-16=16,sizeCtl中记录size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//=============== 核心属性
// ForwardingNode节点的hash值 // ForwardingNode是一种临时节点,在扩容进行中才会出现,并且它不存储实际的数据 // 如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode // 读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容 static final int MOVED = -1;
// TreeBin节点的hash值 // TreeBin是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点 static final int TREEBIN = -2;
// ReservationNode节点的hash值 // ReservationNode是一个保留节点,就是个占位符,不会保存实际的数据,正常情况是不会出现的, // 在jdk1.8新的函数式有关的两个方法computeIfAbsent和compute中才会出现 static final int RESERVED = -3;
// CPU的核心数,用于在扩容时计算一个线程一次要干多少活 static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node
// 扩容后的新的table数组,只有在扩容时才有用
// nextTable != null,说明扩容方法还没有真正退出,一般可以认为是此时还有线程正在进行扩容,
// 极端情况需要考虑此时扩容操作只差最后给几个变量赋值(包括nextTable = null)的这个大的步骤,
// 这个大步骤执行时,通过sizeCtl经过一些计算得出来的扩容线程的数量是0
private transient volatile Node
/*
- 控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
- 当为负数时:-1代表正在初始化,<-1代表执行扩容的线程数
- 当为0时:代表当时的table还没有被初始化
- 当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
`` 这里有个核心属性
sizeCtl,多线程之间,以
volatile的方式读取
sizeCtl属性,来判断ConcurrentHashMap当前所处的状态。通过CAS设置
sizeCtl`属性,告知其他线程ConcurrentHashMap的状态变更。
不同状态,sizeCtl
所代表的含义也有所不同。
未初始化:
sizeCtl=0
:表示没有指定初始容量。sizeCtl>0
:表示初始容量。
初始化中:
sizeCtl=-1
:标记作用,告知其他线程,正在初始化。
正常状态:
sizeCtl=0.75n
:扩容阈值。
扩容中:
sizeCtl < 0
: 表示有其他线程正在执行扩容。sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
:表示此时只有一个线程在执行扩容。重要子类(存储结构)
ConcurrentHashMap的存储结构与HashMap基本一致,HashMap使用内部子类Node作为基本单元,存储链表节点数据,使用内部子类TreeNode存储树节点数据。
ConcurrentHashMap则增加了几个子类节点对象:ForwardingNode
、TreeBin
、ReservationNode
。1、Node- 链表节点
Node是ConcurrentHashMap存储结构的基本单元,继承于HashMap中的Entry,用于存储数据。
static class Node<K,V> implements Map.Entry<K,V> {
//链表的数据结构
final int hash; // key的hash值
final K key; // key
// val和next都会在扩容时发生变化,所以加上volatile来保持可见性和禁止重排序
volatile V val; // get操作全程不需要加锁是因为Node的成员val是用volatile修饰
volatile Node<K,V> next; // 表示链表中的下一个节点,数组用volatile修饰主要是保证在数组扩容的时候保证可见性
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//不允许更新value
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
// 用于map中的get()方法,子类重写
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
2、TreeNode - 树节点
TreeNode继承与Node,但是数据结构换成了红黑树的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换成红黑树的结构,他就是通过TreeNode作为存储结构代替Node来转换成黑红树。
不过,ConcurrentHashMap对此节点的操作,都会由TreeBin来代理执行。也可以把这里的TreeNode看出是有一半功能的HashMap.TreeNode,另一半功能在ConcurrentHashMap.TreeBin中。static final class TreeNode<K,V> extends Node<K,V> {
//树形结构的属性定义
TreeNode<K,V> parent; // red-black tree links 父亲节点
TreeNode<K,V> left; // 左子节点
TreeNode<K,V> right; // 右子节点
// 新添加的prev指针是为了删除方便,删除链表的非头节点的节点,都需要知道它的前一个节点才能进行删除,所以直接提供一个prev指针
TreeNode<K,V> prev; // needed to unlink next upon deletion 前方节点
boolean red; // 标志是否红节点
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
/**
* @param h 哈希值
* @param k 键
*/
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/**
* 根据key从根节点开始找出相应的TreeNode,
* @param h 哈希值
* @param k 键
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
// 红黑树的查找过程
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
3、TreeBin - 树根节点封装
TreeBin的hash值固定为-2,它是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因。
// 红黑树节点TreeNode实际上还保存有链表的指针,因此也可以用链表的方式进行遍历读取操作
// 自身维护一个简单的读写锁,不用考虑写-写竞争的情况
// 不是全部的写操作都要加写锁,只有部分的put/remove需要加写锁
// 很多方法的实现和jdk1.8的ConcurrentHashMap.TreeNode里面的方法基本一样,可以互相参考
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root; // 红黑树结构的跟节点
volatile TreeNode<K,V> first; // 链表结构的头节点
volatile Thread waiter; // 最近的一个设置 WAITER 标识位的线程,waiter要么是null,要么是当前线程本身
volatile int lockState; // 整体的锁状态
// values for lockState
// 二进制001,红黑树的 已获得写锁状态
static final int WRITER = 1; // set while holding write lock
// 二进制010,红黑树的 等待获取写锁的状态
static final int WAITER = 2; // set when waiting for write lock
// 二进制100,红黑树的 读锁状态,读锁可以叠加,也就是红黑树方式可以并发读,每有一个这样的读线程,lockState都加上一个READER的值
static final int READER = 4; // increment value for setting read lock
// 重要的一点,红黑树的 读锁状态 和 写锁状态 是互斥的,但是从ConcurrentHashMap角度来说,读写操作实际上可以是不互斥的
// 红黑树的 读、写锁状态 是互斥的,指的是以红黑树方式进行的读操作和写操作(只有部分的put/remove需要加写锁)是互斥的
// 但是当有线程持有红黑树的 写锁 时,读线程不会以红黑树方式进行读取操作,而是使用简单的链表方式进行读取,此时读操作和写操作可以并发执行
// 当有线程持有红黑树的 读锁 时,写线程可能会阻塞,不过因为红黑树的查找很快,写线程阻塞的时间很短
// 另外一点,ConcurrentHashMap的put/remove/replace方法本身就会锁住TreeBin节点,这里不会出现写-写竞争的情况,因此这里的读写锁可以实现得很简单
/**
* 比较两个在hashCode相等并且不是Comparable类的元素
*/
static int tieBreakOrder(Object a, Object b) {
int d;
if (a == null || b == null || (d = a.getClass().getName().compareTo(b.getClass().getName())) == 0)
d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1);
return d;
}
/**
* 用以b为头结点的链表创建一棵红黑树
**/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b; // 标识链表首位
TreeNode<K,V> r = null;
// 构建红黑树
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r; // 记录红黑树根节点
assert checkInvariants(root); // 校验红黑树是否合法
}
/**
* 对根节点加写锁,红黑树重构时需要加上写锁
* 该方法只会在调用putTreeVal和removeTreeNode,判断红黑树需要重构时被调用
*/
private final void lockRoot() {
// U.compareAndSwapInt 先比较后交换,如果当前对象中的LOCKSTATE == 0,即没有线程获取到锁,则将其置为WRITER
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))// CAS方式设置写锁
// 如果CAS失败,以竞争的方式加锁
// 单独抽象出一个方法,直到获取到 写锁 这个调用才会返回
contendedLock();
}
// 释放写锁
private final void unlockRoot() {
lockState = 0;
}
/**
* 持续尝试获取写锁
* 可能会阻塞写线程,当写线程获取到写锁时,才会返回
* ConcurrentHashMap的put/remove/replace方法本身就会锁住TreeBin节点
* 也就是说写的时候会直接锁定,这里不会出现写-写竞争的情况
* 因此只用考虑 读锁 阻碍线程获取 写锁,不用考虑 写锁 阻碍线程获取 写锁,
* 这个读写锁本身实现得很简单,处理不了写-写竞争的情况
*/
private final void contendedLock() {
boolean waiting = false;
for (int s;;) {
// ~WAITER是对WAITER进行二进制取反,当此时没有线程持有读锁(不会有线程持有写锁)时,这个if为真
if (((s = lockState) & ~WAITER) == 0) {
if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
// 在读锁、写锁 都没有被别的线程持有时,尝试为自己这个写线程获取写锁,同时清空 WAITER 状态的标识位
if (waiting) // 获取到写锁时,如果自己曾经注册过 WAITER 状态,将其清除
waiter = null;
return;
}
}
else if ((s & WAITER) == 0) { // 有线程持有读锁(不会有线程持有 写锁),并且当前线程不是 WAITER 状态时,这个else if为真
// 尝试设置waiter标志
if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) { // 尝试占据 WAITER 状态标识位
waiting = true; // 表明自己正处于 WAITER 状态,并且被用于进入下一个 else if
waiter = Thread.currentThread(); // 使自己成为等待获取锁的写线程
}
}
else if (waiting) // 有线程持有 读锁(不会有线程持有 写锁),并且当前线程处于 WAITER 状态时,这个else if为真
// 阻塞自己
LockSupport.park(this);
}
}
/**
* 从根节点开始遍历查找,找到“相等”的节点就返回它,没找到就返回null
* 当有写线程加上 写锁 时,使用链表方式进行查找
*/
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
// 两种特殊情况下以链表的方式进行查找
// 1、有线程正持有 写锁,这样做能够不阻塞读线程
// 2、WAITER时,不再继续加 读锁,能够让已经被阻塞的写线程尽快恢复运行,或者刚好让某个写线程不被阻塞
if (((s = lockState) & (WAITER|WRITER)) != 0) {
// 比较是否相等,如果相等则返回
if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
// 否则指向下一节点
e = e.next;
}
// 当前没有线程持有写锁且不在等待
// 读线程数量加1,读状态进行累加
// 以树的形式去读取数据
else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null : r.findTreeNode(h, k, null));
} finally {
Thread w;
// 如果这是最后一个读线程,并且有写线程因为 读锁 而阻塞,那么要通知它,告诉它可以尝试获取写锁了
// U.getAndAddInt(this, LOCKSTATE, -READER)这个操作是在更新之后返回lockstate的旧值,
// 不是返回新值,相当于先判断==,再执行减法
if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w); // 让被阻塞的写线程运行起来,重新去尝试获取 写锁
}
return p;
}
}
}
return null;
}
/**
* 往树上添加节点元素
* 用于实现ConcurrentHashMap.putVal
*/
final TreeNode<K,V> putTreeVal(int h, K k, V v) {
Class<?> kc = null;
boolean searched = false;
for (TreeNode<K,V> p = root;;) {
int dir, ph; K pk;
if (p == null) {
first = root = new TreeNode<K,V>(h, k, v, null, null);
break;
}
else if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) {
if (!searched) {
TreeNode<K,V> q, ch;
searched = true;
if (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) ||
((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null))
return q;
}
dir = tieBreakOrder(k, pk);
}
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
TreeNode<K,V> x, f = first;
first = x = new TreeNode<K,V>(h, k, v, f, xp);
if (f != null)
f.prev = x;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
// 下面是有关put加 写锁 部分
// 二叉搜索树新添加的节点,都是取代原来某个的NIL节点(空节点,null节点)的位置
if (!xp.red) // xp是新添加的节点的父节点,如果它是黑色的,新添加一个红色节点就能够保证x这部分的一部分路径关系不变,
// 这是insert重新染色的最最简单的情况
x.red = true; // 因为这种情况就是在树的某个末端添加节点,不会改变树的整体结构,对读线程使用红黑树搜索的搜索路径没影响
else { // 其他情况下会有树的旋转的情况出现,当读线程使用红黑树方式进行查找时,可能会因为树的旋转,导致多遍历、少遍历节点,影响find的结果
lockRoot(); // 除了那种最最简单的情况,其余的都要加 写锁,让读线程用链表方式进行遍历读取
try {
root = balanceInsertion(root, x);
} finally {
unlockRoot();
}
}
break;
}
}
// 校验红黑树合法性
assert checkInvariants(root);
return null;
}
// 基本是同jdk1.8的HashMap.TreeNode.removeTreeNode,仍然是从链表以及红黑树上都删除节点
// 两点区别:1、返回值,红黑树的规模太小时,返回true,调用者再去进行树->链表的转化;2、红黑树规模足够,不用变换成链表时,进行红黑树上的删除要加 写锁
final boolean removeTreeNode(TreeNode<K,V> p) {
TreeNode<K,V> next = (TreeNode<K,V>)p.next;
TreeNode<K,V> pred = p.prev; // unlink traversal pointers
TreeNode<K,V> r, rl;
if (pred == null)
first = next;
else
pred.next = next;
if (next != null)
next.prev = pred;
if (first == null) {
root = null;
return true;
}
if ((r = root) == null || r.right == null || (rl = r.left) == null || rl.left == null) // too small
return true;
lockRoot();
try {
TreeNode<K,V> replacement;
TreeNode<K,V> pl = p.left;
TreeNode<K,V> pr = p.right;
if (pl != null && pr != null) {
TreeNode<K,V> s = pr, sl;
while ((sl = s.left) != null) // find successor
s = sl;
boolean c = s.red; s.red = p.red; p.red = c; // swap colors
TreeNode<K,V> sr = s.right;
TreeNode<K,V> pp = p.parent;
if (s == pr) { // p was s's direct parent
p.parent = s;
s.right = p;
}
else {
TreeNode<K,V> sp = s.parent;
if ((p.parent = sp) != null) {
if (s == sp.left)
sp.left = p;
else
sp.right = p;
}
if ((s.right = pr) != null)
pr.parent = s;
}
p.left = null;
if ((p.right = sr) != null)
sr.parent = p;
if ((s.left = pl) != null)
pl.parent = s;
if ((s.parent = pp) == null)
r = s;
else if (p == pp.left)
pp.left = s;
else
pp.right = s;
if (sr != null)
replacement = sr;
else
replacement = p;
}
else if (pl != null)
replacement = pl;
else if (pr != null)
replacement = pr;
else
replacement = p;
if (replacement != p) {
TreeNode<K,V> pp = replacement.parent = p.parent;
if (pp == null)
r = replacement;
else if (p == pp.left)
pp.left = replacement;
else
pp.right = replacement;
p.left = p.right = p.parent = null;
}
root = (p.red) ? r : balanceDeletion(r, replacement);
if (p == replacement) { // detach pointers
TreeNode<K,V> pp;
if ((pp = p.parent) != null) {
if (p == pp.left)
pp.left = null;
else if (p == pp.right)
pp.right = null;
p.parent = null;
}
}
} finally {
unlockRoot();
}
assert checkInvariants(root);
return false;
}
// 下面四个是经典的红黑树方法
static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p);
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p);
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x);
static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x);
// 递归检查一些关系,确保构造的是正确无误的红黑树
static <K,V> boolean checkInvariants(TreeNode<K,V> t);
// Unsafe相关的初始化工作
private static final sun.misc.Unsafe U;
private static final long LOCKSTATE;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = TreeBin.class;
LOCKSTATE = U.objectFieldOffset(k.getDeclaredField("lockState"));
} catch (Exception e) {
throw new Error(e);
}
}
}
4、ForwardingNode - 转发节点
ForwardingNode是一种临时节点,在扩容进行中才会出现,hash值固定为-1,并且它不存储实际的数据数据,标识所处位置已经扩容完毕,前往新数组执行。如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容。
static final class ForwardingNode<K,V> extends Node<K,V> {
// 指向扩容后的新数组
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
/**
* ForwardingNode的查找操作,直接在新数组nextTable上去进行查找
*/
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes 使用循环,避免多次碰到ForwardingNode导致递归过深
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) // 第一个节点就是要找的节点,直接返回
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) { // 继续碰见ForwardingNode的情况,这里相当于是递归调用一次本方法
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k); // 碰见特殊节点,调用其find方法进行查找
}
if ((e = e.next) == null) // 普通节点直接循环遍历链表
return null;
}
}
}
}
5、ReservationNode - 保留节点
或者叫空节点,computeIfAbsent和compute这两个函数式api中才会使用。它的hash值固定为-3,就是个占位符,不会保存实际的数据,正常情况是不会出现的,在jdk1.8新的函数式有关的两个方法computeIfAbsent和compute中才会出现。
为什么需要这个节点,因为正常的写操作,都会想对hash桶的第一个节点进行加锁,但是null是不能加锁,所以就要new一个占位符出来,放在这个空hash桶中成为第一个节点,把占位符当锁的对象,这样就能对整个hash桶加锁了。static final class ReservationNode<K,V> extends Node<K,V> {
ReservationNode() {
super(RESERVED, null, null, null);
}
// 空节点代表这个hash桶当前为null,所以肯定找不到“相等”的节点
Node<K,V> find(int h, Object k) {
return null;
}
}
重要方法
size()方法
size()方法是否是线程安全的?
是;map 中键值对的个数通过求 baseCount 与 counterCells 非空元素的和得到。
// 两种情况
// 1. counterCells 数组未初始化,在没有线程争用时,将 size 的变化写入此字段
// 2. 初始化 counterCells 数组时,没有获取到 cellsBusy 锁,会再次尝试将 size 的变化写入此字段
private transient volatile long baseCount;
// 用于同步 counterCells 数组结构修改的乐观锁资源
private transient volatile int cellsBusy;
// counterCells 数组一旦初始化,size 的变化将不再尝试写入 baseCount
// 可以将 size 的变化写入数组中的任意元素
// 可扩容,长度保持为 2 的幂
private transient volatile CounterCell[] counterCells;
// 参数 x 表示键值对个数的变化值,如果为正,表示新增了元素,如果为负,表示删除了元素
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果 counterCells 为空,则直接尝试通过 CAS 将 x 累加到 baseCount 中
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// counterCells 非空
// 或 counterCells 为空,但 CAS baseCount 失败都会来到这里
CounterCell a; long v; int m;
boolean uncontended = true; // CAS 数组元素时,有没有发生线程争用的标志
// 如果当前线程探针哈希到的数组元素非空,则尝试将 x 累加到对应数组元素
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// counterCells 为空,或其长度小于1
// 或当前线程探针哈希到的数组元素为空
// 或当前线程探针哈希到的数组元素非空,但 CAS 数组元素失败
// 都会调用 fullAddCount 方法来完成 x 的写入
fullAddCount(x, uncontended);
return; // 如果调用过 fullAddCount,则当前线程一定不会协助扩容
}
// 走到这说明,CAS 数组元素成功
// 此时如果 check <= 1,也不协助可能会发生的扩容
if (check <= 1)
return;
// 如果 check 大于 1,则计算当前 map 的 size,为判断是否需要扩容做准备
s = sumCount();
}
// size 的变化已经写入完成
// 后面如果 check >= 0,则判断当前的 size 是否会触发扩容
if (check >= 0) {
// 扩容相关的逻辑
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
addCount 方法记录 size 变化的过程可以分为两类情况:
- counterCells 数组未初始化
- CAS 一次 baseCount
- 如果 CAS 失败,则调用 fullAddCount 方法
- counterCells 数组已初始化
- CAS 一次当前线程探针哈希到的数组元素
- 如果 CAS 失败,则调用 fullAddCount 方法
// 只被 addCount 方法调用
// 如果 counterCells 数组未初始化
// 或者线程哈希到的 counterCells 数组元素未初始化
// 或者 CAS 数组元素失败,都会调用此方法
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 判断线程探针哈希值是否初始化
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true; // 重新假设未发生争用
}
boolean collide = false; // 是否要给 counterCells 扩容的标志
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 数组不为空且长度大于 0
if ((a = as[(n - 1) & h]) == null) {
// 尝试初始化线程探针哈希到的数组元素
if (cellsBusy == 0) { // Try to attach new Cell
// 注意,这里已经把 x 放入对象
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 && // 准备初始化数组元素,要求 cellsBusy 为 0,并尝试将其置 1
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 获得 cellsBusy 锁
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
// 判断有没有被其它线程初始化
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 释放 cellsBusy 锁
}
if (created) // 初始化元素成功,直接退出循环
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash(指的是更改当前线程的探针哈希值)
// wasUncontended 为 true 执行到这
// 尝试将 x 累加进数组元素
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// CAS 失败
// 判断 counterCells 是否正在扩容,或数组长度是否大于等于处理器数
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// 如果数组没有在扩容,且数组长度小于处理器数
// 此时,如果 collide 为 false,则把它变成 true
// 在下一轮循环中,如果 CAS 数组元素继续失败,就会触发 counterCells 扩容
else if (!collide)
collide = true;
// 如果 collide 为 true,则尝试给 counterCells 数组扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h); // 更改当前线程的探针哈希值
}
// counterCells 数组为空或长度为 0
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 获取 cellsBusy 锁
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2]; // 初始长度为 2
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// counterCells 数组为空或长度为 0,并且获取 cellsBusy 锁失败
// 则会再次尝试将 x 累加到 baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
} // end for
}
fullAddCount方法:
- 线程探针哈希值的初始化
- counterCells 数组的初始化和扩容
- counterCells 元素的初始化
- 将 size 的变化,写入 counterCells 中的某一个元素。(如果 counterCells 初始化时,获取锁失败,则还会尝试将 size 的变化,写入 baseCount)
构造方法
ConcurrentHashMap构造方法不会进行数组的初始化(与HashMap不同),仅会计算并通过sizeCtl保存初始容量。真正的初始化操作在第一次put操作的时候进行。 ```java public ConcurrentHashMap() { }
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 计算初始容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // 求 2^n
// 保存hash桶的接下来的初始化使用的容量
this.sizeCtl = cap;
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); }
/**
- concurrencyLevel只是为了此方法能够兼容之前的版本,它并不是实际的并发级别,loadFactor也不是实际的加载因子了
这两个都失去了原有的意义,仅仅对初始容量有一定的控制作用 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 检查参数
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 计算初始容量 int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
// 保存hash桶的接下来的初始化使用的容量 this.sizeCtl = cap; // 用这个重要的变量保存hash桶的接下来的初始化使用的容量
// 不进行任何数组(hash桶)的初始化工作,构造方法进行懒初始化 lazyInitialization }
/**
根据传入的Map初始化 ConcurrentHashMap */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { // 初始容量 = 默认初始容量 this.sizeCtl = DEFAULT_CAPACITY;
// 将传入Map元素全部填充到当前ConcurrentHashMap putAll(m); } ```
iniTable-初始化
真正的初始化在
iniTable()
方法中,在put方法中有调用此方法。 ```java /**真正的初始化方法,使用保存在sizeCtl中的数据作为初始化容量 */ // Initializes table, using the size recorded in sizeCtl. private final Node
[] initTable() { Node [] tab; int sc; // Thread.yeild() 和 CAS 都不是100%和预期一致的方法,所以用循环 while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 真正的初始化是要禁止并发的,保证tables数组只被初始化一次,但是又不能切换线程,所以用yeild()暂时让出CPU
// CAS更新sizeCtl标识为 "初始化" 状态
//SIZECTL:表示当前对象的内存偏移量,sc表示期望值,-1表示要替换的值,设定为-1表示要初始化表了
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 检查table数组是否已经被初始化,没初始化就真正初始化
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// sc = threshold,n - (n >>> 2) = n - n/4 = 0.75n,前面说了loadFactor没用了,这里看出,统一用0.75f了
sc = n - (n >>> 2);
}
} finally {
// 初始化后,sizeCtl长度为数组长度的3/4,也就是扩容阈值
sizeCtl = sc;
}
break;
}
put-存放数据
```java /**
- 单纯的额调用putVal方法,并且putVal的第三个参数设置为false
- 当设置为false的时候表示这个value一定会设置
- true的时候,只有当这个key的value为空的时候才会设置
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
```
putVal
```java /** - 当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,
- 如果没有的话就初始化数组
- 然后通过计算hash值来确定放在数组的哪个位置
- 如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来
- 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制
- 最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作
- 然后判断当前取出的节点位置存放的是链表还是树
- 如果是链表的话,则遍历整个链表,取出来的节点的key与要放的key进行比较,如果key相等,并且key的hash值也相等的话,
- 则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾
- 如果是树的话,则调用putTreeVal方法把这个元素添加到树中去
- 最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,
则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组 */ final V putVal(K key, V value, boolean onlyIfAbsent) { // 判空 if (key == null || value == null) throw new NullPointerException();
// 取得key的hash值 int hash = spread(key.hashCode());
// 用来计算在这个位置总共有多少个元素,用来控制扩容或者转移为树 int binCount = 0;
// 如上,CAS 不是100%和预期一致的方法,所以用循环 for (Node
[] tab = table;;) { Node<K,V> f; int n, i, fh;
// 第一次put的时候table没有初始化,则初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 通过哈希计算待插入元素应该在表中的位置,因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界
// f为指定位置的首节点(链表首节点/树首节点)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果这个位置没有元素的话,则通过cas的方式尝试添加
// 创建一个Node添加到数组中,null表示的是下一个节点为空
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
/**
* 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,
* 则当前线程也会参与去复制,通过允许多线程复制的功能来减少数组的复制所带来的性能损失
*/
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 前往帮助扩容
else {
/**
* 如果在这个位置有元素的话,就采用synchronized的方式加锁,
* 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历,
* 如果找到了key和key的hash值都一样的节点,则把它的值替换到
* 如果没找到的话,则添加在链表的最后面
* 否则,是树的话,则调用putTreeVal方法添加到树中去
*
* 在添加完之后,会对该节点上关联的的数目进行判断,
* 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容
*/
V oldVal = null;
// 整体使用了synchronized锁
synchronized (f) {
// 再次取出要存储的位置的元素,跟前面取出来的比较
if (tabAt(tab, i) == f) {
// 取出来的元素的hash值大于0,当转换为树之后,hash值为-2
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 要存的元素的hash,key跟要存储的位置的节点的相同的时候,替换掉该节点的value即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) //当使用putIfAbsent的时候,只有在这个key没有设置值得时候才设置
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空,
if ((e = e.next) == null) {
// 为空的话把这个要加入的节点设置为当前节点的下一个节点
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //表示已经转化成红黑树类型了
Node<K,V> p;
binCount = 2;
// 调用putTreeVal方法,将该元素添加到树中去
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 当在同一个桶的节点数目达到8个的时候,则扩张数组或将给节点的数据转为tree
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
} addCount(1L, binCount); //计数 return null; } ```
其他方法
一些原子操作,使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制。 ```java /**
- 用来返回节点数组的指定位置的节点的原子操作
*/
@SuppressWarnings(“unchecked”)
static final
Node tabAt(Node [] tab, int i) { return (Node )U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }
/**
- cas原子操作,在指定位置设定值
*/
static final
boolean casTabAt(Node [] tab, int i,
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }Node<K,V> c, Node<K,V> v) {
/**
- 原子操作,在指定位置设定值
*/
static final
void setTabAt(Node [] tab, int i, Node v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); } ``` tryPresize&transfer-扩容机制
首先上述put过程中,有一个treeifyBin(tab, i)
方法的调用,该方法用于进行链表树形化。在树形化之前会判断数组长度是否小于64,如果小于64则调用tryPresize
方法进行数组扩容。 ```java /** - Replaces all linked nodes in bin at given index unless table is
- too small, in which case resizes instead.
当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树 */ private final void treeifyBin(Node
[] tab, int index) { Node b; int n, sc; if (tab != null) { System.out.println("treeifyBin方\t==>数组长:"+tab.length);
// 当数组长度小于64的时候,扩张数组长度一倍,MIN_TREEIFY_CAPACITY==64
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 使用synchronized同步器,将该节点出的链表转为树
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null; //hd:树的头(head)
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null) //把Node组成的链表,转化为TreeNode的链表,头结点任然放在相同的位置
hd = p; //设置head
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的链表放入容器TreeBin中
}
}
}
} }
tryPresize方法如下:
java /**- 扩容表为指可以容纳指定个数(size)的大小(总是2的N次方)
- 假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12
- 计算出来c的值为64,则要扩容到sizeCtl≥为止
- 第一次扩容之后 数组长:32 sizeCtl:24
- 第二次扩容之后 数组长:64 sizeCtl:48
第三次扩容之后 数组长:128 sizeCtl:94 —> 这个时候才会退出扩容 */ private final void tryPresize(int size) {
/**
- MAXIMUM_CAPACITY = 1 << 30
- 如果给定的大小大于等于最大数组容量的一半,则直接使用最大容量,
- 否则使用tableSizeFor算出来
后面table一直要扩容到这个值小于等于sizeCtl(数组长度的3/4)才退出扩容 */ int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node
[] tab = table; int n; /**
- 如果数组table还没有被初始化,则初始化一个大小为sizeCtl和刚刚算出来的c中较大的一个大小的数组
- 初始化的时候,设置sizeCtl为-1,初始化完成之后把sizeCtl设置为数组长度的3/4
- 为什么要在扩张的地方来初始化数组呢?这是因为如果第一次put的时候不是put单个元素,
- 而是调用putAll方法直接put一个map的话,在putALl方法中没有调用initTable方法去初始化table,
而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断 */ if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 初始化tab的时候,把sizeCtl设为-1
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
} }
/**
- 一直扩容到的c小于等于sizeCtl或者数组长度大于最大长度的时候,则退出
所以在一次扩容之后,不是原来长度的两倍,而是2的n次方倍 */ else if (c <= sc || n >= MAXIMUM_CAPACITY) { break; //退出扩张 } else if (tab == table) { int rs = resizeStamp(n);
/**
- 如果正在扩容Table的话,则帮助扩容
- 否则的话,开始新的扩容
- 在transfer操作,将第一个参数的table中的元素,移动到第二个元素的table中去,
- 虽然此时第二个参数设置的是null,但是,在transfer方法中,当第二个参数为null的时候,
会创建一个两倍大小的table */ if (sc < 0) { Node
[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
/**
- transfer的线程数加一,该线程将进行transfer的帮忙
- 在transfer的时候,sc表示在transfer工作的线程数 */ if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); }
/**
- 没有在初始化或扩容,则开始扩容
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
transfer(tab, null); } } } }(rs << RESIZE_STAMP_SHIFT) + 2)) {
在tryPresize方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助扩容。<br />数组扩容的主要方法就是transfer方法,负责迁移node节点。
java /**
- Moves and/or copies the nodes in each bin to new table. See
- above for explanation.
- 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置
- 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用,
- 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操作
- 扩容的时候会一直遍历,直到复制完所有节点,每处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他,
复制后在新数组中的链表不是绝对的反序的 */ private final void transfer(Node
[] tab, Node [] nextTab) { int n = tab.length, stride; // MIN_TRANSFER_STRIDE=16 用来控制不要占用太多CPU if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
/**
- 如果复制的目标nextTab为null的话,则初始化一个table两倍长的nextTab
- 此时nextTable被设置值了(在初始情况下是为null的)
- 因为如果有一个线程开始了表的扩张的时候,其他线程也会进来帮忙扩张,
而只是第一个开始扩张的线程需要初始化下目标数组 */ if (nextTab == null) { try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 创建两倍长数组
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
} nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length;
/**
- 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点
这是一个空的标志节点 */ ForwardingNode
fwd = new ForwardingNode (nextTab); boolean advance = true; // 是否继续向前查找的标志位 boolean finishing = false; // to ensure sweep(清扫) before committing nextTab,在完成之前重新在扫描一遍数组,看看是否有未完成的 for (int i = 0, bound = 0;;) { Node
f; int fh; while (advance) { int nextIndex, nextBound;
if (--i >= bound || finishing) {
advance = false;
}
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { //已经完成转移
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); // 设置sizeCtl为扩容后的0.75
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
return;
}
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null) // 数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) { // 加锁操作
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) { // 该节点的hash值大于等于0,说明是一个Node节点
/**
* 因为n的值为数组的长度,且是power(2,x)的,所以,在&操作的结果只可能是0或者n
* 根据这个规则
* 0--> 放在新表的相同位置
* n--> 放在新表的(n+原来位置)
*/
int runBit = fh & n;
Node<K,V> lastRun = f;
/**
* lastRun 表示的是需要复制的最后一个节点
* 每当新节点的hash&n -> b 发生变化的时候,就把runBit设置为这个结果b
* 这样for循环之后,runBit的值就是最后不变的hash&n的值
* 而lastRun的值就是最后一次导致hash&n 发生变化的节点(假设为p节点)
* 为什么要这么做呢?因为p节点后面的节点的hash&n 值跟p节点是一样的,
* 所以在复制到新的table的时候,它肯定还是跟p节点在同一个位置
* 在复制完p节点之后,p节点的next节点还是指向它原来的节点,就不需要进行复制了,自己就被带过去了
* 这也就导致了一个问题就是复制后的链表的顺序并不一定是原来的倒序
*/
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n; //n的值为扩张前的数组的长度
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
/**
* 构造两个链表,顺序大部分和原来是反的
* 分别放到原来的位置和新增加的长度的相同位置(i/n+i)
*/
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
/**
* 假设runBit的值为0,
* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点
* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
*/
ln = new Node<K,V>(ph, pk, pv, ln);
else
/**
* 假设runBit的值不为0,
* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点
* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
*/
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { //否则的话是一个树节点
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
/**
* 在复制完树节点之后,判断该节点处构成的树还有几个节点,
* 如果≤6个的话,就转回为一个链表
*/
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
注意:
- 复制之后的新链表不是旧链表的绝对倒序。
- 在扩容的时候每个线程都有处理的步长,最少为16,在这个步长范围内的数组节点只有自己一个线程来处理
<a name="Q2UqJ"></a>
# 简单使用
```java
package com.java.map;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description
* @date: 2020-12-29 23:07
*/
public class ConcurrentHashMapCode {
public static void main(String[] args) {
Map<String,Object> concurrentHashMap = new ConcurrentHashMap<>();
concurrentHashMap.put("one",1);
concurrentHashMap.put("two", 2);
System.out.println(concurrentHashMap);
// 如果传入key对应的value已经存在,就返回存在的value,不进行替换
concurrentHashMap.putIfAbsent("one", 2);
concurrentHashMap.putIfAbsent("three", 3);
System.out.println(concurrentHashMap);
}
}
输出:
{one=1, two=2}
{one=1, two=2, three=3}
总结
- ConcurrentHashMap是Java并发包中自JDK1.5后提供的一个线程安全且高效的HashMap实现,可以用来替代HashTable。直接实现了ConcurrentMap接口,同时继承了AbstractMap抽象类。
- JDK1.8后,ConcurrentHashMap使用CAS算法结合synchronized同步锁的方式保证线程安全。
- ConcurrentHashMap的存储结构与HashMap基本一致,使用内部子类Node作为基本单元,存储链表节点数据,使用内部子类TreeNode存储树节点数据。同时增加了几个子类节点对象:ForwardingNode(转发节点)、TreeBin(红黑树根节点)、ReservationNode(保留节点)。
思考
ConcurrentHashMap与HashMap的比较
线程安全性
- HashMap不是线程安全的,多线程并发下扩容可能会导致数据覆盖的情况。
- ConcurrentHashMap线程安全,在ConcurrentHashMap中,大量使用了
U.compareAndSwapXXX
的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。同时,在ConcurrentHashMap中还定义了三个原子操作,用于对指定位置的节点进行操作。这三种原子操作被广泛的使用在ConcurrentHashMap的get和put等方法中。 我们可以发现JDK8中ConcurrentHashMap的实现使用的是锁分离思想,只是锁住的是一个node,而锁住Node之前的操作是基于在volatile和CAS之上无锁并且线程安全的。
null值
HashMap允许key和value为空。
-
迭代
HashMap在用iterator遍历的同时,不允许修改HashMap。
- ConcurrentHashMap允许该行为,并且更新对后续的遍历是可见的。