一、CAS

CAS(Compare And Swap,比较并交换),当多个线程并发处理某个属性的情况,保证线程安全可以使用锁机制,synchronized、ReentrantLock,synchronized需要阻塞线程,需要进入内核态,是一个比较重的操作,而且性能不高,ReentrantLock需要手动的加锁解锁,基于AQS和Lock实现,底层也是阻塞线程。
CAS对于多个线程操作变量,先进行比较,如果与原值相等,则修改内存中的值,如果不相等,则不修改返回最新的值。
CAS 可以看作是它们合并后的整体 ——一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的,属于乐观锁的一种实现方式,Java原子类中的递增操作就通过CAS自旋实现的,在没有阻塞线程的情况下实现多线程之间变量的同步。

1.png1、CAS的应用

在 Java 中,CAS 操作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS 操作
2.png它们都是 native 方法,由 Java 虚拟机提供具体实现,这意味着不同的 Java 虚拟机对它们的实现可能会略有不同, 以 compareAndSwapInt 为例,Unsafe 的 compareAndSwapInt 方法接收 4 个参数,分别 是:对象实例、内存偏移量、字段期望值、字段新值。该方法会针对指定对象实例中的相应偏移量的字段执行 CAS 操作。

  1. public class CASTest {
  2. public static void main(String[] args) {
  3. Entity entity = new Entity();
  4. Unsafe unsafe = UnsafeFactory.getUnsafe();
  5. long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
  6. System.out.println(offset);
  7. boolean successful;
  8. // 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段更新值
  9. successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
  10. System.out.println(successful + "\t" + entity.x);
  11. successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
  12. System.out.println(successful + "\t" + entity.x);
  13. successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
  14. System.out.println(successful + "\t" + entity.x);
  15. }
  16. }
  17. class Entity{
  18. int x;
  19. }

CAS是在unsafe类中,代码中首先要获得偏移量,在Entity对象中有一个int类型的属性,其中对象头占8个字节,类型指针4个字节(指针压缩),所以属性x的偏移量是12,成员属性int类型4个字节,对象大小是16字节。
第一次cas和第二次cas的修改都可以成功,第三次就会失败,因为第三次x的值已经不等于3了。
对于先比较再更新,这两次读写操作,会在底层保证原子性,但是不能保证可见性,所以java对于cas进行了封装,使其可以保证可见性和有序性,所以java的cas操作和操作系统的cas原语有所不同,底层的cas返回的是原值或者最新的值,java中的cas返回的是一个布尔类型,java在cas这个原子指令前加了lock前缀指令,用来保证可见性。

2、CAS源码分析

