1、AtomicLong的缺点
AtomicLong底层使用CAS实现,在高并发场景下,CAS自旋时间长,性能较差
@HotSpotIntrinsicCandidate
public final long getAndSetLong(Object o, long offset, long newValue) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!weakCompareAndSetLong(o, offset, v, newValue));
return v;
}
2、LongAddr基本原理
LongAdder的基本思路就是分散热点,将value值的新增操作分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个value值进行CAS操作,减少冲突的概率
LongAdder有一个全局变量volatile long base值,当并发不高的情况下都是通过CAS来直接操作base值,如果CAS失败,则针对LongAdder中的Cell[]数组中的Cell进行CAS操作,最后使用base值加cell中的和即为最终值
[
](https://blog.csdn.net/u012881584/article/details/106133349)
3、LongAddr源码分析
3.1、伪共享问题
LongAddr继承了Striped64,其中定义了base和cell数组
abstract class Striped64 extends Number {
@jdk.internal.vm.annotation.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return VALUE.compareAndSet(this, cmp, val);
}
final void reset() {
VALUE.setVolatile(this, 0L);
}
final void reset(long identity) {
VALUE.setVolatile(this, identity);
}
final long getAndSet(long val) {
return (long)VALUE.getAndSet(this, val);
}
private static final VarHandle VALUE;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
VALUE = l.findVarHandle(Cell.class, "value", long.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
static final int NCPU = Runtime.getRuntime().availableProcessors();
transient volatile Cell[] cells;
transient volatile long base;
transient volatile int cellsBusy;
}
Striped64中Cell使用@Contended
注解进行修饰,而 @Contended
注解可以进行缓存行填充,从而解决伪共享问题
伪共享问题 :
cpu和主存之间引入了多级的cache缓存,CPU缓存中的数据是以缓存行为单位处理的,缓存行通常是 64 字节(常用处理器的缓存行是 64 字节的,比较旧的处理器缓存行是 32 字节),并且它有效地引用主内存中的一块地址。在程序运行的过程中,缓存每次更新都从主内存中加载连续的 64 个字节(如果访问一个 long 类型的数组时,当数组中的一个值被加载到缓存中时,另外 7 个元素也会被加载到缓存中),所有cpu对内存连续的数据结构更为友好。但是当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能 ,如cpu核心1的线程a和cpu核心2的线程b操作了同一个缓存行的两个独立变量,当a更新目标变量后cpu核心2的对应缓存行失效,需要从主从重新读取。
伪共享解决方案:
进行缓存行填充(Padding),让不同线程操作的对象处于不同的缓存行
如果一条缓存行有 64 字节,而 Java 程序的对象头固定占 8 字节(32位系统)或 12 字节( 64 位系统默认开启压缩, 不开压缩为 16 字节),所以我们只需要填 6 个无用的长整型补上6*8=48字节,让不同的 VolatileLong 对象处于不同的缓存行,就避免了伪共享
3.2、add方法
供外部调用的api
public void add(long x) {
/**
* cs: cells引用
* b: base值
* v: 期望值
* m: cell数组的长度
* c: 当前线程命中的cell单元格
*
*/
Cell[] cs; long b, v; int m; Cell c;
/**
* 条件1: (cs = cells) != null
* true: cells已经初始化了 当前线程应该将新数据写到cell中
* false: cells还未被初始化 当前线程应该将数据写到base中
* 条件2: !casBase(b = base, b + x)
* 只有条件1为false 即cells还未被初始化时会执行
* true: 当前线程cas修改base值失败
* false: 当前线程cas修改base值成功
*/
if ((cs = cells) != null || !casBase(b = base, b + x)) {
/**
* 进入条件:
* 1、cells已经初始化了 当前线程应该将新数据写到cell中
* 2、cells还未被初始化,cas发生竞争,当前线程cas操作失败
*/
boolean uncontended = true;
/**
*条件1: cs == null || (m = cs.length - 1) < 0
* true: cells还未被初始化,是cas发生竞争操作失败进入代码块的
* false: cells已经初始化了 当前线程应该将新数据写到cell中
*条件2:c = cs[getProbe() & m]) == null
* getProbe(): 相当于获取当前线程的hash值
* m: cell的数量 -1 ,m为2的n次方,getProbe() & m 相当于 使用hash值对数组长度取余,但是位运算操作更快
*
* 前提条件:cells已经初始化了
* true: 说明当前线程对应下标的cell为空,需要创建longAccumulate支持
* false:说明当前线程对应下标的cell不为空,下一步需要将x值添加到cell中
*
*条件3: !(uncontended = c.cas(v = c.value, v + x))
* 前提条件:cells已经初始化了,且当前线程对应下标的cell不为空
* 对目标cell进行一次cas尝试操作
* true: cas操作失败 当前线程对应的cell有竞争
* false: cas成功
*/
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x))){
/**
* 进入条件:
* 1、cells还未被初始化,是cas发生竞争操作失败进入代码块的
* 2、cells已未被初始化,但是当前线程对应下标的cell为空
* 3、cells已未被初始化,当前线程对应下标的cell不为空,但是对目标cell进行cas操作时失败 当前线程对应的cell有竞争
*/
longAccumulate(x, null, uncontended);
}
}
}
3.3、longAccumulate方法
LongAddr统计实现核心方法
/**
* 进入条件:
* 1、cells还未被初始化,是cas发生竞争操作失败进入代码块的
* 2、cells已未被初始化,但是当前线程对应下标的cell为空
* 3、cells已未被初始化,当前线程对应下标的cell不为空,但是对目标cell进行cas操作时失败 当前线程对应的cell有竞争
*
*
* @param x 要累加的值
* @param fn 操作算法接口 默认为null
* @param wasUncontended 当前cell是否发生过竞争 true 未发生竞争 false 发送竞争
* 当为第3种条件进入时 wasUncontended为false 第1,2种条件为true
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//当前线程的hash值
int h;
if ((h = getProbe()) == 0) {
//当前线程还未分片hash值
ThreadLocalRandom.current();
h = getProbe();
/**
* 强制将竞争标识设置为true
*
* 默认情况下:
* 新线程的对应hash值为0,首先会写入cells[0]的位置 不将其算作一次竞争
*/
wasUncontended = true;
}
//扩容意向 false 一定不会扩容 true 可能会扩容
boolean collide = false;
done: for (;;) {
//cs : cells引用
//c :当前线程命中cell
//n: cells数组长度
//v: 期望值
Cell[] cs; Cell c; int n; long v;
/**
*CASE1: (cs = cells) != null && (n = cs.length) > 0 表示cells已经初始化了,当前线程应该将数据写到对应的cell中
*/
if ((cs = cells) != null && (n = cs.length) > 0) {
/**
* CASE1.1 :(c = cs[(n - 1) & h]) == null
* true: 当前线程对应的cell为空,需要创建 (n - 1) & h相当于 h % n
* false: 当前线程对应的cell不为空
*/
if ((c = cs[(n - 1) & h]) == null) {
//cellsBusy == 0:cells没有在扩容或者创建
if (cellsBusy == 0) {
//以累加值x 新建一个cell
Cell r = new Cell(x);
/**
* 如果cells没有在扩容或者创建,尝试使用cas获取锁
* 条件1: cellsBusy == 0
* true: 当前处于无锁状态
* false: 锁被占用
* 条件2:casCellsBusy()
* true: 当前线程获取锁成功 可以进行cell创建
* false: 当前线程获取锁失败
*/
if (cellsBusy == 0 && casCellsBusy()) {
try {
/**
* cas获取cells操作锁成功 将cells数组对应下标的元素设置为刚刚新建的cell
* rs cells引用
*
*/
Cell[] rs; int m, j;
/**
* 条件1:(rs = cells) != null 恒成立
* 条件2:m = rs.length) > 0 恒成立
* 条件3:rs[j = (m - 1) & h] == null
* 类似单例双重锁检查 防止其它线程初始化位置,当前线程再次初始化 导致数据丢失
* true: 没有其它线程对目标下标创建cell
* false: 其它线程对目标下标创建cell
*/
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
break done;
}
} finally {
//释放锁
cellsBusy = 0;
}
//此时当前线程对应的cell已经初始化了 重试操作
continue;
}
}
//扩容意向设置为false
collide = false;
}
//CASE1.2: cells已未被初始化,当前线程对应下标的cell不为空,但是对目标cell进行cas操作时失败
else if (!wasUncontended){
//把发送竞争标识设置为true 表示当前cell未发生竞争 后面将线程hash值重置
wasUncontended = true;
}
//CASE1.3: 对当前cell进行cas操作
// 前置条件:当前线程重置过hash值,新命中的cell不为空
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x))){
//true: 当前cell进行cas操作成功 退出
// false: 重置hash值后,新命中的cell也有竞争,CAS失败
break;
}
/**
* CASE1.4:
* 条件1:n >= NCPU
* true: 当前cells数量大于cpu个数, 不再扩容
* false: 可以扩容
* 条件2:cells != cs
* true: 其它线程已经扩容过了,当前线程重置hash值后重试即可
* false: 其它线程未扩容cells
*
*/
else if (n >= NCPU || cells != cs){
collide = false;
}
/**
* CASE1.5: 设置扩容意向为true 但是不一定真的发生扩容,此时再去重试一次,如果还发生竞争就走到CASE1.6
* 前置条件:当前线程重置过hash值,新命中的cell不为空,cas发生竞争,操作失败,并且cells还可以扩容
*
*/
else if (!collide){
collide = true;
}
/**
* CASE1.6: 真正扩容逻辑
* 条件1:cellsBusy == 0
* true: 当前为无锁状态 当前线程可以去竞争锁
* false: 当前为有锁状态 cells在进行扩容
* 条件2:casCellsBusy()
* true 当前线程获取锁成功 可以执行扩容逻辑
* false: 当前时刻有其它线程在进行扩容操作
*
*/
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs){
//和上面的一样 类似单例双重锁检查 防止其它线程扩容,当前线程再次扩容
cells = Arrays.copyOf(cs, n << 1);
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重置线程hash值
h = advanceProbe(h);
}
/**
* CASE2:
* 前置条件:cells还未被初始化
* 条件1:cellsBusy == 0 当前未加锁,cell没有在扩容或者新建
* 条件2:cells == cs,cells为空 因为有可能其它线程修改了cells
* 条件3:casCellsBusy() 加锁成功 获取cells操作锁 cellsBusy改为1
*/
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try {
/**
* 类似单例双重锁检查 防止 a,b线程同时执行到cellsBusy == 0 && cells == cs 位置,然后a拿到锁,创建cells,再释放锁,此时b再次拿到锁,
* 如果不进行判断cells是否为null 则会覆盖掉cells
*/
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
}
/**
* CASE3:
* 前置条件:
* 1、cells还未被初始化,cellbusy为加锁状态 需要当前线程将数据累加到base
* 2、cells被其他线程初始化了 当前线程需要将数据累加到base
*/
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break done;
}
}
3.4、sum方法
LongAdder 的自增是没有返回值的,要获取当前值的时候,只能调用 sum 方法,将所有cell中的数据和base求和返回。
先自增,再获取值,不是原子操作,当多线程并发调用的时候,sum 方法返回的值必定不是一个准确的值
public long sum() {
Cell[] cs = cells;
long sum = base;
if (cs != null) {
for (Cell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}