Java LongAdder
阿里《Java开发手册》最新嵩山版在 8.3 日发布,其中有一段内容如下:

【参考】volatile 解决多线程内存不可见问题。对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。 说明:如果是 count++ 操作,使用如下类实现:AtomicInteger count = new AtomicInteger(); count.addAndGet(1); 如果是 JDK8,推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观 锁的重试次数)。

以上内容共有两个重点:

  1. 类似于 count++ 这种非一写多读的场景不能使用 volatile
  2. 如果是 JDK8 推荐使用 LongAdder 而非 AtomicLong 来替代 volatile,因为 LongAdder 的性能更好。

    为什么需要AtomicInteger原子操作类?

    对于Java中的运算操作,例如自增或自减,若没有进行额外的同步操作,在多线程环境下就是线程不安全的。num++解析为num=num+1,明显,这个操作不具备原子性,多线程并发共享这个变量时必然会出现问题。测试代码如下:

    1. public class AtomicIntegerTest {
    2. private static final int THREADS_CONUT = 20;
    3. public static int count = 0;
    4. public static void increase() {
    5. count++;
    6. }
    7. public static void main(String[] args) {
    8. Thread[] threads = new Thread[THREADS_CONUT];
    9. for (int i = 0; i < THREADS_CONUT; i++) {
    10. threads[i] = new Thread(new Runnable() {
    11. @Override
    12. public void run() {
    13. for (int i = 0; i < 1000; i++) {
    14. increase();
    15. }
    16. }
    17. });
    18. threads[i].start();
    19. }
    20. while (Thread.activeCount() > 1) {
    21. Thread.yield();
    22. }
    23. System.out.println(count);
    24. }
    25. }

    这里运行了20个线程,每个线程对count变量进行1000此自增操作,如果上面这段代码能够正常并发的话,最后的结果应该是20000才对,但实际结果却发现每次运行的结果都不相同,都是一个小于20000的数字。这是为什么呢?

    volatile 线程安全测试

    volatile关键字很重要的两个特性:
    1、保证变量在线程间可见,对volatile变量所有的写操作都能立即反应到其他线程中,换句话说,volatile变量在各个线程中是一致的(得益于java内存模型—“先行发生原则”);
    2、禁止指令的重排序优化;
    首先来测试 volatile 在多写环境下的线程安全情况,测试代码如下:

    1. public class VolatileExample {
    2. public static volatile int count = 0; // 计数器
    3. public static final int size = 100000; // 循环测试次数
    4. public static void main(String[] args) {
    5. // ++ 方式 10w 次
    6. Thread thread = new Thread(() -> {
    7. for (int i = 1; i <= size; i++) {
    8. count++;
    9. }
    10. });
    11. thread.start();
    12. // -- 10w 次
    13. for (int i = 1; i <= size; i++) {
    14. count--;
    15. }
    16. // 等所有线程执行完成
    17. while (thread.isAlive()) {}
    18. System.out.println(count); // 打印结果
    19. }
    20. }

    volatile 修饰的 count 变量 ++ 10w 次,在启动另一个线程 — 10w 次,正常来说结果应该是 0,但是执行的结果却为:

    1063

:::tips 结论:由以上结果可以看出 volatile 在多写环境下是非线程安全的,测试结果和《Java开发手册》相吻合。 :::

AtomicInteger类实现多线程自增操作

把上面的代码改造成AtomicInteger原子类型,先看看效果

  1. import java.util.concurrent.atomic.AtomicInteger;
  2. public class AtomicIntegerTest {
  3. private static final int THREADS_CONUT = 20;
  4. public static AtomicInteger count = new AtomicInteger(0);
  5. public static void increase() {
  6. count.incrementAndGet();
  7. }
  8. public static void main(String[] args) {
  9. Thread[] threads = new Thread[THREADS_CONUT];
  10. for (int i = 0; i < THREADS_CONUT; i++) {
  11. threads[i] = new Thread(new Runnable() {
  12. @Override
  13. public void run() {
  14. for (int i = 0; i < 1000; i++) {
  15. increase();
  16. }
  17. }
  18. });
  19. threads[i].start();
  20. }
  21. while (Thread.activeCount() > 1) {
  22. Thread.yield();
  23. }
  24. System.out.println(count);
  25. }
  26. }