Hotspot 虚拟机对compareAndSwapInt 方法的实现如下:
3.png核心逻辑在Atomic::cmpxchg方法中,这个根据不同操作系统和不同CPU会有不同的实现。这里我们以linux_64x的为例,查看Atomic::cmpxchg的实现。
4.png cmpxchgl的详细执行过程: 首先,输入是”r” (exchange_value), “a” (compare_value), “r” (dest), “r” (mp),表 示compare_value存入eax寄存器,而exchange_value、dest、mp的值存入任意的通用寄 存器。嵌入式汇编规定把输出和输入寄存器按统一顺序编号,顺序是从输出寄存器序列从左 到右从上到下以“%0”开始,分别记为%0、%1∙∙∙%9。也就是说,输出的eax是%0,输入 的exchange_value、compare_value、dest、mp分别是%1、%2、%3、%4。 因此,cmpxchg %1,(%3)实际上表示cmpxchg exchange_value,(dest) 需要注意的是cmpxchg有个隐含操作数eax,其实际过程是先比较eax的值(也就是 compare_value)和dest地址所存的值是否相等, 输出是”=a” (exchange_value),表示把eax中存的值写入exchange_value变量中。 Atomic::cmpxchg这个函数最终返回值是exchange_value,也就是说,如果cmpxchgl执行时compare_value和dest指针指向内存值相等则会使得dest指针指向内存值变成 exchange_value,最终eax存的compare_value赋值给了exchange_value变量,即函数最 终返回的值是原先的compare_value。此时Unsafe_CompareAndSwapInt的返回值(jint) (Atomic::cmpxchg(x, addr, e)) == e就是true,表明CAS成功。如果cmpxchgl执行时 compare_value和(dest)不等则会把当前dest指针指向内存的值写入eax,最终输出时赋值 给exchange_value变量作为返回值,导致(jint)(Atomic::cmpxchg(x, addr, e)) == e得到 false,表明CAS失败。
现代处理器指令集架构基本上都会提供 CAS 指令,例如 x86 和 IA-64 架构中的 cmpxchgl 指令和 comxchgq 指令,sparc 架构中的 cas 指令和 casx 指令。 不管是 Hotspot 中的 Atomic::cmpxchg 方法,还是 Java 中的 compareAndSwapInt 方法,它们本质上都是对相应平台的 CAS 指令的一层简单封装。CAS 指令作为一种硬件原语,有着天然的原子性,这也正是 CAS 的价值所在。
我么可以通过CAS操作来保证多线程下的变量操作是线程安全的。

  1. public class CASLock {
  2. //加锁标记
  3. private volatile int state;
  4. private static final Unsafe UNSAFE;
  5. private static final long OFFSET;
  6. static {
  7. try {
  8. UNSAFE = UnsafeFactory.getUnsafe();
  9. OFFSET = UnsafeFactory.getFieldOffset(
  10. UNSAFE, CASLock.class, "state");
  11. } catch (Exception e) {
  12. throw new Error(e);
  13. }
  14. }
  15. public boolean cas() {
  16. return UNSAFE.compareAndSwapInt(this, OFFSET, 0, 1);
  17. }
  18. public int getState() {
  19. return state;
  20. }
  21. public void setState(int state) {
  22. this.state = state;
  23. }
  24. }

代码中,多个线程同时修改变量sum,需要判断state的值是否为0,如果为0,则进行cas操作把state修改为1,cas操作保证统一时间只有一个线程操作成功,其他未操作成功的线程会继续循环,等待cas修改成功后再对sum进行自增操作,state必须使用volatile。

  1. public class Test {
  2. private volatile static int sum = 0;
  3. static CASLock casLock = new CASLock();
  4. public static void main(String[] args) {
  5. for (int i = 0; i < 10; i++) {
  6. Thread thread = new Thread(() -> {
  7. for(;;){
  8. if(casLock.getState() == 0 && casLock.cas()) {
  9. try {
  10. for (int j = 0; j < 10000; j++) {
  11. sum++;
  12. }
  13. System.out.println(casLock.getState());
  14. } finally {
  15. casLock.setState(0);
  16. }
  17. break;
  18. }
  19. }
  20. });
  21. thread.start();
  22. }
  23. try {
  24. Thread.sleep(3000);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. System.out.println(sum);
  29. }
  30. }

3、CAS缺陷

1)代码中可以看出,大量线程都在自旋,如果逻辑比较复杂,会导致系统变慢,CAS 长时间地不成功,则会给 CPU 带来非常大的开销。
2)只能保证一个共享变量原子操作,这个问题在AtomicInteger等对象的自增 incrementAndGet() 方法,
只能保证一个对象的cas操作。

  1. public final int incrementAndGet() {
  2. return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  3. }
  1. public final int getAndAddInt(Object var1, long var2, int var4) {
  2. int var5;
  3. do {
  4. var5 = this.getIntVolatile(var1, var2);
  5. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  6. return var5;
  7. }

这个方法底层是使用do-while循环进行比较与交换,成功后返回原值,所以最后返回的值会+1。this.getIntVolatile(var1, var2) 这个方法是一个本地方法,会根据偏移量从内存中获取值。
3)会存在 ABA 问题。

二、ABA问题

