4.1 问题提出

4.1.1 提取款问题

有如下需求,保证account、withdraw取款方法的线程安全。

  1. interface Account {
  2. // 获取余额
  3. Integer geBalance();
  4. // 取款
  5. void withdraw(Integer amount);
  6. /**
  7. * 方法内会启动1000个线程,每个线程做-10元操作
  8. * 如果初始余额为10000,那么正确的结果应该是0
  9. * @param account 账户
  10. */
  11. static void demo(Account account) {
  12. List<Thread> ts = new ArrayList<>();
  13. for (int i = 0; i < 1000; i++) {
  14. ts.add(new Thread(() -> {
  15. account.withdraw(10);
  16. }));
  17. }
  18. long start = System.nanoTime();
  19. ts.forEach(Thread::start);
  20. ts.forEach(t -> {
  21. try {
  22. t.join();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. });
  27. ts.forEach(t -> {
  28. try {
  29. t.join();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. });
  34. long end = System.nanoTime();
  35. System.out.printf("final balance: %d, spend time(ms): %d] ", account.geBalance(), (end - start) / 1000_000);
  36. }
  37. }

显然以上做法无法保证线程安全。

4.1.2 锁解决方案

保护共享变量,给临界区加锁。

  1. class AccountSafe implements Account {
  2. // 余额
  3. private Integer balance;
  4. public AccountSafe(Integer balance) {
  5. this.balance = balance;
  6. }
  7. @Override
  8. public Integer geBalance() {
  9. return this.balance;
  10. }
  11. @Override
  12. public void withdraw(Integer amount) {
  13. synchronized (this) {
  14. this.balance -= amount;
  15. }
  16. }
  17. }

4.1.2 无锁解决方案

使用原子整数进行CAS操作:

  1. class AccountCAS implements Account {
  2. private AtomicInteger balance;
  3. public AccountCAS(Integer balance) {
  4. this.balance = new AtomicInteger(balance);
  5. }
  6. @Override
  7. public Integer geBalance() {
  8. return balance.get();
  9. }
  10. @Override
  11. public void withdraw(Integer amount) {
  12. while (true) {
  13. // 获取余额的最新值
  14. int prev = this.balance.get();
  15. // 修改后的余额
  16. int next = prev - amount;
  17. // 同步到主存
  18. // CAS(预期值,修改值) => boolean(是否修改成功)
  19. if (this.balance.compareAndSet(prev, next)) {
  20. break;
  21. }
  22. }
  23. }
  24. }

4.1.3 测试

编写以下代码进行测试:

  1. public class AccountDemo {
  2. public static void main(String[] args) {
  3. System.out.print("[Unsafe => ");
  4. Account a1 = new AccountUnsafe(10000);
  5. Account.demo(a1);
  6. System.out.print("[synchronized => ");
  7. Account a2 = new AccountSafe(10000);
  8. Account.demo(a2);
  9. System.out.print("[compareAndSet => ");
  10. Account a3 = new AccountCAS(10000);
  11. Account.demo(a3);
  12. System.out.println();
  13. }
  14. }

测试脚本:

  1. for ($i=0; $i -le 10; $i++) { java top.parak.none.AccountDemo }

运行结果:

image-20210426225654053.png

4.1.5 compareAndSet

compareAndSet,简称CAS(也有Compare And Swap的说法),它必须是原子操作。

  1. while (true) {
  2. // 获取余额的最新值
  3. int prev = this.balance.get();
  4. // 修改后的余额
  5. int next = prev - amount;
  6. // 同步到主存
  7. // CAS(预期值,修改值) => boolean(是否修改成功)
  8. if (this.balance.compareAndSet(prev, next)) {
  9. break;
  10. }
  11. }

image-20210426230136937.png

4.2 CAS与volatile

4.2.1 volatile

获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰。

它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile变量都是操作主存。即一个线程对volatile变量的修改,对另一个线程可见。

CAS必须借助volatile才能读取到共享变量的最新值来实现【比较并交换】的效果。

4.2.2 为什么CAS+重试效率高?

  • 在无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
  • 但是,在无锁情况下,因为线程要保持运行,需要额外CPU的支持,CPU在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然也不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

4.2.3 CAS应用场景

结合CAS和volatile可以实现无锁并发,适用于线程数少、多核CPU的场景下:

  • CAS是基于乐观锁的思想:非常乐观,假设没有别的线程来修改共享变量,如果其他线程修改了当前线程就再次重试。
  • synchronized是基于悲观锁的思想:非常悲观,提防其他线程来修改共享变量,当前线程获取资源就立马上锁,其他争抢资源失败的线程进入阻塞状态,修改结束才开锁,
  • CAS体现的是无锁并发、无阻塞并发
    • 因为没有使用无锁并发、无阻塞并发,所以线程不会陷入阻塞。
    • 但是如果竞争激烈,重试必然 频繁发生,反而效率会收到影响。

4.2.4 CAS特点

优点:

  • 可以保证变量操作的原子性
  • 并发量低时,CAS效率高于synchronized
  • 在线程对共享资源占用时间较短的情况下,使用CAS机制效率也会较高

缺点:

  • 无法解决ABA问题
  • 可能会消耗较高的CPU
  • 不能保证代码块的原子性

4.3 原子整数

JUC提供如下原子整数类:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

AtomicInteger常用API如下:

image.png

4.4 原子引用

JUC提供如下原子引用类:

  • AtomicReference:普通原子引用类型,对对象进行原子操作
  • AtomicStampedReference:带int类型版本戳的原子引用类型,记录更改次数
  • AtomicMarkableReference:带boolean类型版本戳的原子引用类型,记录是否更改

作用:保证引用类型的共享变量是线程安全的。

使用BigDemical实现提取款问题的线程安全解决方案:

  1. class DecimalAccountCAS implements DecimalAccount {
  2. private AtomicReference<BigDecimal> balance;
  3. public DecimalAccountCAS(BigDecimal balance) {
  4. this.balance = new AtomicReference<>(balance);
  5. }
  6. @Override
  7. public BigDecimal getBalance() {
  8. return balance.get();
  9. }
  10. @Override
  11. public void withdraw(BigDecimal amount) {
  12. while (true) {
  13. BigDecimal prev = balance.get();
  14. BigDecimal next = prev.subtract(amount);
  15. if (balance.compareAndSet(prev, next)) {
  16. break;
  17. }
  18. }
  19. }
  20. }
  21. interface DecimalAccount {
  22. BigDecimal getBalance();
  23. void withdraw(BigDecimal amount);
  24. }

4.4.1 ABA问题

如下程序所示,虽然在other方法中存在两个线程对共享变量进行了修改,但是经过了两轮修改又变成了原值,main线程对修改共享变量的过程是不可见的,这种操作对业务代码并无影响。

  1. @Slf4j(topic = "ABAAtomicReference")
  2. public class ABAAtomicReferenceDemo {
  3. private static AtomicReference<String> ref = new AtomicReference<>("A");
  4. public static void main(String[] args) throws InterruptedException {
  5. log.debug("main start...");
  6. String prev = ref.get();
  7. other();
  8. TimeUnit.SECONDS.sleep(1);
  9. log.debug("change A -> K ? {}", ref.compareAndSet(prev, "K"));
  10. }
  11. private static void other() {
  12. new Thread(() -> log.debug("change A -> B ? {}", ref.compareAndSet(ref.get(), "B")), "B").start();
  13. new Thread(() -> log.debug("change B -> A ? {}", ref.compareAndSet(ref.get(), "A")), "A").start();
  14. }
  15. }

运行结果:

  1. 2021-04-27 17:29:12.538 [main] DEBUG ABAAtomicReference - main start...
  2. 2021-04-27 17:29:12.575 [A] DEBUG ABAAtomicReference - change B -> A ? true
  3. 2021-04-27 17:29:12.575 [B] DEBUG ABAAtomicReference - change A -> B ? true
  4. 2021-04-27 17:29:13.580 [main] DEBUG ABAAtomicReference - change A -> K ? true

虽然ABA对业务没有影响,但是如何让主线程感知到其他线程的修改呢?

4.4.2 AtomicStampedReference

解决ABA问题:

  1. @Slf4j(topic = "ABAAtomicStampedReference")
  2. public class ABAAtomicStampedReferenceDemo {
  3. private static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
  4. public static void main(String[] args) throws InterruptedException {
  5. log.debug("main start...");
  6. int stamp = ref.getStamp();
  7. other();
  8. TimeUnit.SECONDS.sleep(1);
  9. boolean res = ref.compareAndSet(ref.getReference(), "K", stamp, stamp + 1);
  10. log.debug("change A -> K ? {}", res);
  11. }
  12. private static void other() {
  13. new Thread(() -> {
  14. int stamp = ref.getStamp();
  15. boolean res = ref.compareAndSet(ref.getReference(), "B", stamp, stamp + 1);
  16. log.debug("change A -> B ? {}", res);
  17. }).start();
  18. new Thread(() -> {
  19. int stamp = ref.getStamp();
  20. boolean res = ref.compareAndSet(ref.getReference(), "A", stamp, stamp + 1);
  21. log.debug("change B -> A ? {}", res);
  22. }).start();
  23. }
  24. }

运行结果:

  1. 2021-04-27 18:18:00.754 [main] DEBUG ABAAtomicStampedReference - main start...
  2. 2021-04-27 18:18:00.787 [A] DEBUG ABAAtomicStampedReference - change B -> A ? true
  3. 2021-04-27 18:18:00.787 [B] DEBUG ABAAtomicStampedReference - change A -> B ? true
  4. 2021-04-27 18:18:01.792 [main] DEBUG ABAAtomicStampedReference - change A -> K ? false

4.4.3 AtomicMarkableReference

不关心引用变量更改了几次,只是单纯的关心是否更改过。

案例:

家里有清洁机器人和保洁阿姨,垃圾袋满时,需要更换,机器人换了阿姨则不需要换,反之亦然。

  1. @Slf4j(topic = "ABAAtomicMarkableReference")
  2. public class ABAAtomicMarkableReferenceDemo {
  3. private static GarbageBag bag = new GarbageBag("装满垃圾");
  4. private static AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
  5. public static void main(String[] args) throws InterruptedException {
  6. log.debug("家里需要换垃圾袋...");
  7. GarbageBag prev = ref.getReference();
  8. robot();
  9. TimeUnit.MILLISECONDS.sleep(10);
  10. aunt();
  11. TimeUnit.MILLISECONDS.sleep(10);
  12. log.debug(ref.getReference().toString());
  13. }
  14. // 清洁机器人
  15. private static void robot() {
  16. new Thread(() -> {
  17. log.debug("清洁机器人开始打扫卫生...");
  18. boolean res = ref.compareAndSet(ref.getReference(), new GarbageBag("新垃圾袋"),
  19. true, false);
  20. log.debug("机器人是否换了垃圾袋 ? {}", res);
  21. }, "robot").start();
  22. }
  23. // 保洁阿姨
  24. private static void aunt() {
  25. new Thread(() -> {
  26. log.debug("保洁阿姨开始打扫卫生...");
  27. bag.setDesc("空垃圾袋");
  28. boolean res = ref.compareAndSet(ref.getReference(), new GarbageBag("新垃圾袋"),
  29. true, false);
  30. log.debug("阿姨是否换了垃圾袋 ? {}", res);
  31. }, "aunt").start();
  32. }
  33. }
  34. class GarbageBag {
  35. String desc;
  36. public GarbageBag(String desc) { this.desc = desc; }
  37. public void setDesc(String desc) { this.desc = desc; }
  38. @Override
  39. public String toString() { return "GarbageBag[desc='" + desc + "']"; }
  40. }

运行结果:

  1. 2021-04-27 20:01:20.764 [main] DEBUG ABAAtomicMarkableReference - 需要换垃圾袋...
  2. 2021-04-27 20:01:20.796 [robot] DEBUG ABAAtomicMarkableReference - 清洁机器人开始打扫卫生...
  3. 2021-04-27 20:01:20.796 [robot] DEBUG ABAAtomicMarkableReference - 机器人是否换了垃圾袋 ? true
  4. 2021-04-27 20:01:20.809 [aunt] DEBUG ABAAtomicMarkableReference - 保洁阿姨开始打扫卫生...
  5. 2021-04-27 20:01:20.809 [aunt] DEBUG ABAAtomicMarkableReference - 阿姨是否换了垃圾袋 ? false
  6. 2021-04-27 20:01:20.823 [main] DEBUG ABAAtomicMarkableReference - GarbageBag[desc='新垃圾袋']

4.5 原子数组

JUC提供如下原子数组类:

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

作用:保证数组内的元素的线程安全。

示例:

  1. @Slf4j(topic = "AtomicArray")
  2. public class AtomicArrayDemo {
  3. public static void main(String[] args) {
  4. demo(
  5. () -> new int[10],
  6. array -> array.length,
  7. (array, index) -> array[index]++,
  8. array -> log.debug("普通数组:{}", Arrays.toString(array))
  9. );
  10. demo(
  11. () -> new AtomicIntegerArray(10),
  12. AtomicIntegerArray::length,
  13. AtomicIntegerArray::getAndIncrement,
  14. array -> log.debug("安全数组:{}", array)
  15. );
  16. }
  17. /**
  18. * @param arraySupplier 提供数组
  19. * @param lengthFunction 获取数组长度的方法
  20. * @param putConsumer 指定元素的自增方法
  21. * @param printConsumer 打印数组元素的方法
  22. * @apiNote
  23. * <p> Supplier 提供者 无中生有 () -> 结果 </p>
  24. * <p> Function 函数 一个参数一个结果 (参数) -> 结果 | BiFunction (参数1,参数2) -> 结果 </p>
  25. * <p> Consumer 消费者 一个参数没有结果 (参数) -> Void | BiConsumer (参数1,参数2) -> Void </p>
  26. */
  27. private static <T> void demo(Supplier<T> arraySupplier, Function<T, Integer> lengthFunction,
  28. BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer) {
  29. List<Thread> list = new ArrayList<>();
  30. T array = arraySupplier.get();
  31. int length = lengthFunction.apply(array);
  32. for (int i = 0; i < length; i++) {
  33. list.add(new Thread(() -> {
  34. for (int j = 0; j < 10000; j++) { // 正确结果应该是数组元素都为10000
  35. putConsumer.accept(array, j % length);
  36. }
  37. }));
  38. }
  39. list.forEach(Thread::start);
  40. list.forEach(t -> {
  41. try {
  42. t.join();
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. });
  47. printConsumer.accept(array);
  48. }
  49. }

运行结果:

  1. 2021-04-27 21:09:01.160 [main] DEBUG AtomicArray - 普通数组:[6531, 6533, 6501, 6566, 6515, 6508, 6499, 6519, 6489, 6527]
  2. 2021-04-27 21:09:01.166 [main] DEBUG AtomicArray - 安全数组:[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

4.6 字段更新器

JUC提供如下字段更新器:

  • AtomicReferenceFeildUpdater:引用类型的属性
  • AtomicIntegerFieldUpdater:整形的属性
  • AtomicLongFeildUpdater:长整形的属性

注意:利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合volatile修饰的字段使用,否则会出现异常。

  1. Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type

示例:

  1. @Slf4j(topic = "AtomicFieldUpdater")
  2. public class AtomicFieldUpdaterDemo {
  3. public static void main(String[] args) {
  4. Student stu = new Student();
  5. AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
  6. log.debug("update ? {}", updater.compareAndSet(stu, null, "RubbishK"));
  7. log.debug("update ? {}", updater.compareAndSet(stu, stu.getName(), "FlowerK"));
  8. log.debug(stu.toString());
  9. }
  10. }
  11. class Student {
  12. volatile String name;
  13. public String getName() { return name; }
  14. @Override
  15. public String toString() { return "Student[name='" + name + "']"; }
  16. }

运行结果:

  1. 2021-04-27 21:36:51.784 [main] DEBUG AtomicFieldUpdater - update ? true
  2. 2021-04-27 21:36:51.786 [main] DEBUG AtomicFieldUpdater - update ? true
  3. 2021-04-27 21:36:51.786 [main] DEBUG AtomicFieldUpdater - Student[name='FlowerK']

4.7 原子累加器

JUC提供如下原子累加器:

  • LongAddr
  • LongAccumulator
  • DouleAddr
  • DoubleAccumulator

4.7.1 累加性能比较

累加性能比较AtomicLongLongAddr

  1. @Slf4j(topic = "Compare")
  2. public class PerformanceCompareDemo {
  3. public static void main(String[] args) {
  4. for (int i = 0; i < 5; i++) {
  5. demo(AtomicLong::new, AtomicLong::getAndIncrement);
  6. }
  7. for (int i = 0; i < 5; i++) {
  8. demo(LongAdder::new, LongAdder::increment);
  9. }
  10. }
  11. private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer) {
  12. T adder = supplier.get();
  13. long start = System.nanoTime();
  14. List<Thread> list = new ArrayList<>();
  15. for (int i = 0; i < 40; i++) {
  16. list.add(new Thread(() -> {
  17. for (int k = 0; k < 50_0000; k++) {
  18. consumer.accept(adder);
  19. }
  20. }));
  21. }
  22. list.forEach(Thread::start);
  23. list.forEach(t -> {
  24. try {
  25. t.join();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. });
  30. long end = System.nanoTime();
  31. log.debug("{} cost: {}(ns)", adder.getClass().getSimpleName(), (end - start));
  32. }
  33. }

运行结果:

  1. 2021-04-27 22:34:29.842 [main] DEBUG Compare - AtomicLong cost: 363865900(ns)
  2. 2021-04-27 22:34:30.176 [main] DEBUG Compare - AtomicLong cost: 331326300(ns)
  3. 2021-04-27 22:34:30.565 [main] DEBUG Compare - AtomicLong cost: 388361700(ns)
  4. 2021-04-27 22:34:30.961 [main] DEBUG Compare - AtomicLong cost: 396090500(ns)
  5. 2021-04-27 22:34:31.349 [main] DEBUG Compare - AtomicLong cost: 386800900(ns)
  6. 2021-04-27 22:34:31.404 [main] DEBUG Compare - LongAdder cost: 53539000(ns)
  7. 2021-04-27 22:34:31.438 [main] DEBUG Compare - LongAdder cost: 33946400(ns)
  8. 2021-04-27 22:34:31.479 [main] DEBUG Compare - LongAdder cost: 40203000(ns)
  9. 2021-04-27 22:34:31.511 [main] DEBUG Compare - LongAdder cost: 32314300(ns)
  10. 2021-04-27 22:34:31.546 [main] DEBUG Compare - LongAdder cost: 34245400(ns)

可以发现,LongAddr的速度要比AtomicLong高出一个数量级。

4.7.2 LongAdder源码分析

先贴一下前辈的主页:http://gee.cs.oswego.edu

作为并发大师@Doug lea 的作品 LongAdder,它的设计非常精巧。

LongAdder类有几个关键域:

  1. // 累加单元数组,懒惰初始化
  2. transient volatile Cell[] cells;
  3. // 基础值,如果没有竞争,则用CAS累加这个域
  4. transient volatile long base;
  5. // 在cells创建或者扩容时,置为1,表示加锁
  6. transient volatile int cellsBusy;

4.7.2.1 CAS锁

切勿使用生产环境。

  1. @Slf4j(topic = "LockCAS")
  2. public class LockCASDemo {
  3. // 0表示没加锁
  4. // 1表示加了锁
  5. private final AtomicInteger state = new AtomicInteger(0);
  6. private void lock() {
  7. while (true) {
  8. if (state.compareAndSet(0, 1)) {
  9. break;
  10. }
  11. }
  12. }
  13. public void unlock() {
  14. log.debug("unlock...");
  15. state.set(0);
  16. }
  17. }

4.7.2.1 原理之伪共享

Cell即为累加单元。

  1. // 防止缓存行伪共享(一个缓存行容纳多个Cell对象)
  2. @Sun.misc.Contented
  3. static final class Cell {
  4. volatile long value;
  5. Cell(long x) { value = x; }
  6. // 最重要的方法,用CAS方式进行累加,prev表示旧值,next表示新值
  7. final boolean cas(long prev, long next) {
  8. return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
  9. }
  10. // ...
  11. }

从缓存说起,缓存与内存的速度比较:

image-20210427234550770.png

从CPU到 大约需要的时钟周期
寄存器 1 cycle(4GHz的CPU约为0.25ns)
L1 3~4 cycle
L2 10~20 cycle
L3 40~45 cycle
内存 120~240 cycle

因为CPU与内存的速度差异很大,需要靠预读数据至缓存来提升效率。

而缓存以缓存行为单位,每个缓存对应着一块内存,一般是64 byte(8个long)。

缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中。

CPU要保证数据的一致性,如果某个CPU核心更改了数据,其他CPU核心对应的整个缓存行必须失效。

image-20210428124344773.png

因为Cell是数组形式,在内存中是连续存储的,一个Cell为24字节(16字节的对象头和8字节的value),因此缓存行可以存下2个的Cell对象。这样问题来了;

  • Core-0要修改Cell[0]
  • Core-1要修改Cell[1]

无论谁修改成功,都会导致对方Core的缓存行失效,比如Core-0中Cell[0] = 6000,Cell[1] = 8000。要累加Cell[0] = 6001,Cell[1] = 8000,这时会让Core-1缓存行失效。

@sun.misc.Contented用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,从而让CPU将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效。

image-20210428200500865.png

4.7.2.2 主要方法

add方法:

  1. public void add(long x) {
  2. Cell[] as; long b, v; int m; Cell a;
  3. if ((as = cells) != null /* cells是否为空 */
  4. || !casBase(b = base, b + x) /* cas base累加,成功则不会进入if代码块 */) {
  5. boolean uncontended = true;
  6. if (as == null || (m = as.length - 1) < 0 ||
  7. (a = as[getProbe() & m]) == null || /* 当前线程cell是否创建 */
  8. !(uncontended = a.cas(v = a.value, v + x))) /* cas base累加,失败则进入 */
  9. longAccumulate(x, null, uncontended);
  10. }
  11. }

image-20210428203546219.png

longAccumulate方法:

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. int h;
  4. if ((h = getProbe()) == 0) {
  5. ThreadLocalRandom.current(); // force initialization
  6. h = getProbe();
  7. wasUncontended = true;
  8. }
  9. boolean collide = false; // True if last slot nonempty
  10. for (;;) { // 不断尝试
  11. Cell[] as; Cell a; int n; long v;
  12. if ((as = cells) != null && (n = as.length) > 0) {
  13. // (1)cells已创建,Cell未创建
  14. if ((a = as[(n - 1) & h]) == null) {
  15. if (cellsBusy == 0) { // 尝试获取cells数组
  16. Cell r = new Cell(x); // 乐观创建Cell对象
  17. if (cellsBusy == 0 && casCellsBusy()) {
  18. // 当前无人上锁,自己尝试上锁
  19. boolean created = false; // 是否已创建
  20. try { // Recheck under lock
  21. Cell[] rs; int m, j;
  22. if ((rs = cells) != null &&
  23. (m = rs.length) > 0 &&
  24. rs[j = (m - 1) & h] == null) {
  25. // 再次检查数组数组不为空且长度大于0
  26. // 且数组中空置的槽位为空
  27. rs[j] = r; // 将创建的新的Cell对象填充到数组的空槽位
  28. created = true;
  29. }
  30. } finally {
  31. cellsBusy = 0; // 释放锁
  32. }
  33. if (created)
  34. break; // 创建成功则退出循环
  35. continue; // Slot is now non-empty
  36. }
  37. }
  38. collide = false;
  39. }
  40. // (2)cells已创建。Cell已创建
  41. else if (!wasUncontended) // CAS already known to fail
  42. wasUncontended = true; // Continue after rehash
  43. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  44. fn.applyAsLong(v, x))))
  45. // 对累加单元a进行CAS+x,操作成功则退出
  46. break;
  47. else if (n >= NCPU || cells != as)
  48. // 检查当前n是否超过机器的CPU上限
  49. collide = false; // At max size or stale
  50. else if (!collide)
  51. // 上个条件匹配之后,下次循环就走这个判断,不会进入下面的扩容逻辑
  52. collide = true;
  53. else if (cellsBusy == 0 && casCellsBusy()) {
  54. // 扩容
  55. try {
  56. if (cells == as) { // Expand table unless stale
  57. Cell[] rs = new Cell[n << 1]; // 左移一位,扩容两倍
  58. for (int i = 0; i < n; ++i)
  59. rs[i] = as[i]; // 将旧数组拷贝到新数组
  60. cells = rs; // 替换cells
  61. }
  62. } finally {
  63. cellsBusy = 0;
  64. }
  65. collide = false;
  66. continue; // Retry with expanded table
  67. }
  68. h = advanceProbe(h); // 以上条件均不匹配,改变线程对应的cell对象
  69. }
  70. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  71. // (3)cells数组未创建,三个条件如下
  72. // cellsBusy是标记位,0代表未加锁,1代表已加锁
  73. // cells == as代表没有其他线程改变cells数组,as是第一次if判断读取到的数组引用
  74. // casCellsBusy()这个方法的作用就是尝试通过CAS将cellsBusy从0改为1,成功说明加锁成功
  75. boolean init = false; // 是否初始化
  76. try { // Initialize table
  77. if (cells == as) { // 再次判断是否有其他线程修改了cells
  78. Cell[] rs = new Cell[2]; // 初始大小为2
  79. rs[h & 1] = new Cell(x); // 赋初始值1
  80. cells = rs; // 将刚刚创建的数组赋值给成员变量
  81. init = true;
  82. }
  83. } finally {
  84. cellsBusy = 0; // 将标记位设为0,代表解锁
  85. }
  86. if (init) // 初始化成功则退出循环
  87. break;
  88. }
  89. else if (casBase(v = base, ((fn == null) ? v + x :
  90. fn.applyAsLong(v, x))))
  91. // 加锁失败,进行cas base累加,成功则break,失败则继续循环
  92. break; // Fall back on using base
  93. }
  94. }

