【参考】volatile能解决多线程内存不可见问题。对于一些多读吗,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。 说明:如果是count++操作,使用如下类实现:AtomicInteger count = new AtomicInteger();count.addAndGet(1)。如果是jdk8,推荐使用LongAdder对象,比AtomicInteger性能更好(减少乐观锁的重试次数)。
以上内容来自阿里《java开发手册》。
从上面说,LongAdder在jdk8中比AtomicInteger性能要好。
一、为什么LongAdder比Atomic原子类性能好?
这里做出一个测试
/*** @Created by 勺子* @Description LongAdder Atomic测试* @Date 2022/3/1 12:59*/public class LongAdderAndAtomicTest {private static AtomicInteger a = new AtomicInteger(0);private static LongAdder b = new LongAdder();public static void main(String[] args) throws InterruptedException {testAtomicAndLongAdder(1,10000000);testAtomicAndLongAdder(10,10000000);testAtomicAndLongAdder(20,10000000);testAtomicAndLongAdder(50,10000000);testAtomicAndLongAdder(100,10000000);}static void testAtomicAndLongAdder(int threadSize, int times) throws InterruptedException {System.out.println(String.format("线程数为: %s",threadSize));testAtomic(threadSize,times);testLongAdder(threadSize,times);}/*** 测试atomic效率* @param threadSize* @param times* @throws InterruptedException*/private static void testAtomic(int threadSize, int times) throws InterruptedException {long startTime = System.currentTimeMillis();CountDownLatch latch = new CountDownLatch(threadSize);for (int i = 0; i < threadSize; i++) {new Thread(){@Overridepublic void run() {for (int j = 0; j < times; j++) {a.incrementAndGet();}latch.countDown();}}.start();}latch.await();long endTime = System.currentTimeMillis();System.out.println(String.format("atomic耗时时间 :%s",endTime-startTime));}/*** 测试longAdder效率* @param threadSize* @param times* @throws InterruptedException*/private static void testLongAdder(int threadSize, int times) throws InterruptedException {long startTime = System.currentTimeMillis();CountDownLatch latch = new CountDownLatch(threadSize);for (int i = 0; i < threadSize; i++) {new Thread(){@Overridepublic void run() {for (int j = 0; j < times; j++) {b.increment();}latch.countDown();}}.start();}latch.await();long endTime = System.currentTimeMillis();System.out.println(String.format("longAdder耗时时间 :%s",endTime-startTime));}}
测试结果
由此可见,在线程越多的时候,longAdder的性能是比Atomic原子类更高一些。
二、LongAdder源码阅读
首先先看下LongAdder的继承体系
LongAdder继承了Striped64 类, Striped64 类有一个静态内部类 Cell类
@sun.misc.Contended static final class Cell {//保证可见性的valuevolatile long value;//带参数的构造函数,初始化时候传递value值Cell(long x) { value = x; }//cell类的cas操作交给unsafe类实现,返回是否成功,true表示cas写入成功,false->表示cas写入失败final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;//指定属性的内存偏移量private static final long valueOffset;//静态代码块,类加载时候执行static {try {//先获取unsafe类UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;//然后获取到对应类 value的内存地址的偏移量valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}}
除此之外,Striped64 还有一些其他属性:
//获取当前系统的cpu数,有什么用呢?控制cells数组长度的一个关键条件,在cells数组扩容时候,cells数组长度最大不能超过当前系统的cpu个数static final int NCPU = Runtime.getRuntime().availableProcessors();//cell数组,只要数组不为空,那么长度就是2的倍数transient volatile Cell[] cells;//多线程没有发生竞争时候,数据会累加到base上面 | 当cells在扩容时候,会将数据写入到base里transient volatile long base;//初始化cells数组或者cells数组扩容时候都需要先获取锁,0表示无锁状态,1表示有锁状态,有其他线程已经持有该锁了transient volatile int cellsBusy;//通过cas方式获取锁,调用unsafe类的cas方式实现final boolean casCellsBusy() {return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);}//获取当前线程的hash值static final int getProbe() {return UNSAFE.getInt(Thread.currentThread(), PROBE);}//重置当前线程的hash值static final int advanceProbe(int probe) {probe ^= probe << 13; // xorshiftprobe ^= probe >>> 17;probe ^= probe << 5;UNSAFE.putInt(Thread.currentThread(), PROBE, probe);return probe;}
看完longAdder的结构,我们接下来看他的重要方法,add()方法
public void add(long x) {//as 表示 cells引用//b 表示获取的base值//v 表示期望值//m 表示cells 数组的长度//a 表示当前线程命中的cel单元格Cell[] as; long b, v; int m; Cell a;//条件一:true=>表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中;// false=> 当前cells未初始化,当前所有线程应该将数据写入到base中//条件二:false => 表示当前线程cas替换数据成功// true=> 表示发生竞争了,可能需要重试 或者扩容if ((as = cells) != null || !casBase(b = base, b + x)) {//什么条件会进来?//1. true=>表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中//2. true=> 表示发生竞争了,可能需要重试 或者扩容//true=> 未竞争 false=> 发生竞争boolean uncontended = true;//条件一:true=> 说明cells未初始化,也就是多线程写base发生竞争了// fasle=> 说明cells已经初始化了,当前线程应该是 找自己的cell写值//条件二:getProbe() 方法:获取当前线程的hash值 m表示cells长度减一。cells长度一定是2的次方数,15 => 1111// true=>说明当前线程对应下标的cell为空,需要创建// false=> 说明当前线程对应的cell不为空,说明 下一步想要将x值 添加到cell中//条件三:true=> 表示cas失败,意味着当前线程对应的cell有竞争了// false=> 表示cas成功if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))//进入该方法哪有三种情况?//1.cells未初始化,多线程写base发生竞争了,cas base失败//2.cell数组已经初始化了,当前线程对应cells数组下标的cell为空,需要创建cell//3.cell数组初始化了,并且cell存在,意味着当前线程对应的cell有竞争了,多线程在该cell位置发生竞争了【重试|扩容】//uncontended 默认true,false表示是cells数组已经初始化了,但是竞争cell cas失败了longAccumulate(x, null, uncontended);}}
看重要的longAccumulate方法
//进入该方法有三种情况//1.cells未初始化,多线程写base发生竞争了 cas失败,会初始化cells//2.cell数组已经初始化了,说明当前线程对应下标的cell为空,需要创建cell//3.cell数组初始化了,意味着当前线程对应的cell有竞争了【重试|扩容】//uncontended 默认true,false表示是cells数组已经初始化了,但是竞争cell cas失败了final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {//当前线程的hash值int h;//条件成立:说明当前线程还未分配hash值if ((h = getProbe()) == 0) {//给当前线程分配hash值ThreadLocalRandom.current(); // force initialization//取出当前线程的hash值,赋值给hh = getProbe();//为什么?因为默认情况下, 当前线程肯定是写入到了cells[0]位置,不把当作一次真正的竞争wasUncontended = true;}//表示扩容意向 ,默认false一定不会扩容, true 可能会扩容boolean collide = false;//自旋for (;;) {//as 表示cells数组引用//a 表示当前线程命中的cell//n 表示cells数组长度//v 表示期望值Cell[] as; Cell a; int n; long v;//case1:表示cells数组已经初始化了,当前线程应该将数据写入到对应的cell中if ((as = cells) != null && (n = as.length) > 0) {//case1.1 处理数组初始化了,但是当前线程对应下标的cell为空,进行创建cell处理if ((a = as[(n - 1) & h]) == null) {//ture->表示当前锁未被占用,false 表示锁已被占用if (cellsBusy == 0) { // Try to attach new Cell//拿当前x创建cellCell r = new Cell(x); // Optimistically create//条件一:ture->表示当前锁未被占用,false 表示锁已被占用//条件二:true-> 当前线程获取锁成功,false当前线程获取锁失败if (cellsBusy == 0 && casCellsBusy()) {//是否创建成功标记boolean created = false;try { // Recheck under lock//rs cells数组引用//m cells长度//j 当前线程命中的下表Cell[] rs; int m, j;//多线程double check//条件一,条件二恒成立//条件三:rs[j = (m - 1) & h] == null为了防止其他线程初始化了该位置,然后当前线程再初始化该cellif ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}//强制改为falsecollide = false;}//case1.2 处理第三种情况,竞争cell cas失败 线程竞争cell失败else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash//case1.3 当前线程rehash值,新命中的cell不为空 指定cell cas//true->写成功,退出循环//false-> 表示rehash之后命中的新的cell也有竞争,重试第一次else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;//case1.4 如果cells数组超过cpu核数或者已经cells数组已经扩容了,修改扩容意向为fasleelse if (n >= NCPU || cells != as)//扩容意向改为false,表示不扩容了collide = false; // At max size or stale//case1.5 修改扩容意向为true,但并不代表一定要扩容,只是有可能else if (!collide)collide = true;//case1.6 实际的扩容操作//条件一:cellsBusy == 0 true=>表示当前无锁状态,当前线程可以竞争这把锁//条件二:casCellsBusy() true->表示当前线程cas方式获取锁成功,可以执行扩容逻辑;false当前时刻有其他线程正在做扩容相关的操作else if (cellsBusy == 0 && casCellsBusy()) {try {//多线程double checkif (cells == as) { // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {//释放锁cellsBusy = 0;}collide = false;continue; // Retry with expanded table}//当前线程的hash值 rehashh = advanceProbe( h);}//case2:前置条件cells数组未初始化,代表竞争base cas失败这种情况,进行初始化cells数组处理//条件一:true 表示当前未加锁//条件二:cells == as因为其他线程可能会在你给as赋值之前已经初始化了cells数组//条件三:true表示获取锁成功,会把cellsBusy设置为1,表示其他线程正持有这把锁else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize table//double check,处理AB线程同时执行该代码块逻辑//防止其他线程已经初始化了,当前线程再次初始化,导致丢失数据if (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}//case3:如果竞争base失败,然后初始化数组被其他线程完成后了,那么直接加到base里面//1.当前cellsBusy加锁状态,表示其他线程正在初始化cells,所有当前线程将值累加到base//2.cells被其他线程初始化后,当前线程需要将数据累加到base中else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}}
总结:三种情况
- 线程竞争base失败,会有一个获取到锁的线程执行初始化cells数组,其他线程会把数据累加到base中
- 线程cells数组已经初始化,但是cell为空,则会创建cell
- 线程cells数组已经初始化了,但是竞争cell cas失败,那么会先case1.2修改竞争失败标识,rehash重试一次, 如果失败,会修改扩容意向为true,然后第二次rehash cas重试,如果失败则最后需要进行cells扩容操作。
