ConcurrentHashMap的认识
概述
ConcurrentHashMap类中包含两个静态内部类 HashEntry 和 Segment,其中 HashEntry 用来封装具体的K/V对,是个典型的四元组;Segment 用来充当锁的角色,每个 Segment 对象守护整个ConcurrentHashMap的若干个桶 (可以把Segment看作是一个小型的哈希表),其中每个桶是由若干个 HashEntry 对象链接起来的链表。
总的来说,一个ConcurrentHashMap实例中包含由若干个Segment实例组成的数组,而一个Segment实例又包含由若干个桶,每个桶中都包含一条由若干个 HashEntry 对象链接起来的链表。特别地,ConcurrentHashMap 在默认并发级别下会创建16个Segment对象的数组,如果键能均匀散列,每个 Segment 大约守护整个散列表中桶总数的 1/16。
成员变量
与HashMap相比,ConcurrentHashMap 增加了两个属性用于定位段,分别是 segmentMask 和 segmentShift。此外,不同于HashMap的是,ConcurrentHashMap底层结构是一个Segment数组,而不是Object数组,具体源码如下:
// 用于定位段,大小等于segments数组的大小减 1,是不可变的
final int segmentMask;
// 用于定位段,大小等于32(hash值的位数)减去对segments的大小取以2为底的对数值,是不可变的
final int segmentShift;
// ConcurrentHashMap的底层结构是一个Segment数组
final Segment<K,V>[] segments;
还有几个重要的属性,三个非常重要的参数:初始容量、负载因子 和 并发级别,这三个参数是影响ConcurrentHashMap性能的重要参数。从源码我们可以看出,ConcurrentHashMap 也正是通过initialCapacity、loadFactor和concurrencyLevel这三个参数进行构造并初始化segments数组、段偏移量segmentShift、段掩码segmentMask和每个segment的。
段的定义:Segment
Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。每个 Segment 对象用来守护它的成员对象 table 中包含的若干个桶。table 是一个由 HashEntry 对象组成的链表数组,table 数组的每一个数组成员就是一个桶。
段(Segment)的定义以及成员变量如下:
//对count的大小造成影响的操作的次数(比如put或者remove操作)
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// Segment中元素的数量,可见的
transient volatile int count;
//对count的大小造成影响的操作的次数(比如put或者remove操作)
transient int modCount;
// 阈值,段中元素的数量超过这个值就会对Segment进行扩容
transient int threshold;
// 链表数组
transient volatile HashEntry<K,V>[] table;
// 段的负载因子,其值等同于ConcurrentHashMap的负载因子
final float loadFactor;
// ...
}
在Segment类中,count 变量是一个计数器,它表示每个 Segment 对象管理的 table 数组中包含的 HashEntry 对象的个数,也就是 Segment 中包含的 HashEntry 对象的总数。
特别需要注意的是,之所以在每个 Segment 对象中包含一个计数器,而不是在 ConcurrentHashMap 中使用全局的计数器,是对 ConcurrentHashMap 并发性的考虑:因为这样当需要更新计数器时,不用锁定整个ConcurrentHashMap。事实上,每次对段进行结构上的改变,如在段中进行增加/删除节点(修改节点的值不算结构上的改变),都要更新count的值,此外,在JDK的实现中每次读取操作开始都要先读取count的值。
特别需要注意的是,count是volatile修饰的,这使得对count的任何更新对其它线程都是立即可见的。
modCount用于统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,这一点具体在谈到跨段操作时会详述。
threashold用来表示段需要进行重哈希的阈值。
loadFactor表示段的负载因子,其值等同于ConcurrentHashMap的负载因子的值。
table是一个典型的链表数组,而且也是volatile的,这使得对table的任何更新对其它线程也都是立即可见的。
我们知道,ConcurrentHashMap允许多个修改(写)操作并发进行,其关键在于使用了锁分段技术,它使用了不同的锁来控制对哈希表的不同部分进行的修改(写),而 ConcurrentHashMap 内部使用段(Segment)来表示这些不同的部分。实际上,每个段实质上就是一个小的哈希表,每个段都有自己的锁(Segment 类继承了 ReentrantLock 类)。这样,只要多个修改(写)操作发生在不同的段上,它们就可以并发进行。
基本元素:HashEntry
HashEntry用来封装具体的键值对,是个典型的四元组。与HashMap中的Entry类似,HashEntry也包括同样的四个域,分别是key、hash、value和next。不同的是,在HashEntry类中,key,hash和next域都被声明为final的,value域被volatile所修饰,因此HashEntry对象几乎是不可变的,
这是ConcurrentHashmap读操作并不需要加锁的一个重要原因。
next域被声明为final本身就意味着我们不能从hash链的中间或尾部添加或删除节点,因为这需要修改next引用值,因此所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。
但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制(重新new)一遍,最后一个节点指向要删除结点的下一个结点(这在谈到ConcurrentHashMap的删除操作时还会详述)。
特别地,由于value域被volatile修饰,所以其可以确保被读线程读到最新的值,这是ConcurrentHashmap读操作并不需要加锁的另一个重要原因。实际上,ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。HashEntry代表hash链中的一个节点,其结构如下所示:
static final class HashEntry<K,V> {
final K key; // 声明 key 为 final 的
final int hash; // 声明 hash 值为 final 的
volatile V value; // 声明 value 被volatile所修饰
final HashEntry<K,V> next; // 声明 next 为 final 的
HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}
与HashMap类似,在ConcurrentHashMap中,如果在散列时发生碰撞,也会将碰撞的 HashEntry 对象链成一个链表。由于HashEntry的next域是final的,所以新节点只能在链表的表头处插入。
ConcurrentHashMap 的数据结构
本质上,ConcurrentHashMap就是一个Segment数组,而一个Segment实例则是一个小的哈希表。由于Segment类继承于ReentrantLock类,从而使得Segment对象能充当锁的角色,这样,每个 Segment对象就可以守护整个ConcurrentHashMap的若干个桶,其中每个桶是由若干个HashEntry 对象链接起来的链表。通过使用段(Segment)将ConcurrentHashMap划分为不同的部分,ConcurrentHashMap就可以使用不同的锁来控制对哈希表的不同部分的修改,从而允许多个修改操作并发进行, 这正是ConcurrentHashMap锁分段技术的核心内涵。进一步地,如果把整个ConcurrentHashMap看作是一个父哈希表的话,那么每个Segment就可以看作是一个子哈希表 。
注意,假设ConcurrentHashMap一共分为2^n个段,每个段中有2^m个桶,那么段的定位方式是将key的hash值的高n位与(2^n-1)相与。在定位到某个段后,再将key的hash值的低m位与(2^m-1)相与,定位到具体的桶位。
JDK1.7ConcurrentHashMap源码
https://www.bilibili.com/video/BV1x741117jq?p=3&t=5356.3
另一视频
https://www.bilibili.com/video/BV1jq4y1Z79x?p=2
文档:
https://blog.csdn.net/justloveyou_/article/details/72783008
https://blog.csdn.net/zzu_seu/article/details/106698150
https://blog.csdn.net/xingxiupaioxue/article/details/88062163?
https://blog.csdn.net/u010723709/article/details/48007881?
https://blog.csdn.net/justloveyou_/article/details/72783008
概述
很明显,我们在介绍HashMap就知道它不是线程安全,要保证它是线程安全的,有两种思路;一种是跟HashTable一样,直接在各个方法都添加上synchronized关键进行修改,直接锁住了整个对象;但这样子改很影响性能,因为每次都要锁住临界区代码,比如两个线程同时调用put方法进行添加元素,两个线程总归有一个线程先进入临界区代码将元素添加进哈希表,此时另一个线程因为先拿到临界区对象锁而自己没有锁的缘故被阻塞住了,只能干等着,就非常影响性能。
另一种实现是JDK1.7 中的ConcurrentHashMap采用分段锁的思想进行实现。值得注意的是ConcurrentHashMap里面的内部类并不是Entry类型的,而是Segment;但是在这个Segment类里面又有一个HashEntry 类型的属性table数组。也就是说ConcurrentHashMap有一个Segment类型的数组segments,而这个Segment类内部又有一个HashEntry 类型的数组属性table,用图来表示就是数组里面嵌套数组。![image.png](https://cdn.nlark.com/yuque/0/2022/png/28814483/1658404859429-65899cf9-9f62-4e8c-a07c-3a448546fddb.png#clientId=u904e803d-0ec6-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=204&id=u0a97591c&name=image.png&originHeight=280&originWidth=1128&originalType=binary&ratio=1&rotation=0&showTitle=false&size=29154&status=done&style=none&taskId=u8bff33d4-dd0b-4476-814b-eeec1eb8fdf&title=&width=820.3636363636364)
注意:
重要常量及属性
常量
/* ---------------- Constants -------------- */
//默认的HashEntry类型的table数组初始化容量16
static final int DEFAULT_INITIAL_CAPACITY = 16;
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//默认的并发级别16,指的是Segment类型的数组segments的容量
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
static final int MAXIMUM_CAPACITY = 1 << 30;
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大并发级别,2的16次方
static final int MAX_SEGMENTS = 1 << 16;
static final int RETRIES_BEFORE_LOCK = 2;
属性
final int segmentMask;
final int segmentShift;
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;
内部类
Segment
/**
(这个Segment可以理解为一个小的HashMap)
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
...
}
重要属性
/***************************1.7*****************************/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
// 链表数组
transient volatile HashEntry<K,V>[] table;
//Segment中元素的数量,
transient int count;
//对count的大小造成影响的操作的次数(比如put或者remove操作)
transient int modCount;
// 阈值,段中元素的数量超过这个值就会对Segment进行扩容
transient int threshold;
// 段的负载因子,其值等同于ConcurrentHashMap的负载因子
final float loadFactor;
...
}
对比1.6的
/***************************1.6*****************************/
//对count的大小造成影响的操作的次数(比如put或者remove操作)
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// Segment中元素的数量,可见的
transient volatile int count;
//对count的大小造成影响的操作的次数(比如put或者remove操作)
transient int modCount;
// 阈值,段中元素的数量超过这个值就会对Segment进行扩容
transient int threshold;
// 链表数组
transient volatile HashEntry<K,V>[] table;
// 段的负载因子,其值等同于ConcurrentHashMap的负载因子
final float loadFactor;
// ...
}
构造方法
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
put方法
跟HashMap中的put方法非常类似,当添加Segment元素的时候首先要去找到该元素所在的Segment数组的下标,然后去判断一下该位置是否已经存在一个Segment对象了,倘若没有的话就去new创建一个Segment对象,调用该对象的put方法将我们添加的元素封装成一个HashEntry对象,并将其放进该位置对应的Segment对象内部
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
remove方法
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
rehash方法
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
跟锁相关的方法
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
clear方法
final void clear() {
lock();
try {
HashEntry<K,V>[] tab = table;
for (int i = 0; i < tab.length ; i++)
setEntryAt(tab, i, null);
++modCount;
count = 0;
} finally {
unlock();
}
}
}
replace方法
final boolean replace(K key, int hash, V oldValue, V newValue) {
if (!tryLock())
scanAndLock(key, hash);
boolean replaced = false;
try {
HashEntry<K,V> e;
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
if (oldValue.equals(e.value)) {
e.value = newValue;
++modCount;
replaced = true;
}
break;
}
}
} finally {
unlock();
}
return replaced;
}
final V replace(K key, int hash, V value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V> e;
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
e.value = value;
++modCount;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
HashEntry
/**
相当于输出为用户可见的Map.Entry,也就是说跟Map.Entry类似
*/
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
构造方法(五个)
无参构造方法
/*
DEFAULT_INITIAL_CAPACITY:默认的HashEntry类型的table数组初始化容量16
DEFAULT_LOAD_FACTOR:默认的加载因子0.75f
DEFAULT_CONCURRENCY_LEVEL:默认的并发级别16,指的是Segment类型的数组segments的容量
*/
public ConcurrentHashMap() {
//无参构造方法内部调用的还是传递3个参数的构造方法
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
构造方法一
public ConcurrentHashMap(int initialCapacity) {
// 内部调用的是三个参数的构造方法
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
假设传递进来的initialCapacity是32,也就是说table数组的容量是32,并发级别DEFAULT_CONCURRENCY_LEVEL取默认的16,即Segment类型的数组segments的容量是16,这样就有数组segments的一个位置装table数组的2个位置。
也就是说,segments数组对象里面的table数组的是算出来的,传递进来的initialCapacity 容量32是segments数组中所有的table数组的总容量(segments数组中的一个元素就对应一个table数组)
构造方法二
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
//两个参数的构造方法内部调用的还是三个参数的构造方法,说明三个参数的构造方法才是关键
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
构造方法三
public ConcurrentHashMap(
int initialCapacity,
float loadFactor,
int concurrencyLevel
) {
//验证参数是否合理
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//判断并发级别
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
//最终是Segment数组的大小
int ssize = 1;
//ssize从1开始循环,concurrencyLevel默认16
while (ssize < concurrencyLevel) {
//concurrencyLevel=16的话,sshift=4,即二的sshift次方等于concurrencyLevel
++sshift;
// 左移一位,相当于ssize*2
ssize <<= 1;
}
//若果concurrencyLevel是16,那么退出循环时ssize就是16;如果concurrencyLevel是17,退出循环是ssize是32;也就是说,这个循环的作用就是寻找大于concurrencyLevel的最小二次方数
//
this.segmentShift = 32 - sshift;
//segment数组长度-1,之后获取segment数组下标的时候用这个进行相与运算
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//假如这里计算c=1,table数组容量也不会是1,而是2
int c = initialCapacity / ssize;
//相当于向上取整,使之变为2的次方
if (c * ssize < initialCapacity)
++c;
//MIN_SEGMENT_TABLE_CAPACITY表示table数组默认容量2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 当c=1时不进入循环
while (cap < c)
cap <<= 1;
// create segments数组 and segments[0]对象
Segment<K,V> s0 = new Segment<K,V>(
loadFactor,
(int)(cap * loadFactor), //计算扩容阈值
(HashEntry<K,V>[])new HashEntry[cap]
);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//将创建的segment对象放到segments数组中
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
虽然ConcurrentHashMap它是一个Segment数组,但是我们在添加元素的时候,还是得根据key去算出哈希值进而得到一个下标。也就是说,首先还是要去找出我们添加的元素应该存放到Segment数组的哪一个下标位置;去找这个下标的时候它跟HashMap类似,都是用哈希码去跟Segment数组长度减一进行相与,所以这里的Segment数组的容量大小还是要控制一下,即始终让其容量为2的次方数。
这里为什么要在构造方法里面先生成一个s0对象?这是因为跟HashMap中的put方法非常类似,当添加Segment元素的时候首先要去找到该元素所在的Segment数组的下标,然后去判断一下该位置是否已经存在一个Segment对象了,倘若没有的话就去new创建一个Segment对象,调用该对象的put方法将我们添加的元素封装成一个HashEntry对象,并将其放进该位置对应的Segment对象内部。如果说我们没有提前生成一个S0对象的话,我们去创建这个Segment对象的时候就需要自己去确定几个参数,其中就包括Segment对象内部的table数组的大小以及扩容的阈值,换句话说,没有提前生成一个S0对象的话,我们在每次创建Segment对象的时候都需要我们去确定一下这个Segment对象内部的table数组的大小以及扩容的阈值。如果提前创建了一个S0对象,我们就不需要每次创建新的Segment对象的时候都去确定这些参数了。可以理解为将这个S0对象当成原型了。
UNSAFE类
调用构造方法的时候,构造方法中会间接去调用一个UNSAFE类的方法,这里我们分析一下这个类的作用;
这个UNSAFE类在ConcurrentHashMap中的静态代码块进行初始化的
由于我们自己写的类使用的是App应用类加载器进行加载的,所以使用该语句调用方法返回UNSAFE对象时会报错;而ConcurrentHashMap使用的是boot类加载器进行加载,所以不会报错;
我们可以通过反射机制来获取到该UNSAFE类对象,这个UNSAFE类底层是使用CAS来保证原子操作,达到线程安全的目的;
PUT方法源码分析
大概流程
PUT方法的大概流程,即当我们想要调用ConcurrentHashMap的PUT方法往里面添加元素的时候,首先会根据元素的key值生成一个hashCode哈希值,得到这个哈希值后会去做两件事:
第一件事就是将这个哈希值跟Segment数组长度-1进行相与操作,得到新添加的元素放在Segment数组中的下标位置;
得到这个下标位置后,判断该位置上的Segment对象是否为空,若是为空的话就得去给该位置生成一个Segment对象。然后去调用这个Segment对象的PUT方法,这个Segment对象的PUT方法里面仍然是根据之前的哈希值去计算元素应该放在Segment对象内部的table数组的哪个位置,内置的算法也是与操作,只是数组的长度变成了table数组的长度;得到该下标后就将元素添加进table数组的该位置。
下面是ConcurrentHashMap中PUT方法源码
/**
*/
public V put(K key, V value) {
Segment<K,V> s;
//若是添加的元素value值为null,抛出异常
if (value == null)
throw new NullPointerException();
//这里的key没有进行处理,里面的hash方法也没有处理就调用了它的hashCode方法,也就是说,如果你的key是null的话,调用hashCode方法就会报错。这就说明1.7中的ConcurrentHashMap不支持key或者value为null的元素。
int hash = hash(key);
//这行代码就是用来计算元素存放在Segment数组下标的哪一个位置的
int j = (hash >>> segmentShift) & segmentMask;
//调用UNSAFE.getObject方法去获取 Segment数组的第j个位置的值,并判断是否为空
if (
( s = (Segment<K,V>)UNSAFE.getObject( segments, (j << SSHIFT) + SBASE)) == null
)
//若果是空的话就去创建一个Segment对象
s = ensureSegment(j);
//不为空就调用该对象的put方法将元素进行添加
return s.put(key, hash, value, false);
}
注意下面这段代码:它的目的是计算元素存放在Segment数组下标的哪一个位置
int j = (hash >>> segmentShift) & segmentMask;
它明明可以直接用hash去跟segmentMask相与就行了,为什么还要进行右移segmentShift位呢?而这个segmentShift是在构造方法的时候进行赋值的,跟concurrencyLevel有关
/* https://www.bilibili.com/video/BV1x741117jq?p=4&t=2038.7 33分 */
this.segmentShift = 32 - sshift;
//concurrencyLevel=16的话,sshift=4,即二的sshift次方等于concurrencyLevel
结论:因为这个哈希值要用两次,一次是算元素在segment数组中的位置,这个时候取的是哈希值的高位计算;第二次用在计算元素在segment对象内部的table数组中位置,这个时候取的是哈希值的低位进行计算;
ensureSegment方法
这个方法是得到元素在这个Segment数组下标位置后,判断该位置上的Segment对象是否为空,若是为空的话就得调用该ensureSegment方法去给该位置生成一个Segment对象。这个时候就有可能会存在并发问题了,比如同时有两个线程同时使用同一个ConcurrentHashMap对象且同时调用PUT方法又同时来到segment数组的第J个位置来创建segment的对象,这个时候可能会产生覆盖的问题,毕竟只能有一个线程创建segment对象。我们来看看ConcurrentHashMap是如何解决这个并发问题的。
假如真的有这么巧的情况出现,我们要保证其中一个线程创建了segment对象之后,另一个线程只需要返回这个创建好的segment对象就行了。不管是哪个线程创建的,只要保证每个线程都能拿到这个生成的对象即可。
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
//跟外面一层的获取第j位置的值那个操作类似
long u = (k << SSHIFT) + SBASE; //左移又加SBASE
Segment<K,V> seg;
/*
当线程执行完这个方法的前三行代码之后,调用UNSAFE类的方法getObjectVolatile判断第u个位置是否已经生成了segment对象,生成了的话就跳出判断直接返回这个seg对象;
没有生成的话就执行if里面的代码进行生成;
*/
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//这个if里面的代码目的是生成一个seg对象
//从S0原型获取创建对象时需要的参数
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//上面还没有真正生成seg对象,而是为其所需的属性进行准备
//准备工作做完之后又去判断是否有其他线程已经创建好seg对象
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
//第二次判断之后才真正去创建一个s对象,但仅仅是将其创建出来,还没有将其放在Segment数组内
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//不断利用CAS进行自旋尝试,将s对象赋值给seg对象
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//CAS操作,如果数组ss的第u个位置为null的话就将第u个位置更新为seg
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
Segment对象的PUT方法
这个put方法是在Segment下标确定后且该位置上的Segment对象已经存在的情况下,调用该Segment对象的Put方法将元素添加到Segment对象内部的HashEntry数组table中。而且是要支持并发且安全地将元素放到table数组中。
/*
为什么这里要加锁而不是用cas呢?因为我们将元素添加到数组table中时更多的时候是将元素往table数组中的链表插入元素,而不是像在Segment数组的某一个位置为空时创建Segment对象。正是因为更多的情况是在table数组中链表插入元素,所以cas操作对将元素插入链表中就没什么展示的空间。所以这里就只能加锁来保证并发安全性了,包括在1.8中的ConcurrentHashMap中也是用到锁但不是ReentrantLock。
*/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
/**
tryLock()方法跟lock()方法的区别是是否阻塞,tryLock()方法不阻塞
lock方法在获取不到锁的时候会进行阻塞等待,直到获取到锁
*/
//首先尝试加锁,若加锁成功则返回一个true,若加锁失败则里面返回一个false;也就是说这里获取到锁的话node的值就是一个null;没有获取到锁的话就去调用scanAndLockForPut方法,他的目的仍然是去获取锁;
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//获取到Segment对象内部的table数组
HashEntry<K,V>[] tab = table;
//用哈希码进行与操作获取元素在table数组的位置
int index = (tab.length - 1) & hash;
//获取到table数组中第index位置的值
HashEntry<K,V> first = entryAt(tab, index);
//开始遍历该位置的链表(如果已经链化的话)
for (HashEntry<K,V> e = first;;) {
//倘若当前元素不为空,遍历该链表是否存在跟当前添加元素相同的key
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
//onlyIfAbsent标记表示不存在的情况下,直接进行赋值
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//遍历完节点后e=null,跳转到else分支
e = e.next;
}
//当前元素是空表示该位置还没有元素,直接将其放进去即可(当然需要判断是否扩容)
else {
if (node != null)
node.setNext(first);
else
//将元素进行封装,且包含头插法过程
node = new HashEntry<K,V>(hash, key, value, first);
//这个count表示该位置的table数组中的所有元素
int c = count + 1;
//是否需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//这个node表示的是table当前位置的头节点
rehash(node);
else
//没有达到扩容条件的话就去将封装好的元素放在table中的index位置,里面没有CAS操作,因为加锁了。但是里面改的是内存里面的值而不是线程的工作内存,保证可见性
setEntryAt(tab, index, node);
//修改次数自增
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
scanAndLockForPut方法
在 tryLock()方法获取锁失败的时候就会调用该方法继续尝试获取锁;这个方法里面即用到了不阻塞的 tryLock()方法,又使用阻塞的 lock() 方法去获取锁。注意下面代码的区别
/***************方式一*****************/
while (!tryLock()) {
sout("在尝试获取锁的期间,没有获取到锁时执行循环");
}
sout("获取到锁后退出循环");
/***************方式二*****************/
lock();
/***********
虽然这两个写法都会不断地尝试去获取锁,但是方式二在获取锁期间若是没有获取到锁,则会阻塞线程,同时在次期间CPU也会受到牵连;而方式一虽然也是不断尝试,但是在期间CPU还是能够做其他事情的
*************/
ConcurrentHashMap中的scanAndLockForPut方法就采用了这个思想,在尝试获取锁且获取到锁之前去做一些其他的事情,比如说去提前创建一个HashEntry对象。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//取到hash对应下标的元素
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
//重试次数,表示循环多少次后直接调用lock方法
int retries = -1; // negative while locating node
//还没有获取到锁的话就进入循环提前创建对象
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//每次循环只能进某个分支;第一次循环retries=-1,
if (retries < 0) {
//要么链表遍历结束,要么first为null
if (e == null) {
if (node == null) // speculatively create node
//这个对象不一定需要在这里创建,所以外面一层的if还要进行判断
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//下面这两个分支才是进行重试获取锁的代码
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
rehash扩容方法
这个rehash扩容方法是Segment类内部的一个扩容方法,它的大概思路跟HashMap的扩容思路是类似的,有一点不一样的地方。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
//计算新的扩容阈值
threshold = (int)(newCapacity * loadFactor);
//新的table数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
//为了方便取余
int sizeMask = newCapacity - 1;
//遍历旧table数组
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
//记录下一个元素
HashEntry<K,V> next = e.next;
//直接取当前元素哈希值进行与操作计算元素在新数组的下标
int idx = e.hash & sizeMask;
if (next == null) // 只有一个元素
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//从next节点开始遍历,这个for循环结束后,只记录链表在新数组相同位置的最后几个节点
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//将上面for循环得到的在新数组相同下标的几个节点的第一个节点移动到新数组,该节点后面的就不需要移动
newTable[lastIdx] = lastRun;
//从当前链表头节点开始转移剩余的节点
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
//将需要新添加的节点添加到扩容后的数组中
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
GET方法源码分析
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
//获取下标
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//遍历链表查看是否存在
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
JDK1.8ConcurrentHashMap源码
ArrayList源码
https://www.bilibili.com/video/BV1j4411L7f4?t=190.3
源码分析(一)
重要常量以及属性
常量
属性
构造方法
构造方法一
/**
DEFAULTCAPACITY_EMPTY_ELEMENTDATA是一个默认的空数组
*/
public ArrayList() {
this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}
构造方法二
/**
*
* @param initialCapacity 初始化数组的长度
* @throws IllegalArgumentException if the specified initial capacity is negative
*/
public ArrayList(int initialCapacity) {
if (initialCapacity > 0) {
//创建一个指定容量的数组
this.elementData = new Object[initialCapacity];
} else if (initialCapacity == 0) {
//容量为0的话仍是赋值成一个空数组
this.elementData = EMPTY_ELEMENTDATA;
} else {
throw new IllegalArgumentException("Illegal Capacity: "+initialCapacity);
}
}
构造方法三
/**
* @param c the collection whose elements are to be placed into this list
* @throws NullPointerException if the specified collection is null
*/
public ArrayList(Collection<? extends E> c) {
//将传入的集合c变成一个数组赋值给a
Object[] a = c.toArray();
//判断该集合是否有元素
if ((size = a.length) != 0) {
//判断传入的集合是否是ArrayList类型的集合
if (c.getClass() == ArrayList.class) {
elementData = a;
} else {
//如果不是就将其进行拷贝并赋值给elementData
elementData = Arrays.copyOf(a, size, Object[].class);
}
} else {
// 传入的集合没有元素,将其赋值成空数组
elementData = EMPTY_ELEMENTDATA;
}
}
add添加方法
方法一
public boolean add(E e) {
//将当前数组元素个数值size+1后传入该方法检查是否需要进行扩容,需要就扩容
ensureCapacityInternal(size + 1);
// 进行赋值
elementData[size++] = e;
return true;
}
方法二
public void add(int index, E element) {
//检查当前下标是否异常,里面是越界判断
rangeCheckForAdd(index);
// 同样是检查是否扩容
ensureCapacityInternal(size + 1);
//将elementData的index 之后的元素往后移,为新增的元素腾地儿
System.arraycopy(elementData, index, elementData, index + 1, size - index);
//将腾出来的index位置用添加的新值覆盖
elementData[index] = element;
size++;
}
扩容相关的方法
https://www.bilibili.com/video/BV1j4411L7f4?t=1845.9
30分钟
ensureCapacityInternal方法
private void ensureCapacityInternal(int minCapacity) {
//先调用calculateCapacity方法计算容量,然后将返回的容量传入
ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}
private void ensureExplicitCapacity(int minCapacity) {
//修改次数+1
modCount++;
//判断是否满足扩容标准:最小容量 - elementData数组长度
if (minCapacity - elementData.length > 0)
//进行数组扩容
grow(minCapacity);
}
private static int calculateCapacity(Object[] elementData, int minCapacity) {
//如果elementData是空数组的话返回一个,返回两者之中大的一方
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
//默认容量DEFAULT_CAPACITY是10,用户传入的容量小于10的话就返回10,大于10就返回用户的容量值
return Math.max(DEFAULT_CAPACITY, minCapacity);
}
//
return minCapacity;
}
grow方法
该方法是扩容的关键
private void grow(int minCapacity) {
// 保存旧数组容量大小
int oldCapacity = elementData.length;
//计算新数组容量:旧容量=旧容量+旧容量/2(右移一位相当于除以2)
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
}
删除方法
方法一
/*
根据指定下标删除元素
*/
public E remove(int index) {
//同样是检查index下标在数组中是否越界
rangeCheck(index);
//修改次数增一
modCount++;
//返回要删除的元素
E oldValue = elementData(index);
//将index+1以及之后的元素向前移动,覆盖要删除的元素(就是将删除位置之后的元素覆盖被删除元素)
int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
elementData[--size] = null; //将最后一个元素清空
return oldValue;
}
方法二
/*
删除指定元素
*/
public boolean remove(Object o) {
//先判断要删除的元素是否为空,不管是否为空都调用fastRemove方法进行快速删除
if (o == null) {
//没有找到也不影响结果
for (int index = 0; index < size; index++)
if (elementData[index] == null) {
fastRemove(index);
return true;
}
} else {
for (int index = 0; index < size; index++)
//匹配到要删除的元素
if (o.equals(elementData[index])) {
fastRemove(index);
return true;
}
}
//没有匹配到要删除的元素
return false;
}
fastRemove
//根据元素进行删除,不需要判断下标是否越界
private void fastRemove(int index) {
modCount++;
int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
elementData[--size] = null; // clear to let GC do its work
}
为什么使用迭代器遍历过程中调用remove方法修改集合中的元素会报错,这是因为下图中两个变量不一致