image-20210428204940662.pngimage-20210428211351713.png
image-20210428220931544.png

sum方法:

  1. public long sum() {
  2. Cell[] as = cells; Cell a;
  3. long sum = base;
  4. if (as != null) { // 判空
  5. for (int i = 0; i < as.length; ++i) {
  6. if ((a = as[i]) != null) // 对每个元素判空
  7. sum += a.value; // 累加每个单元值
  8. }
  9. }
  10. return sum;
  11. }

4.8 Unsafe

4.8.1 概述

Unsafe对象提供了非常底层的,操作内存、线程的办法,Unsafe对象不能直接调用,智能通过反射获得:

  1. public class UnsafeAccessor {
  2. private static Unsafe unsafe;
  3. static {
  4. try {
  5. Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
  6. theUnsafe.setAccessible(true);
  7. unsafe = (Unsafe) theUnsafe.get(null);
  8. } catch (NoSuchFieldException | IllegalAccessException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. static Unsafe getUnsafe() {
  13. return unsafe;
  14. }
  15. }

4.8.2 CAS操作

  1. public class UnsafeDemo {
  2. public static void main(String[] args) throws NoSuchFieldException {
  3. Unsafe unsafe = UnsafeAccessor.getUnsafe();
  4. Field id = User.class.getDeclaredField("id");
  5. Field name = User.class.getDeclaredField("name");
  6. // 获得域的偏移地址
  7. long idOffset = unsafe.objectFieldOffset(id);
  8. long nameOffset = unsafe.objectFieldOffset(name);
  9. // 使用CAS方法替换成员变量
  10. User user = new User();
  11. unsafe.compareAndSwapInt(user, idOffset, 0, 1);
  12. unsafe.compareAndSwapObject(user, nameOffset, null, "KHighness");
  13. System.out.println(user);
  14. }
  15. }
  16. class User {
  17. volatile int id;
  18. volatile String name;
  19. public int getId() { return id; }
  20. public void setId(int id) { this.id = id; }
  21. public String getName() { return name; }
  22. public void setName(String name) { this.name = name; }
  23. @Override
  24. public String toString() {
  25. return "User[" +
  26. "id=" + id +
  27. ", name='" + name + '\'' +
  28. ']';
  29. }
  30. }

运行结果:

  1. User[id=1, name='KHighness']

4.8.3 自定义原子实现类

使用自定义的AtomicData实现之前线程安全的原子整数Account实现:

  1. class KAtomicInteger implements Account{
  2. private volatile int value;
  3. private static long valueOffset;
  4. private static final Unsafe UNSAFE;
  5. static {
  6. UNSAFE = UnsafeAccessor.getUnsafe();
  7. try {
  8. valueOffset = UNSAFE.objectFieldOffset(KAtomicInteger.class.getDeclaredField("value"));
  9. } catch (NoSuchFieldException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. public KAtomicInteger(int value) {
  14. this.value = value;
  15. }
  16. public int getValue() {
  17. return value;
  18. }
  19. public void decrement(int amount) {
  20. while (true) {
  21. int prev = this.value;
  22. int next = prev - amount;
  23. if (UNSAFE.compareAndSwapInt(this, valueOffset, prev, next)) {
  24. break;
  25. }
  26. }
  27. }
  28. @Override
  29. public Integer geBalance() {
  30. return getValue();
  31. }
  32. @Override
  33. public void withdraw(Integer amount) {
  34. decrement(amount);
  35. }
  36. }