当一个变量X初始值是A,线程1获取了X的初始值,此时线程2也获取了X的初始值,线程2先把变量X的值通过CAS修改为B,然后又修改回了A,此时线程1通过CAS修改变量X的值仍然是成功的,因为线程1并不知道变量X已经被修改过了。
ABA问题的解决方案
数据库有个锁称为乐观锁,是一种基于数据版本实现数据同步的机制,每次修改一次数据,版本就会进行累加。 同样,Java也提供了相应的原子引用类AtomicStampedReference

  1. private static class Pair<T> {
  2. final T reference;
  3. final int stamp;
  4. private Pair(T reference, int stamp) {
  5. this.reference = reference;
  6. this.stamp = stamp;
  7. }
  8. static <T> Pair<T> of(T reference, int stamp) {
  9. return new Pair<T>(reference, stamp);
  10. }
  11. }
  12. private volatile Pair<V> pair;

reference即我们实际存储的变量,stamp是版本,每次修改可以通过+1保证版本唯一性。这样 就可以保证每次修改后的版本也会往上递增。

  1. public class AtomicStampedReferenceTest {
  2. public static void main(String[] args) {
  3. // 定义AtomicStampedReference Pair.reference值为1, Pair.stamp为1
  4. AtomicStampedReference atomicStampedReference = new AtomicStampedReference(1,1);
  5. new Thread(()->{
  6. int[] stampHolder = new int[1];
  7. int value = (int) atomicStampedReference.get(stampHolder);
  8. int stamp = stampHolder[0];
  9. log.debug("Thread1 read value: " + value + ", stamp: " + stamp);
  10. // 阻塞1s
  11. LockSupport.parkNanos(1000000000L);
  12. // Thread1通过CAS修改value值为3 stamp是版本,每次修改可以通过+1保证版本唯一性
  13. if (atomicStampedReference.compareAndSet(value, 3,stamp,stamp+1)) {
  14. log.debug("Thread1 update from " + value + " to 3");
  15. } else {
  16. log.debug("Thread1 update fail!");
  17. }
  18. },"Thread1").start();
  19. new Thread(()->{
  20. int[] stampHolder = new int[1];
  21. int value = (int)atomicStampedReference.get(stampHolder);
  22. int stamp = stampHolder[0];
  23. log.debug("Thread2 read value: " + value+ ", stamp: " + stamp);
  24. // Thread2通过CAS修改value值为2
  25. if (atomicStampedReference.compareAndSet(value, 2,stamp,stamp+1)) {
  26. log.debug("Thread2 update from " + value + " to 2");
  27. // do something
  28. value = (int) atomicStampedReference.get(stampHolder);
  29. stamp = stampHolder[0];
  30. log.debug("Thread2 read value: " + value+ ", stamp: " + stamp);
  31. // Thread2通过CAS修改value值为1
  32. if (atomicStampedReference.compareAndSet(value, 1,stamp,stamp+1)) {
  33. log.debug("Thread2 update from " + value + " to 1");
  34. }
  35. }
  36. },"Thread2").start();
  37. }
  38. }

AtomicMarkableReference可以理解为上面AtomicStampedReference的简化版,就是不关心修改过几次,仅仅关心是否修改过。因此变量mark是boolean类型,仅记录值是否有过修改。

  1. private static class Pair<T> {
  2. final T reference;
  3. final boolean mark;
  4. private Pair(T reference, boolean mark) {
  5. this.reference = reference;
  6. this.mark = mark;
  7. }
  8. static <T> Pair<T> of(T reference, boolean mark) {
  9. return new Pair<T>(reference, mark);
  10. }
  11. }
  12. private volatile Pair<V> pair;

三、 Atomic原子操作类

1、Atomic原子操作类介绍

