一、CAS
CAS(Compare And Swap,比较并交换),当多个线程并发处理某个属性的情况,保证线程安全可以使用锁机制,synchronized、ReentrantLock,synchronized需要阻塞线程,需要进入内核态,是一个比较重的操作,而且性能不高,ReentrantLock需要手动的加锁解锁,基于AQS和Lock实现,底层也是阻塞线程。
CAS对于多个线程操作变量,先进行比较,如果与原值相等,则修改内存中的值,如果不相等,则不修改返回最新的值。
CAS 可以看作是它们合并后的整体 ——一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的,属于乐观锁的一种实现方式,Java原子类中的递增操作就通过CAS自旋实现的,在没有阻塞线程的情况下实现多线程之间变量的同步。
1、CAS的应用
在 Java 中,CAS 操作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS 操作
它们都是 native 方法,由 Java 虚拟机提供具体实现,这意味着不同的 Java 虚拟机对它们的实现可能会略有不同, 以 compareAndSwapInt 为例,Unsafe 的 compareAndSwapInt 方法接收 4 个参数,分别 是:对象实例、内存偏移量、字段期望值、字段新值。该方法会针对指定对象实例中的相应偏移量的字段执行 CAS 操作。
public class CASTest {public static void main(String[] args) {Entity entity = new Entity();Unsafe unsafe = UnsafeFactory.getUnsafe();long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");System.out.println(offset);boolean successful;// 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段更新值successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);System.out.println(successful + "\t" + entity.x);successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);System.out.println(successful + "\t" + entity.x);successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);System.out.println(successful + "\t" + entity.x);}}class Entity{int x;}
CAS是在unsafe类中,代码中首先要获得偏移量,在Entity对象中有一个int类型的属性,其中对象头占8个字节,类型指针4个字节(指针压缩),所以属性x的偏移量是12,成员属性int类型4个字节,对象大小是16字节。
第一次cas和第二次cas的修改都可以成功,第三次就会失败,因为第三次x的值已经不等于3了。
对于先比较再更新,这两次读写操作,会在底层保证原子性,但是不能保证可见性,所以java对于cas进行了封装,使其可以保证可见性和有序性,所以java的cas操作和操作系统的cas原语有所不同,底层的cas返回的是原值或者最新的值,java中的cas返回的是一个布尔类型,java在cas这个原子指令前加了lock前缀指令,用来保证可见性。
2、CAS源码分析
Hotspot 虚拟机对compareAndSwapInt 方法的实现如下:
核心逻辑在Atomic::cmpxchg方法中,这个根据不同操作系统和不同CPU会有不同的实现。这里我们以linux_64x的为例,查看Atomic::cmpxchg的实现。
cmpxchgl的详细执行过程: 首先,输入是”r” (exchange_value), “a” (compare_value), “r” (dest), “r” (mp),表 示compare_value存入eax寄存器,而exchange_value、dest、mp的值存入任意的通用寄 存器。嵌入式汇编规定把输出和输入寄存器按统一顺序编号,顺序是从输出寄存器序列从左 到右从上到下以“%0”开始,分别记为%0、%1∙∙∙%9。也就是说,输出的eax是%0,输入 的exchange_value、compare_value、dest、mp分别是%1、%2、%3、%4。 因此,cmpxchg %1,(%3)实际上表示cmpxchg exchange_value,(dest) 需要注意的是cmpxchg有个隐含操作数eax,其实际过程是先比较eax的值(也就是 compare_value)和dest地址所存的值是否相等, 输出是”=a” (exchange_value),表示把eax中存的值写入exchange_value变量中。 Atomic::cmpxchg这个函数最终返回值是exchange_value,也就是说,如果cmpxchgl执行时compare_value和dest指针指向内存值相等则会使得dest指针指向内存值变成 exchange_value,最终eax存的compare_value赋值给了exchange_value变量,即函数最 终返回的值是原先的compare_value。此时Unsafe_CompareAndSwapInt的返回值(jint) (Atomic::cmpxchg(x, addr, e)) == e就是true,表明CAS成功。如果cmpxchgl执行时 compare_value和(dest)不等则会把当前dest指针指向内存的值写入eax,最终输出时赋值 给exchange_value变量作为返回值,导致(jint)(Atomic::cmpxchg(x, addr, e)) == e得到 false,表明CAS失败。
现代处理器指令集架构基本上都会提供 CAS 指令,例如 x86 和 IA-64 架构中的 cmpxchgl 指令和 comxchgq 指令,sparc 架构中的 cas 指令和 casx 指令。 不管是 Hotspot 中的 Atomic::cmpxchg 方法,还是 Java 中的 compareAndSwapInt 方法,它们本质上都是对相应平台的 CAS 指令的一层简单封装。CAS 指令作为一种硬件原语,有着天然的原子性,这也正是 CAS 的价值所在。
我么可以通过CAS操作来保证多线程下的变量操作是线程安全的。
public class CASLock {//加锁标记private volatile int state;private static final Unsafe UNSAFE;private static final long OFFSET;static {try {UNSAFE = UnsafeFactory.getUnsafe();OFFSET = UnsafeFactory.getFieldOffset(UNSAFE, CASLock.class, "state");} catch (Exception e) {throw new Error(e);}}public boolean cas() {return UNSAFE.compareAndSwapInt(this, OFFSET, 0, 1);}public int getState() {return state;}public void setState(int state) {this.state = state;}}
代码中,多个线程同时修改变量sum,需要判断state的值是否为0,如果为0,则进行cas操作把state修改为1,cas操作保证统一时间只有一个线程操作成功,其他未操作成功的线程会继续循环,等待cas修改成功后再对sum进行自增操作,state必须使用volatile。
public class Test {private volatile static int sum = 0;static CASLock casLock = new CASLock();public static void main(String[] args) {for (int i = 0; i < 10; i++) {Thread thread = new Thread(() -> {for(;;){if(casLock.getState() == 0 && casLock.cas()) {try {for (int j = 0; j < 10000; j++) {sum++;}System.out.println(casLock.getState());} finally {casLock.setState(0);}break;}}});thread.start();}try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(sum);}}
3、CAS缺陷
1)代码中可以看出,大量线程都在自旋,如果逻辑比较复杂,会导致系统变慢,CAS 长时间地不成功,则会给 CPU 带来非常大的开销。
2)只能保证一个共享变量原子操作,这个问题在AtomicInteger等对象的自增 incrementAndGet() 方法,
只能保证一个对象的cas操作。
public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;}
public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {var5 = this.getIntVolatile(var1, var2);} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;}
这个方法底层是使用do-while循环进行比较与交换,成功后返回原值,所以最后返回的值会+1。this.getIntVolatile(var1, var2) 这个方法是一个本地方法,会根据偏移量从内存中获取值。
3)会存在 ABA 问题。
二、ABA问题
当一个变量X初始值是A,线程1获取了X的初始值,此时线程2也获取了X的初始值,线程2先把变量X的值通过CAS修改为B,然后又修改回了A,此时线程1通过CAS修改变量X的值仍然是成功的,因为线程1并不知道变量X已经被修改过了。
ABA问题的解决方案:
数据库有个锁称为乐观锁,是一种基于数据版本实现数据同步的机制,每次修改一次数据,版本就会进行累加。 同样,Java也提供了相应的原子引用类AtomicStampedReference
private static class Pair<T> {final T reference;final int stamp;private Pair(T reference, int stamp) {this.reference = reference;this.stamp = stamp;}static <T> Pair<T> of(T reference, int stamp) {return new Pair<T>(reference, stamp);}}private volatile Pair<V> pair;
reference即我们实际存储的变量,stamp是版本,每次修改可以通过+1保证版本唯一性。这样 就可以保证每次修改后的版本也会往上递增。
public class AtomicStampedReferenceTest {public static void main(String[] args) {// 定义AtomicStampedReference Pair.reference值为1, Pair.stamp为1AtomicStampedReference atomicStampedReference = new AtomicStampedReference(1,1);new Thread(()->{int[] stampHolder = new int[1];int value = (int) atomicStampedReference.get(stampHolder);int stamp = stampHolder[0];log.debug("Thread1 read value: " + value + ", stamp: " + stamp);// 阻塞1sLockSupport.parkNanos(1000000000L);// Thread1通过CAS修改value值为3 stamp是版本,每次修改可以通过+1保证版本唯一性if (atomicStampedReference.compareAndSet(value, 3,stamp,stamp+1)) {log.debug("Thread1 update from " + value + " to 3");} else {log.debug("Thread1 update fail!");}},"Thread1").start();new Thread(()->{int[] stampHolder = new int[1];int value = (int)atomicStampedReference.get(stampHolder);int stamp = stampHolder[0];log.debug("Thread2 read value: " + value+ ", stamp: " + stamp);// Thread2通过CAS修改value值为2if (atomicStampedReference.compareAndSet(value, 2,stamp,stamp+1)) {log.debug("Thread2 update from " + value + " to 2");// do somethingvalue = (int) atomicStampedReference.get(stampHolder);stamp = stampHolder[0];log.debug("Thread2 read value: " + value+ ", stamp: " + stamp);// Thread2通过CAS修改value值为1if (atomicStampedReference.compareAndSet(value, 1,stamp,stamp+1)) {log.debug("Thread2 update from " + value + " to 1");}}},"Thread2").start();}}
AtomicMarkableReference可以理解为上面AtomicStampedReference的简化版,就是不关心修改过几次,仅仅关心是否修改过。因此变量mark是boolean类型,仅记录值是否有过修改。
private static class Pair<T> {final T reference;final boolean mark;private Pair(T reference, boolean mark) {this.reference = reference;this.mark = mark;}static <T> Pair<T> of(T reference, boolean mark) {return new Pair<T>(reference, mark);}}private volatile Pair<V> pair;
三、 Atomic原子操作类
1、Atomic原子操作类介绍
在并发编程中很容易出现并发安全的问题,有一个很简单的例子就是多线程更新变量i=1,比 如多个线程执行i++操作,就有可能获取不到正确的值,而这个问题,最常用的方法是通过 Synchronized进行控制来达到线程安全的目的。但是由于synchronized是采用的是悲观锁策略,并不是特别高效的一种解决方案。实际上,在J.U.C下的atomic包提供了一系列的操作简单, 性能高效,并能保证线程安全的类去更新基本类型变量,数组元素,引用类型以及更新对象中的字段类型。atomic包下的这些类都是采用的是乐观锁策略去原子更新数据,在java中则是使用CAS操作具体实现。
在java.util.concurrent.atomic包里提供了一组原子操作类:
基本类型:AtomicInteger、AtomicLong、AtomicBoolean;
引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicReferenceFieldUpdater 原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、 LongAccumulator、LongAdder、Striped64
2、原子更新基本类型
以AtomicInteger为例总结常用的API
// 自增1,并返回旧值public final int getAndIncrement() {return unsafe.getAndAddInt(this, valueOffset, 1);}// 自增指定的值,并返回旧值public final int getAndAdd(int delta) {return unsafe.getAndAddInt(this, valueOffset, delta);}// 自增1,并返回自增后的值public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;}// 自增指定的值,并返回自增后的值public final int addAndGet(int delta) {return unsafe.getAndAddInt(this, valueOffset, delta) + delta;}// 设置为指定的值,并返回旧值public final int getAndSet(int newValue) {return unsafe.getAndSetInt(this, valueOffset, newValue);}
3、原子更新数组类型
public class AtomicIntegerArrayTest {static int[] value = new int[]{ 1, 2, 3, 4, 5 };static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);public static void main(String[] args) throws InterruptedException {//设置索引0的元素为100atomicIntegerArray.set(0, 100);System.out.println(atomicIntegerArray.get(0));//以原子更新的方式将数组中索引为1的元素与输入值相加atomicIntegerArray.getAndAdd(1,5);// [100,7,3,4,5]System.out.println(atomicIntegerArray);}}
4、原子更新引用类型
AtomicReference作用是对普通对象的封装,它可以保证你在修改对象引用时的线程安全性。
public class AtomicReferenceTest {public static void main( String[] args ) {User user1 = new User("张三", 23);User user2 = new User("李四", 25);User user3 = new User("王五", 20);//初始化为 user1AtomicReference<User> atomicReference = new AtomicReference<>();atomicReference.set(user1);//把 user2 赋给 atomicReferenceatomicReference.compareAndSet(user1, user2);System.out.println(atomicReference.get());//把 user3 赋给 atomicReferenceatomicReference.compareAndSet(user1, user3);System.out.println(atomicReference.get());}}@Data@AllArgsConstructorclass User {private String name;private Integer age;}
5、对象属性原子修改器
AtomicIntegerFieldUpdater可以线程安全地更新对象中的整型变量。
public class AtomicIntegerFieldUpdaterTest {public static class Candidate {//字段必须是volatile类型volatile int score = 0;AtomicInteger score2 = new AtomicInteger();}// 传入要修改的类和要修改的属性public static final AtomicIntegerFieldUpdater<Candidate> scoreUpdater =AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");public static AtomicInteger realScore = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {final Candidate candidate = new Candidate();Thread[] t = new Thread[10000];for (int i = 0; i < 10000; i++) {t[i] = new Thread(new Runnable() {@Overridepublic void run() {if (Math.random() > 0.4) {// 属性修改器和atomica结果是一样的candidate.score2.incrementAndGet();scoreUpdater.incrementAndGet(candidate);realScore.incrementAndGet();}}});t[i].start();}for (int i = 0; i < 10000; i++) {t[i].join();}System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);System.out.println("AtomicInteger Score=" + candidate.score2.get());System.out.println("realScore=" + realScore.get());}}
对于AtomicIntegerFieldUpdater 的使用稍微有一些限制和约束,约束如下:
1)字段必须是volatile类型的,在线程之间共享变量时保证立即可见.eg:volatile int value = 3。
2)字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父 类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
3)只能是实例变量,不能是类变量,也就是说不能加static关键字。
4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和 volatile是有冲突的,这两个关键字不能同时存在。
5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字 段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用 AtomicReferenceFieldUpdater。
四、LongAdder/DoubleAdder详解
AtomicLong是利用了底层的CAS操作来提供并发性的,比如addAndGet方法:
public final long getAndAdd(long delta) {return unsafe.getAndAddLong(this, valueOffset, delta);}
public final long getAndAddLong(Object var1, long var2, long var4) {long var6;do {var6 = this.getLongVolatile(var1, var2);} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));return var6;}
上述方法调用了Unsafe类的getAndAddLong方法,该方法内部是个native方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。 在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下, N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,会占用CPU和抢占时间片,此时AtomicLong的自旋会成为瓶颈。
这就是LongAdder引入的初衷——解决高并发环境下AtomicInteger, AtomicLong的自旋瓶颈问题。
static void testLongAdder(final int threadCount, final int times)throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(threadCount);LongAdder longAdder = new LongAdder();for (int i = 0; i < threadCount; i++) {new Thread(new Runnable() {@Overridepublic void run() {for (int j = 0; j < times; j++) {longAdder.add(1);}countDownLatch.countDown();}}, "my-thread" + i).start();}countDownLatch.await();}
LongAdder和AtomicLong相比线程数越多,并发操作数越大,LongAdder的优势越明显。
1、LongAdder原理
在LongAdder中有一个基数base,如果多个线程没有产生竞争,是一个一个操作的,那么会使用cas来操作base这个基数,都是在这个基数上面自增。如果产生竞争,那么cas会失败,这样会把每个线程使用哈希运算,分配在cells数组中的某一个下标的槽位上(cell),通常cells数组的长度最大是逻辑核的数量,初始长度是2。
假设线程1分配到cell1上,那么线程1就会对cell1进行增加操作,如果有其他线程也被分配到cell1上,那么线程1和其他线程都会操作cell1。
2、LongAdder的内部结构
LongAdder内部有一个base变量,一个Cell[]数组:
base变量:非竞态条件下,直接累加到该变量上
Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
LongAdder的父类 Striped64 中定义了这些属性:
// 获取npc核数static final int NCPU = Runtime.getRuntime().availableProcessors();/*** Table of cells. When non-null, size is a power of 2.* 定义一个volatile修饰的cell数组,长度是2的n次幂*/transient volatile Cell[] cells;/*** Base value, used mainly when there is no contention, but also as* a fallback during table initialization races. Updated via CAS.* 基数:volatile修饰* 1、没有遇到并发竞争时,直接使用base累加数值* 2、初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cells初始化),* 其他竞争初始化cells数组失败的线程会讲数值累加到base上*/transient volatile long base;/*** Spinlock (locked via CAS) used when resizing and/or creating Cells.* cas加锁的标记,扩容或者创建cells的时候会用到*/transient volatile int cellsBusy;
定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过 Unsafe来CAS操作它的值,Cell类是使用 @sun.misc.Contended 注解,解决伪共享问题
@sun.misc.Contended static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}}
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
3、LongAdder#add方法
只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。 如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容 了。 这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。
public void add(long x) {Cell[] as; long b, v; int m; Cell a;if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}}
假设调用add(1),首先会判断Cells数据是否为空,如果为空,则表示没有创建,没有出现并发冲突,那么会继续执行“或”的判断逻辑, !casBase(b = base, b + x),这个方法是cas原子操作base,cmp是预期值,val是要修改的值,就是把base的值修改为b + x,修改返回true,则不会进入if逻辑,如果此时有线程并发的修改base,那么cas操作不会成功,会进入if逻辑。
final boolean casBase(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);}
if逻辑中还有一个if判断,
as == null || // cells没有被创建
(m = as.length - 1) < 0 || // cells没有初始化
(a = as[getProbe() & m]) == null || // 根据当前线程转int类型,和数组的长度取模得出的下标位置的槽位为空
!(uncontended = a.cas(v = a.value, v + x)) // 取出来的Cell a,调用cas方法修改a.value 为 v + x,没有成功
static final int getProbe() {return UNSAFE.getInt(Thread.currentThread(), PROBE);}
满足以上其中一个条件就会调用 longAccumulate(x, null, uncontended) 方法,这个方法是在父类 Striped64 中实现的。
代码中使用了for死循环,使用cas操作要保证失败之后再次执行,直到成功执行。
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current();h = getProbe();wasUncontended = true;}boolean collide = false;for (;;) {Cell[] as; Cell a; int n; long v;// 判断cells是否已经创建并长度大于0if ((as = cells) != null && (n = as.length) > 0) {// 如果cells数据已经创建好并初始化了,取到当前线程和数组长度取模的下标,判断该// 下标的位置是否为空if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) {// 把cell创建出来,给cell的value属性赋值x,x就是传进来要累加的值Cell r = new Cell(x);// 如果锁标记为0,如果为0,cas把标记改为1if (cellsBusy == 0 && casCellsBusy()) {// 如果当前线程加锁成功boolean created = false;try {Cell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {// j是cells数组下标,r是新创建的cellrs[j] = r;created = true;}} finally {// 把锁标记置为0cellsBusy = 0;}if (created)break;continue;}}collide = false;}else if (!wasUncontended)wasUncontended = true;// 如果当前线程分配的cells数组下标位置不为空,直接操作下标位置的cell,进行// cas操作value,如果更新成功则结束else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;else if (n >= NCPU || cells != as)collide = false;else if (!collide)collide = true;// 如果cell没有cas更新成功,则扩容,判断锁标记是否为0,且修改锁标记为1成功else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) {// 新创建一个cells数组,n是原来cells数组的长度,进行位移Cell[] rs = new Cell[n << 1];// 把旧数组的内容赋值到新数组中for (int i = 0; i < n; ++i)rs[i] = as[i];// 新数组赋值到成员属性cells = rs;}} finally {// 锁标记置为0cellsBusy = 0;}collide = false;continue;}h = advanceProbe(h);}// 如果cells没有创建,且锁标记等于0,且cas修改锁标记为1成功else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try {// 再次判断cells数组是否为空if (cells == as) {// 创建cells数组,长度为2Cell[] rs = new Cell[2];// 创建一个cell,把x赋值给cell的value属性rs[h & 1] = new Cell(x);// 创建好的cells数组赋值给成员变量cells = rs;init = true;}} finally {// 锁标记位置为0cellsBusy = 0;}if (init)break;}else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;}}
4、LongAdder#sum方法
sum方法不是一个线程安全的方法,只能取到当前时刻的值,因为存在扩容和线程执行中的情况导致最终结果会时刻变化。
返回累加的和,也就是”当前时刻”的计数值,注意: 高并发时,除非全局加锁,否则得不到程序运行中某个时刻绝对准确的值,此返回值可能不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加,方法的返回时刻和调用时刻不是同一个点,在有并发的情况下,这个值只是近似准确的计数值。
public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {// 遍历cells中的每个cell的value和base相加的总和for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}
5、 LongAccumulator
LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而 LongAccumulator提供了自定义的函数操作。其构造函数如下:
public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) {this.function = accumulatorFunction;base = this.identity = identity;}
通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator 接收2个long作为参数,并返回1个long)。LongAccumulator内部原理和LongAdder几乎完全 一样,都是利用了父类Striped64的longAccumulate方法。
public class LongAccumulatorTest {public static void main(String[] args) throws InterruptedException {// 累加 x+yLongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);ExecutorService executor = Executors.newFixedThreadPool(8);// 1到9累加IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));Thread.sleep(2000);System.out.println(accumulator.getThenReset());}}