结果每次都输出20000,程序输出了正确的结果,这都归功于AtomicInteger.incrementAndGet()方法的原子性。

非阻塞同步

同步:多线程并发访问共享数据时,保证共享数据再同一时刻只被一个或一些线程使用。
阻塞同步和非阻塞同步都是实现线程安全的两个保障手段,非阻塞同步对于阻塞同步而言主要解决了阻塞同步中线程阻塞和唤醒带来的性能问题,那什么叫做非阻塞同步呢?
在并发环境下,某个线程对共享变量先进行操作,如果没有其他线程争用共享数据那操作就成功;如果存在数据的争用冲突,那就才去补偿措施,比如不断的重试机制,直到成功为止,因为这种乐观的并发策略不需要把线程挂起,也就把这种同步操作称为非阻塞同步(操作和冲突检测具备原子性)。
在硬件指令集的发展驱动下,使得 “操作和冲突检测” 这种看起来需要多次操作的行为只需要一条处理器指令便可以完成,这些指令中就包括非常著名的CAS指令(Compare-And-Swap比较并交换)。《深入理解Java虚拟机第二版.周志明》第十三章中这样描述关于CAS机制:

CAS指合需要有3个操作数,分别是内存位置(在Java中可以简单理解为变量的内存地址,用V表示)、旧的预期值(用A表示)和新值(用B表示)。CAs指合执行时,当且仅当V符合旧预期值A时,处理器用新值B更新v的值,否则它就不执行更新﹐但是无论是否更新了V的值,都会返回V的旧值,上述的处理过程是一个原子操作。 在JDK 1.5之后,Java程序中才可以使用CAS操作,该操作由sun.misc.Unsafe类里面的compareAndSwapInt()compareAndSwapLong()等几个方法包装提供,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令,没有方法调用的过程,或者可以认为是无条件内联进去了。 由于Unsafe类不是提供给用户程序调用的类(Unsafe.getUnsafe()的代码中限制了只有启动类加载器(Bootstrap ClassLoader)加载的Class才能访问它),因此,如果不采用反射手段﹐只能通过其他的Java API来间接使用它,如J.U.C包里面的整数原子类,其中的compareAndSet()getAndIncrement()等方法都使用了Unsafe类的CAS操作。

所以再返回来看AtomicInteger.incrementAndGet()方法,它的实现也比较简单

  1. /**
  2. * Atomically increments by one the current value.
  3. *
  4. * @return the updated value
  5. */
  6. public final int incrementAndGet() {
  7. for (;;) {
  8. int current = get();
  9. int next = current + 1;
  10. if (compareAndSet(current, next))
  11. return next;
  12. }
  13. }

incrementAndGet()方法在一个无限循环体内,不断尝试将一个比当前值大1的新值赋给自己,如果失败则说明在执行”获取-设置”操作的时已经被其它线程修改过了,于是便再次进入循环下一次操作,直到成功为止。这个便是AtomicInteger原子性的”诀窍”了,继续进源码看它的compareAndSet方法:

  1. /**
  2. * Atomically sets the value to the given updated value
  3. * if the current value {@code ==} the expected value.
  4. *
  5. * @param expect the expected value
  6. * @param update the new value
  7. * @return true if successful. False return indicates that
  8. * the actual value was not equal to the expected value.
  9. */
  10. public final boolean compareAndSet(int expect, int update) {
  11. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  12. }

可以看到,compareAndSet()调用的就是Unsafe.compareAndSwapInt()方法,即Unsafe类的CAS操作。