在并发编程中很容易出现并发安全的问题,有一个很简单的例子就是多线程更新变量i=1,比 如多个线程执行i++操作,就有可能获取不到正确的值,而这个问题,最常用的方法是通过 Synchronized进行控制来达到线程安全的目的。但是由于synchronized是采用的是悲观锁策略,并不是特别高效的一种解决方案。实际上,在J.U.C下的atomic包提供了一系列的操作简单, 性能高效,并能保证线程安全的类去更新基本类型变量,数组元素,引用类型以及更新对象中的字段类型。atomic包下的这些类都是采用的是乐观锁策略去原子更新数据,在java中则是使用CAS操作具体实现。
在java.util.concurrent.atomic包里提供了一组原子操作类:
基本类型:AtomicInteger、AtomicLong、AtomicBoolean;
引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicReferenceFieldUpdater 原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、 LongAccumulator、LongAdder、Striped64
5.png

2、原子更新基本类型

以AtomicInteger为例总结常用的API

  1. // 自增1,并返回旧值
  2. public final int getAndIncrement() {
  3. return unsafe.getAndAddInt(this, valueOffset, 1);
  4. }
  5. // 自增指定的值,并返回旧值
  6. public final int getAndAdd(int delta) {
  7. return unsafe.getAndAddInt(this, valueOffset, delta);
  8. }
  9. // 自增1,并返回自增后的值
  10. public final int incrementAndGet() {
  11. return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  12. }
  13. // 自增指定的值,并返回自增后的值
  14. public final int addAndGet(int delta) {
  15. return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
  16. }
  17. // 设置为指定的值,并返回旧值
  18. public final int getAndSet(int newValue) {
  19. return unsafe.getAndSetInt(this, valueOffset, newValue);
  20. }

3、原子更新数组类型

  1. public class AtomicIntegerArrayTest {
  2. static int[] value = new int[]{ 1, 2, 3, 4, 5 };
  3. static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);
  4. public static void main(String[] args) throws InterruptedException {
  5. //设置索引0的元素为100
  6. atomicIntegerArray.set(0, 100);
  7. System.out.println(atomicIntegerArray.get(0));
  8. //以原子更新的方式将数组中索引为1的元素与输入值相加
  9. atomicIntegerArray.getAndAdd(1,5);
  10. // [100,7,3,4,5]
  11. System.out.println(atomicIntegerArray);
  12. }
  13. }

4、原子更新引用类型

AtomicReference作用是对普通对象的封装,它可以保证你在修改对象引用时的线程安全性。

  1. public class AtomicReferenceTest {
  2. public static void main( String[] args ) {
  3. User user1 = new User("张三", 23);
  4. User user2 = new User("李四", 25);
  5. User user3 = new User("王五", 20);
  6. //初始化为 user1
  7. AtomicReference<User> atomicReference = new AtomicReference<>();
  8. atomicReference.set(user1);
  9. //把 user2 赋给 atomicReference
  10. atomicReference.compareAndSet(user1, user2);
  11. System.out.println(atomicReference.get());
  12. //把 user3 赋给 atomicReference
  13. atomicReference.compareAndSet(user1, user3);
  14. System.out.println(atomicReference.get());
  15. }
  16. }
  17. @Data
  18. @AllArgsConstructor
  19. class User {
  20. private String name;
  21. private Integer age;
  22. }

5、对象属性原子修改器

AtomicIntegerFieldUpdater可以线程安全地更新对象中的整型变量。

  1. public class AtomicIntegerFieldUpdaterTest {
  2. public static class Candidate {
  3. //字段必须是volatile类型
  4. volatile int score = 0;
  5. AtomicInteger score2 = new AtomicInteger();
  6. }
  7. // 传入要修改的类和要修改的属性
  8. public static final AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
  9. AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
  10. public static AtomicInteger realScore = new AtomicInteger(0);
  11. public static void main(String[] args) throws InterruptedException {
  12. final Candidate candidate = new Candidate();
  13. Thread[] t = new Thread[10000];
  14. for (int i = 0; i < 10000; i++) {
  15. t[i] = new Thread(new Runnable() {
  16. @Override
  17. public void run() {
  18. if (Math.random() > 0.4) {
  19. // 属性修改器和atomica结果是一样的
  20. candidate.score2.incrementAndGet();
  21. scoreUpdater.incrementAndGet(candidate);
  22. realScore.incrementAndGet();
  23. }
  24. }
  25. });
  26. t[i].start();
  27. }
  28. for (int i = 0; i < 10000; i++) {
  29. t[i].join();
  30. }
  31. System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);
  32. System.out.println("AtomicInteger Score=" + candidate.score2.get());
  33. System.out.println("realScore=" + realScore.get());
  34. }
  35. }

对于AtomicIntegerFieldUpdater 的使用稍微有一些限制和约束,约束如下:
1)字段必须是volatile类型的,在线程之间共享变量时保证立即可见.eg:volatile int value = 3。
2)字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父 类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
3)只能是实例变量,不能是类变量,也就是说不能加static关键字。
4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和 volatile是有冲突的,这两个关键字不能同时存在。
5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字 段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用 AtomicReferenceFieldUpdater。

四、LongAdder/DoubleAdder详解

AtomicLong是利用了底层的CAS操作来提供并发性的,比如addAndGet方法:

  1. public final long getAndAdd(long delta) {
  2. return unsafe.getAndAddLong(this, valueOffset, delta);
  3. }
  1. public final long getAndAddLong(Object var1, long var2, long var4) {
  2. long var6;
  3. do {
  4. var6 = this.getLongVolatile(var1, var2);
  5. } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
  6. return var6;
  7. }

上述方法调用了Unsafe类的getAndAddLong方法,该方法内部是个native方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。 在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下, N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,会占用CPU和抢占时间片,此时AtomicLong的自旋会成为瓶颈。
这就是LongAdder引入的初衷——解决高并发环境下AtomicInteger, AtomicLong的自旋瓶颈问题。

  1. static void testLongAdder(final int threadCount, final int times)
  2. throws InterruptedException {
  3. CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  4. LongAdder longAdder = new LongAdder();
  5. for (int i = 0; i < threadCount; i++) {
  6. new Thread(new Runnable() {
  7. @Override
  8. public void run() {
  9. for (int j = 0; j < times; j++) {
  10. longAdder.add(1);
  11. }
  12. countDownLatch.countDown();
  13. }
  14. }, "my-thread" + i).start();
  15. }
  16. countDownLatch.await();
  17. }

LongAdder和AtomicLong相比线程数越多,并发操作数越大,LongAdder的优势越明显。6.png

1、LongAdder原理

在LongAdder中有一个基数base,如果多个线程没有产生竞争,是一个一个操作的,那么会使用cas来操作base这个基数,都是在这个基数上面自增。如果产生竞争,那么cas会失败,这样会把每个线程使用哈希运算,分配在cells数组中的某一个下标的槽位上(cell),通常cells数组的长度最大是逻辑核的数量,初始长度是2。
假设线程1分配到cell1上,那么线程1就会对cell1进行增加操作,如果有其他线程也被分配到cell1上,那么线程1和其他线程都会操作cell1。7.png

2、LongAdder的内部结构

LongAdder内部有一个base变量,一个Cell[]数组:
base变量:非竞态条件下,直接累加到该变量上
Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
LongAdder的父类 Striped64 中定义了这些属性:

  1. // 获取npc核数
  2. static final int NCPU = Runtime.getRuntime().availableProcessors();
  3. /**
  4. * Table of cells. When non-null, size is a power of 2.
  5. * 定义一个volatile修饰的cell数组,长度是2的n次幂
  6. */
  7. transient volatile Cell[] cells;
  8. /**
  9. * Base value, used mainly when there is no contention, but also as
  10. * a fallback during table initialization races. Updated via CAS.
  11. * 基数:volatile修饰
  12. * 1、没有遇到并发竞争时,直接使用base累加数值
  13. * 2、初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cells初始化),
  14. * 其他竞争初始化cells数组失败的线程会讲数值累加到base上
  15. */
  16. transient volatile long base;
  17. /**
  18. * Spinlock (locked via CAS) used when resizing and/or creating Cells.
  19. * cas加锁的标记,扩容或者创建cells的时候会用到
  20. */
  21. transient volatile int cellsBusy;