注意:

  1. 子类相比于普通的锁,粒度更细、效率更高(除了高度竞争的情况下)
  2. 果对于上面的示例代码中使用了thread.yield()之类的方法不清晰的,可以直接看下面的代码压测:

    1. public class AtomicIntegerTest implements Runnable {
    2. static AtomicInteger atomicInteger = new AtomicInteger(0);
    3. static int commonInteger = 0;
    4. public void addAtomicInteger() {
    5. atomicInteger.getAndIncrement();
    6. }
    7. public void addCommonInteger() {
    8. commonInteger++;
    9. }
    10. @Override
    11. public void run() {
    12. //可以调大10000看效果更明显
    13. for (int i = 0; i < 10000; i++) {
    14. addAtomicInteger();
    15. addCommonInteger();
    16. }
    17. }
    18. public static void main(String[] args) throws InterruptedException {
    19. AtomicIntegerTest atomicIntegerTest = new AtomicIntegerTest();
    20. Thread thread1 = new Thread(atomicIntegerTest);
    21. Thread thread2 = new Thread(atomicIntegerTest);
    22. thread1.start();
    23. thread2.start();
    24. //join()方法是为了让main主线程等待thread1、thread2两个子线程执行完毕
    25. thread1.join();
    26. thread2.join();
    27. System.out.println("AtomicInteger add result = " + atomicInteger.get());
    28. System.out.println("CommonInteger add result = " + commonInteger);
    29. }
    30. }
  3. 子类一览图参考如下: | Atomic | 基本类型原子类 | AtomicInteger
    AtomicLong
    AtomicBoolean | | —- | —- | —- | | Atomic
    Array | 数组类型原子类 | AtomicIntegerArray
    AtomicLongArray
    AtomicReferenceArray | | AtomicReference | 应用类型原子类 | AtomicReference
    AtomicStampedReference
    AtomicMarkAbleReference | | Atomic
    FieldUpdater | 升级类型原子类 | AtomicIntegerFieldUpdater
    AtomicLongFieldUpdater
    AtomicReferenceFieldUpdater | | Adder | 累加器 | LongAdder
    DoubleAdder | |
    Accumulator | 累加器 | LongAccumulator
    DoubleAccumulator |

  4. 何把普通变量升级为原子变量?主要是tomicIntegerFieldUpdater<T>,参考如下代码:

    1. /**
    2. * @description 将普通变量升级为原子变量
    3. **/
    4. public class AtomicIntegerFieldUpdaterTest implements Runnable {
    5. static Goods phone;
    6. static Goods computer;
    7. AtomicIntegerFieldUpdater<Goods> atomicIntegerFieldUpdater =
    8. AtomicIntegerFieldUpdater.newUpdater(Goods.class, "price");
    9. @Override
    10. public void run() {
    11. for (int i = 0; i < 10000; i++) {
    12. phone.price++;
    13. atomicIntegerFieldUpdater.getAndIncrement(computer);
    14. }
    15. }
    16. static class Goods {
    17. //商品定价
    18. volatile int price;
    19. }
    20. public static void main(String[] args) throws InterruptedException {
    21. phone = new Goods();
    22. computer = new Goods();
    23. AtomicIntegerFieldUpdaterTest atomicIntegerFieldUpdaterTest = new AtomicIntegerFieldUpdaterTest();
    24. Thread thread1 = new Thread(atomicIntegerFieldUpdaterTest);
    25. Thread thread2 = new Thread(atomicIntegerFieldUpdaterTest);
    26. thread1.start();
    27. thread2.start();
    28. //join()方法是为了让main主线程等待thread1、thread2两个子线程执行完毕
    29. thread1.join();
    30. thread2.join();
    31. System.out.println("CommonInteger price = " + phone.price);
    32. System.out.println("AtomicInteger price = " + computer.price);
    33. }
    34. }

    LongAdder VS AtomicLong

    接下来,使用 Oracle 官方的 JMH(Java Microbenchmark Harness, JAVA 微基准测试套件)来测试一下两者的性能,测试代码如下:

    1. import org.openjdk.jmh.annotations.*;
    2. import org.openjdk.jmh.infra.Blackhole;
    3. import org.openjdk.jmh.runner.Runner;
    4. import org.openjdk.jmh.runner.RunnerException;
    5. import org.openjdk.jmh.runner.options.Options;
    6. import org.openjdk.jmh.runner.options.OptionsBuilder;
    7. import java.util.concurrent.TimeUnit;
    8. import java.util.concurrent.atomic.AtomicInteger;
    9. import java.util.concurrent.atomic.LongAdder;
    10. @BenchmarkMode(Mode.AverageTime) // 测试完成时间
    11. @OutputTimeUnit(TimeUnit.NANOSECONDS)
    12. @Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS) // 预热 1 轮,每次 1s
    13. @Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) // 测试 5 轮,每次 3s
    14. @Fork(1) // fork 1 个线程
    15. @State(Scope.Benchmark)
    16. @Threads(1000) // 开启 1000 个并发线程
    17. public class AlibabaAtomicTest {
    18. public static void main(String[] args) throws RunnerException {
    19. // 启动基准测试
    20. Options opt = new OptionsBuilder()
    21. .include(AlibabaAtomicTest.class.getSimpleName()) // 要导入的测试类
    22. .build();
    23. new Runner(opt).run(); // 执行测试
    24. }
    25. @Benchmark
    26. public int atomicTest(Blackhole blackhole) throws InterruptedException {
    27. AtomicInteger atomicInteger = new AtomicInteger();
    28. for (int i = 0; i < 1024; i++) {
    29. atomicInteger.addAndGet(1);
    30. }
    31. // 为了避免 JIT 忽略未被使用的结果
    32. return atomicInteger.intValue();
    33. }
    34. @Benchmark
    35. public int longAdderTest(Blackhole blackhole) throws InterruptedException {
    36. LongAdder longAdder = new LongAdder();
    37. for (int i = 0; i < 1024; i++) {
    38. longAdder.add(1);
    39. }
    40. return longAdder.intValue();
    41. }
    42. }

    程序执行的结果为:
    LongAdder的使用 - 图1
    从上述的数据可以看出,在开启了 1000 个线程之后,程序的 LongAdder 的性能比 AtomicInteger 快了约 1.53 倍,开了 1000 个线程,为什么要开这么多呢?这其实是为了模拟高并发高竞争的环境下二者的性能查询。
    如果在低竞争下,比如开启 100 个线程,测试的结果如下:
    LongAdder的使用 - 图2
    在高度并发竞争情形下,AtomicLong每次进行add都需要flushrefresh(这一块涉及到java内存模型中的工作内存和主内存的,所有变量操作只能在工作内存中进行,然后写回主内存,其它线程再次读取新值),每次add()都需要同步,在高并发时会有比较多冲突,比较耗时导致效率低;而LongAdder中每个线程会维护自己的一个计数器,在最后执行LongAdder.sum()方法时候才需要同步,把所有计数器全部加起来,不需要flushrefresh操作。 :::success 结论:从上面结果可以看出,在低竞争的并发环境下 AtomicInteger 的性能是要比 LongAdder 的性能好,而高竞争环境下 LongAdder 的性能比 AtomicInteger,当有 1000 个线程运行时,LongAdder 的性能比 AtomicInteger 快了约 1.53 倍,所以要根据自己业务情况选择合适的类型来使用。 :::

    性能分析

    为什么会出现上面的情况?这是因为 AtomicInteger 在高并发环境下会有多个线程去竞争一个原子变量,而始终只有一个线程能竞争成功,而其他线程会一直通过 CAS 自旋尝试获取此原子变量,因此会有一定的性能消耗;而 LongAdder 会将这个原子变量分离成一个 Cell 数组,每个线程通过 Hash 获取到自己数组,这样就减少了乐观锁的重试次数,从而在高竞争下获得优势;而在低竞争下表现的又不是很好,可能是因为自己本身机制的执行时间大于了锁竞争的自旋时间,因此在低竞争下表现性能不如 AtomicInteger

    总结

    测试了 volatile 在多写情况下是非线程安全的,而在低竞争的并发环境下 AtomicInteger 的性能是要比 LongAdder 的性能好,而高竞争环境下 LongAdder 的性能比 AtomicInteger,因此在使用时要结合自身的业务情况来选择相应的类型。