定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过 Unsafe来CAS操作它的值,Cell类是使用 @sun.misc.Contended 注解,解决伪共享问题

  1. @sun.misc.Contended static final class Cell {
  2. volatile long value;
  3. Cell(long x) { value = x; }
  4. final boolean cas(long cmp, long val) {
  5. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  6. }
  7. // Unsafe mechanics
  8. private static final sun.misc.Unsafe UNSAFE;
  9. private static final long valueOffset;
  10. static {
  11. try {
  12. UNSAFE = sun.misc.Unsafe.getUnsafe();
  13. Class<?> ak = Cell.class;
  14. valueOffset = UNSAFE.objectFieldOffset
  15. (ak.getDeclaredField("value"));
  16. } catch (Exception e) {
  17. throw new Error(e);
  18. }
  19. }
  20. }

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

3、LongAdder#add方法

8.png
只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。 如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容 了。 这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。

  1. public void add(long x) {
  2. Cell[] as; long b, v; int m; Cell a;
  3. if ((as = cells) != null || !casBase(b = base, b + x)) {
  4. boolean uncontended = true;
  5. if (as == null || (m = as.length - 1) < 0 ||
  6. (a = as[getProbe() & m]) == null ||
  7. !(uncontended = a.cas(v = a.value, v + x)))
  8. longAccumulate(x, null, uncontended);
  9. }
  10. }

假设调用add(1),首先会判断Cells数据是否为空,如果为空,则表示没有创建,没有出现并发冲突,那么会继续执行“或”的判断逻辑, !casBase(b = base, b + x),这个方法是cas原子操作base,cmp是预期值,val是要修改的值,就是把base的值修改为b + x,修改返回true,则不会进入if逻辑,如果此时有线程并发的修改base,那么cas操作不会成功,会进入if逻辑。

  1. final boolean casBase(long cmp, long val) {
  2. return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
  3. }

if逻辑中还有一个if判断,
as == null || // cells没有被创建
(m = as.length - 1) < 0 || // cells没有初始化
(a = as[getProbe() & m]) == null || // 根据当前线程转int类型,和数组的长度取模得出的下标位置的槽位为空
!(uncontended = a.cas(v = a.value, v + x)) // 取出来的Cell a,调用cas方法修改a.value 为 v + x,没有成功

  1. static final int getProbe() {
  2. return UNSAFE.getInt(Thread.currentThread(), PROBE);
  3. }

满足以上其中一个条件就会调用 longAccumulate(x, null, uncontended) 方法,这个方法是在父类 Striped64 中实现的。
9.png
代码中使用了for死循环,使用cas操作要保证失败之后再次执行,直到成功执行。

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. int h;
  4. if ((h = getProbe()) == 0) {
  5. ThreadLocalRandom.current();
  6. h = getProbe();
  7. wasUncontended = true;
  8. }
  9. boolean collide = false;
  10. for (;;) {
  11. Cell[] as; Cell a; int n; long v;
  12. // 判断cells是否已经创建并长度大于0
  13. if ((as = cells) != null && (n = as.length) > 0) {
  14. // 如果cells数据已经创建好并初始化了,取到当前线程和数组长度取模的下标,判断该
  15. // 下标的位置是否为空
  16. if ((a = as[(n - 1) & h]) == null) {
  17. if (cellsBusy == 0) {
  18. // 把cell创建出来,给cell的value属性赋值x,x就是传进来要累加的值
  19. Cell r = new Cell(x);
  20. // 如果锁标记为0,如果为0,cas把标记改为1
  21. if (cellsBusy == 0 && casCellsBusy()) {
  22. // 如果当前线程加锁成功
  23. boolean created = false;
  24. try {
  25. Cell[] rs; int m, j;
  26. if ((rs = cells) != null &&
  27. (m = rs.length) > 0 &&
  28. rs[j = (m - 1) & h] == null) {
  29. // j是cells数组下标,r是新创建的cell
  30. rs[j] = r;
  31. created = true;
  32. }
  33. } finally {
  34. // 把锁标记置为0
  35. cellsBusy = 0;
  36. }
  37. if (created)
  38. break;
  39. continue;
  40. }
  41. }
  42. collide = false;
  43. }
  44. else if (!wasUncontended)
  45. wasUncontended = true;
  46. // 如果当前线程分配的cells数组下标位置不为空,直接操作下标位置的cell,进行
  47. // cas操作value,如果更新成功则结束
  48. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  49. fn.applyAsLong(v, x))))
  50. break;
  51. else if (n >= NCPU || cells != as)
  52. collide = false;
  53. else if (!collide)
  54. collide = true;
  55. // 如果cell没有cas更新成功,则扩容,判断锁标记是否为0,且修改锁标记为1成功
  56. else if (cellsBusy == 0 && casCellsBusy()) {
  57. try {
  58. if (cells == as) {
  59. // 新创建一个cells数组,n是原来cells数组的长度,进行位移
  60. Cell[] rs = new Cell[n << 1];
  61. // 把旧数组的内容赋值到新数组中
  62. for (int i = 0; i < n; ++i)
  63. rs[i] = as[i];
  64. // 新数组赋值到成员属性
  65. cells = rs;
  66. }
  67. } finally {
  68. // 锁标记置为0
  69. cellsBusy = 0;
  70. }
  71. collide = false;
  72. continue;
  73. }
  74. h = advanceProbe(h);
  75. }
  76. // 如果cells没有创建,且锁标记等于0,且cas修改锁标记为1成功
  77. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  78. boolean init = false;
  79. try {
  80. // 再次判断cells数组是否为空
  81. if (cells == as) {
  82. // 创建cells数组,长度为2
  83. Cell[] rs = new Cell[2];
  84. // 创建一个cell,把x赋值给cell的value属性
  85. rs[h & 1] = new Cell(x);
  86. // 创建好的cells数组赋值给成员变量
  87. cells = rs;
  88. init = true;
  89. }
  90. } finally {
  91. // 锁标记位置为0
  92. cellsBusy = 0;
  93. }
  94. if (init)
  95. break;
  96. }
  97. else if (casBase(v = base, ((fn == null) ? v + x :
  98. fn.applyAsLong(v, x))))
  99. break;
  100. }
  101. }

4、LongAdder#sum方法

sum方法不是一个线程安全的方法,只能取到当前时刻的值,因为存在扩容和线程执行中的情况导致最终结果会时刻变化。
返回累加的和,也就是”当前时刻”的计数值,注意: 高并发时,除非全局加锁,否则得不到程序运行中某个时刻绝对准确的值,此返回值可能不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加,方法的返回时刻和调用时刻不是同一个点,在有并发的情况下,这个值只是近似准确的计数值。

  1. public long sum() {
  2. Cell[] as = cells; Cell a;
  3. long sum = base;
  4. if (as != null) {
  5. // 遍历cells中的每个cell的value和base相加的总和
  6. for (int i = 0; i < as.length; ++i) {
  7. if ((a = as[i]) != null)
  8. sum += a.value;
  9. }
  10. }
  11. return sum;
  12. }

5、 LongAccumulator

LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而 LongAccumulator提供了自定义的函数操作。其构造函数如下:

  1. public LongAccumulator(LongBinaryOperator accumulatorFunction,
  2. long identity) {
  3. this.function = accumulatorFunction;
  4. base = this.identity = identity;
  5. }

通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator 接收2个long作为参数,并返回1个long)。LongAccumulator内部原理和LongAdder几乎完全 一样,都是利用了父类Striped64的longAccumulate方法。

  1. public class LongAccumulatorTest {
  2. public static void main(String[] args) throws InterruptedException {
  3. // 累加 x+y
  4. LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
  5. ExecutorService executor = Executors.newFixedThreadPool(8);
  6. // 1到9累加
  7. IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
  8. Thread.sleep(2000);
  9. System.out.println(accumulator.getThenReset());
  10. }
  11. }