原子操作类


基本类型原子类

  • AtomicInteger
  • AtomicBoolean
  • AtomicLong

    常用API简介

    1. public final int get() // 获取当前的值
    2. public final int getAndSet(int newValue)// 获取当前的值,并设置新的值
    3. public final int getAndIncrement()// 获取当前的值,并自增
    4. public final int getAndDecrement() // 获取当前的值,并自减
    5. public final int getAndAdd(int delta) // 获取当前的值,并加上预期的值
    6. boolean compareAndSet(int expect, int update) // 如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
    1. class MyNumber {
    2. public AtomicInteger getAtomicInteger() {
    3. return atomicInteger;
    4. }
    5. private AtomicInteger atomicInteger = new AtomicInteger();
    6. public void addPlusPlus() {
    7. atomicInteger.incrementAndGet();
    8. }
    9. }
    10. public class AtomicIntegerDemo {
    11. public static void main(String[] args) throws InterruptedException {
    12. MyNumber myNumber = new MyNumber();
    13. CountDownLatch countDownLatch = new CountDownLatch(100);
    14. for (int i = 1; i <= 100; i++) {
    15. new Thread(() -> {
    16. try {
    17. for (int j = 1; j <= 5000; j++) {
    18. myNumber.addPlusPlus();
    19. }
    20. } finally {
    21. countDownLatch.countDown();
    22. }
    23. }, String.valueOf(i)).start();
    24. }
    25. countDownLatch.await();
    26. TimeUnit.SECONDS.sleep(10);
    27. System.out.println(myNumber.getAtomicInteger().get());
    28. }
    29. }

    数组类型原子类


  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

    1. public class AtomicIntegerArrayDemo {
    2. public static void main(String[] args) {
    3. AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
    4. //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
    5. //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});
    6. for (int i = 0; i < atomicIntegerArray.length(); i++) {
    7. System.out.println(atomicIntegerArray.get(i));
    8. }
    9. System.out.println();
    10. System.out.println();
    11. System.out.println();
    12. int tmpInt = 0;
    13. tmpInt = atomicIntegerArray.getAndSet(0, 1122);
    14. System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));
    15. atomicIntegerArray.getAndIncrement(1);
    16. atomicIntegerArray.getAndIncrement(1);
    17. tmpInt = atomicIntegerArray.getAndIncrement(1);
    18. System.out.println(tmpInt + "\t" + atomicIntegerArray.get(1));
    19. }
    20. }

    引用类型原子类


  • AtomicReference

    1. class AtomicReferenceDemoUser {
    2. String userName;
    3. int age;
    4. public AtomicReferenceDemoUser(String userName, int age) {
    5. this.userName = userName;
    6. this.age = age;
    7. }
    8. public String getUserName() {
    9. return userName;
    10. }
    11. public void setUserName(String userName) {
    12. this.userName = userName;
    13. }
    14. public int getAge() {
    15. return age;
    16. }
    17. public void setAge(int age) {
    18. this.age = age;
    19. }
    20. @Override
    21. public String toString() {
    22. return "AtomicReferenceDemoUser{" +
    23. "userName='" + userName + '\'' +
    24. ", age=" + age +
    25. '}';
    26. }
    27. }
    28. public class AtomicReferenceDemo {
    29. public static void main(String[] args) {
    30. AtomicReferenceDemoUser z3 = new AtomicReferenceDemoUser("z3", 24);
    31. AtomicReferenceDemoUser li4 = new AtomicReferenceDemoUser("li4", 26);
    32. AtomicReference atomicReferenceUser = new AtomicReference<>();
    33. atomicReferenceUser.set(z3);
    34. System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
    35. System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
    36. }
    37. }

    自旋锁

    1. // 自旋锁
    2. public class SpinLockDemo {
    3. // 原子引用线程
    4. AtomicReference atomicReference = new AtomicReference<>();
    5. public void myLock() {
    6. Thread thread = Thread.currentThread();
    7. System.out.println(Thread.currentThread().getName() + "\t come in.");
    8. while (!atomicReference.compareAndSet(null, thread)) {
    9. }
    10. }
    11. public void myUnlock() {
    12. Thread thread = Thread.currentThread();
    13. atomicReference.compareAndSet(thread, null);
    14. System.out.println(Thread.currentThread().getName() + "\t invoke myUnlock().");
    15. }
    16. public static void main(String[] args) {
    17. SpinLockDemo demo = new SpinLockDemo();
    18. new Thread("t1") {
    19. @Override
    20. public void run() {
    21. demo.myLock();
    22. try {
    23. sleep(5000);
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. }
    27. demo.myUnlock();
    28. }
    29. }.start();
    30. try {
    31. Thread.sleep(1000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. new Thread("t2") {
    36. @Override
    37. public void run() {
    38. demo.myLock();
    39. try {
    40. sleep(1000);
    41. } catch (InterruptedException e) {
    42. e.printStackTrace();
    43. }
    44. demo.myUnlock();
    45. }
    46. }.start();
    47. }
    48. }
  • AtomicStampedReference:携带版本号的引用类型原子类,可以解决ABA问题

  • AtomicMarkableReference:原子更新带有标记位的引用类型对象。AtomicStampedReference可以知道,引用变量中途被更改了几次。有时候,我们并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference。
    • 解决是否修改过,它的定义就是将状态戳简化为true | false

AtomicMarkableReference的唯一区别就是不再用int标识引用,而是使用boolean变量——表示引用变量是否被更改过。

对象的属性修改原子类


使用目的:**以一种线程安全的方式操作非线程安全对象内的某些字段**
使用要求:

  • 更新的对象属性必须使用 public volatile 修饰符
  • 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性
  • AtomicIntegerFieldUpdater:原子更新对象中int类型字段的值

    1. class BankAccount {
    2. private String bankName = "CCB";// 银行
    3. public volatile int money = 0;// 钱数
    4. static final AtomicIntegerFieldUpdater<BankAccount> accountAtomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");
    5. // 不加锁 + 性能高,局部微创
    6. public void transferMoney(BankAccount bankAccount) {
    7. accountAtomicIntegerFieldUpdater.incrementAndGet(bankAccount);
    8. }
    9. }
    10. public class AtomicIntegerFieldUpdaterDemo {
    11. public static void main(String[] args) {
    12. BankAccount bankAccount = new BankAccount();
    13. for (int i = 1; i <= 1000; i++) {
    14. int finalI = i;
    15. new Thread(() -> {
    16. bankAccount.transferMoney(bankAccount);
    17. }, String.valueOf(i)).start();
    18. }
    19. //暂停毫秒
    20. try {
    21. TimeUnit.MILLISECONDS.sleep(500);
    22. } catch (InterruptedException e) {
    23. e.printStackTrace();
    24. }
    25. System.out.println(bankAccount.money);
    26. }
    27. }
  • AtomicLongFieldUpdater:原子更新对象中Long类型字段的值

  • AtomicReferenceFieldUpdater:原子更新引用类型字段的值

    1. class MyVar {
    2. public volatile Boolean isInit = Boolean.FALSE;
    3. static final AtomicReferenceFieldUpdater atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class, "isInit");
    4. public void init(MyVar myVar) {
    5. if (atomicReferenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
    6. System.out.println(Thread.currentThread().getName() + "\t" + "---init.....");
    7. //暂停几秒钟线程
    8. try {
    9. TimeUnit.SECONDS.sleep(2);
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. System.out.println(Thread.currentThread().getName() + "\t" + "---init.....over");
    14. } else {
    15. System.out.println(Thread.currentThread().getName() + "\t" + "------其它线程正在初始化");
    16. }
    17. }
    18. }
    19. /**
    20. * 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
    21. */
    22. public class AtomicReferenceFieldUpdaterDemo {
    23. public static void main(String[] args) throws InterruptedException {
    24. MyVar myVar = new MyVar();
    25. for (int i = 1; i <= 5; i++) {
    26. new Thread(() -> {
    27. myVar.init(myVar);
    28. }, String.valueOf(i)).start();
    29. }
    30. }
    31. }

    原子操作增强类


  • LongAdder

    常用API

    1. void add(long x) // 将当前的value加x
    2. void increment() // 将当前的value加1
    3. void decrement() // 将当前的value减1
    4. long sum() // 返回当前值,在没有并发更新value的情况下,sum会返回一个精确值,在存在并发的情况下,sum不保证返回精确值
    5. void reset() // 将value重置为0,可用于替代重新new一个LongAdder,但此方法只可以在没有并发更新情况下使用
    6. long sumThenReset() // 获取当前value并将value重置为0

    ```java public class LongAccumulatorDemo { LongAdder longAdder = new LongAdder();

    public void add_LongAdder() {

    1. longAdder.increment();

    } LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {

    1. @Override
    2. // left:当前值(若没有,使用初始化的值),right:形参传入的值
    3. public long applyAsLong(long left, long right) {
    4. return left - right;
    5. }

    }, 777);

    public void add_LongAccumulator() {

    1. longAccumulator.accumulate(1);

    } public static void main(String[] args) {

    1. LongAccumulatorDemo demo = new LongAccumulatorDemo();
    2. demo.add_LongAccumulator();
    3. demo.add_LongAccumulator();
    4. System.out.println(demo.longAccumulator.longValue());

    } }

public class LongAdderAPIDemo { public static void main(String[] args) { LongAdder longAdder = new LongAdder(); longAdder.increment(); longAdder.increment(); longAdder.increment(); System.out.println(longAdder.longValue());

  1. // 传入的y值和x值(x值为空,使用默认值2)做乘法后返回
  2. LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x * y, 2);
  3. longAccumulator.accumulate(1);// 2 * 1 = 2,x = 2
  4. longAccumulator.accumulate(2);// 2 * 2 = 4,x = 4
  5. longAccumulator.accumulate(3);// 4 * 3 = 12,x = 12
  6. System.out.println(longAccumulator.longValue());
  7. }

}

  1. <a name="v7amw"></a>
  2. ## LongAdder高性能对比
  3. ```java
  4. class ClickNumberNet {
  5. int number = 0;
  6. public synchronized void clickBySync() {
  7. number++;
  8. }
  9. AtomicLong atomicLong = new AtomicLong(0);
  10. public void clickByAtomicLong() {
  11. atomicLong.incrementAndGet();
  12. }
  13. LongAdder longAdder = new LongAdder();
  14. public void clickByLongAdder() {
  15. longAdder.increment();
  16. }
  17. LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);
  18. public void clickByLongAccumulator() {
  19. longAccumulator.accumulate(1);
  20. }
  21. }
  22. public class LongAdderDemo2 {
  23. public static void main(String[] args) throws InterruptedException {
  24. ClickNumberNet clickNumberNet = new ClickNumberNet();
  25. long startTime;
  26. long endTime;
  27. CountDownLatch countDownLatch = new CountDownLatch(50);
  28. CountDownLatch countDownLatch2 = new CountDownLatch(50);
  29. CountDownLatch countDownLatch3 = new CountDownLatch(50);
  30. CountDownLatch countDownLatch4 = new CountDownLatch(50);
  31. startTime = System.currentTimeMillis();
  32. for (int i = 1; i <= 50; i++) {
  33. new Thread(() -> {
  34. try {
  35. for (int j = 1; j <= 100 * 10000; j++) {
  36. clickNumberNet.clickBySync();
  37. }
  38. } finally {
  39. countDownLatch.countDown();
  40. }
  41. }, String.valueOf(i)).start();
  42. }
  43. countDownLatch.await();
  44. endTime = System.currentTimeMillis();
  45. System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickBySync result: " + clickNumberNet.number);
  46. startTime = System.currentTimeMillis();
  47. for (int i = 1; i <= 50; i++) {
  48. new Thread(() -> {
  49. try {
  50. for (int j = 1; j <= 100 * 10000; j++) {
  51. clickNumberNet.clickByAtomicLong();
  52. }
  53. } finally {
  54. countDownLatch2.countDown();
  55. }
  56. }, String.valueOf(i)).start();
  57. }
  58. countDownLatch2.await();
  59. endTime = System.currentTimeMillis();
  60. System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByAtomicLong result: " + clickNumberNet.atomicLong);
  61. startTime = System.currentTimeMillis();
  62. for (int i = 1; i <= 50; i++) {
  63. new Thread(() -> {
  64. try {
  65. for (int j = 1; j <= 100 * 10000; j++) {
  66. clickNumberNet.clickByLongAdder();
  67. }
  68. } finally {
  69. countDownLatch3.countDown();
  70. }
  71. }, String.valueOf(i)).start();
  72. }
  73. countDownLatch3.await();
  74. endTime = System.currentTimeMillis();
  75. System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByLongAdder result: " + clickNumberNet.longAdder.sum());
  76. startTime = System.currentTimeMillis();
  77. for (int i = 1; i <= 50; i++) {
  78. new Thread(() -> {
  79. try {
  80. for (int j = 1; j <= 100 * 10000; j++) {
  81. clickNumberNet.clickByLongAccumulator();
  82. }
  83. } finally {
  84. countDownLatch4.countDown();
  85. }
  86. }, String.valueOf(i)).start();
  87. }
  88. countDownLatch4.await();
  89. endTime = System.currentTimeMillis();
  90. System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByLongAccumulator result: " + clickNumberNet.longAccumulator.longValue());
  91. }
  92. }

LongAdder性能高的原因

  • LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作。这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回
  • sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点

原子类、ThreadLocal - 图1

LongAdder源码、原理分析

LongAdder是Striped64的子类
原子类、ThreadLocal - 图2

Striped64的成员变量

// CPU数量,即cells数组的最大长度
static final int NCPU = Runtime.getRuntime().availableProcessors();

// cells数组,为2的幂,2,4,8,16.....,方便以后位运算
transient volatile Cell[] cells; 

// 基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新
transient volatile long base;

//  创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁
transient volatile int cellsBusy;

Striped64中方法的定义

// 类似于AtomicLong中全局的value值,在没有竞争情况下数据直接累计在base上,或者cells扩容时,也需要将数据写入到base上
base

// 表示扩容一项,false:一定不会扩容,true:可能会扩容
collide

// 初始化cells或者扩容cells需要获取锁,0:表示无锁状态 1:表示其他线程已经持有了锁
cellsBusy

// 通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
casCellsBusy()

// 当前计算机CPU数量,Cell数组扩容时会使用到
NCPU

// 获取当前线程的hash值
getProbe()

// 重置当前线程的hash值
advanceProbe()

longAdder.incremenet()

public void add(long x) {
     /**
      as是Striped64中的cells数组属性 
      b是Striped64中的base属性 
      v是当前线程hash到的Cell中存储的值 
      m是cells的长度减1,hash时作为掩码使用 
      a是当前线程hash到的Cell 
      */
     Cell[] as;
     long b, v;
     int m;
     Cell a;
     //  首次首线程((as = cells) != null)  一定是false,此时走casBase方法,以CAS的方式更新base的值,且只有当cas失败时,才会走到if中 
     条件①:cells不为空,说明出现过竞争,Cell[] 已创建
     条件②:cas操作base失败,说明其他线程先一步修改了base正在出现竞争
     if ((as = cells) != null || !casBase(b = base, b + x)) {
         boolean uncontended = true;//  true无竞争,false表示竞争激烈,多个线程hash到同一个Cell,可能要扩容
         //  条件①:cells为空,说明正在出现竞争,是从上面条件②过来的 
         //  条件②:不会出现 
         //  条件③:当前线程所在的Cell为空,说明当前线程还没有更新过Cell,应该初始化一个Cell 
         //  条件④:更新当前线程所在的Cell失败,说明现在竞争很激烈,多个线程hash到了同一个Cell,应该扩容 
         if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v 
                 = a.value, v + x)))
  // getProbe()方法返回的是线程中的threadLocalRandomProbe字段,它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改)
             longAccumulate(x, null, uncontended);
     }
}

原子类、ThreadLocal - 图3

  • 最初无竞争时只更新base
  • 如果更新base失败后,首次新建一个Cell[]数组
  • 当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[]扩容

    longAccumulate()方法入参

  • long x 需要增加的值,一般都默认是1

  • LongBinaryOperator fn 默认传递的是null
  • wasUncontended竞争标识,如果是false则代表竞争。只有cells初始化之后,并且当前线程失败,才会是false

    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
          int h;
          if ((h = getProbe()) == 0) { //  给当前线程生成一个非0的hash值
              ThreadLocalRandom.current(); // force initialization
              h = getProbe();
              wasUncontended = true;
          }
          boolean collide = false;//  如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看做是扩容意向
          for (; ; ) {
              Cell[] as;
              Cell a;
              int n;
              long v;
              // CASE1:cells已经被初始化了(Cell[]数组已经初始化)
              if ((as = cells) != null && (n = as.length) > 0) {
                  if ((a = as[(n - 1) & h]) == null) {// 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
                      if (cellsBusy == 0) { //  Cell[]数组没有正在扩容
                          Cell r = new Cell(x);//  创建一个Cell单元
                          if (cellsBusy == 0 && casCellsBusy()) {//  尝试加锁,成功后cellsBusy = 1
                              boolean created = false;
                              try {
                                  Cell[] rs;
                                  int m, j;
                                  if ((rs = cells) != null &&
                                          (m = rs.length) > 0 &&
                                          rs[j = (m - 1) & h] == null) {
                                      rs[j] = r;//  将Cell单元赋值到Cell[]数组上
                                      created = true;
                                  }
                              } finally {
                                  cellsBusy = 0;//  清空自旋标识,释放锁
                              }
                              if (created) // 如果原本为null的Cell单元是由自己进行第一次累积操作,那么任务已经完成了,所以可以退出循环
                                  break;
                              continue; //  不是自己进行第一次累积操作,重头再来
                          }
                      }
                      collide = false;
                  } else if (!wasUncontended)
                  /**
                   wasUncontended表示cells初始化后,当前线程竞争修改失败,wasUncontended=false,这里只是重新设置了这个值为true,紧接着执行advanceProbe(h)重置  
    
                   当前线程的hash,重新循环
                   */
                      wasUncontended = true;
                  else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                      //  说明当前线程对应的数组中有了数据,也重置过hash值,这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。 
                      break;
                  else if (n >= NCPU || cells != as)//  如果n大于CPU最大数量,不可扩容,并通过下面的h =  advanceProbe(h)方法修改线程的probe再重新尝试
                      collide = false;
                  else if (!collide)
                      collide = true;//  如果扩容意向为collide是false,则修改它为true,然后重新计算当前线程的hash值继续循环
                  else if (cellsBusy == 0 && casCellsBusy()) {
                      try {
                          if (cells == as) {//  当前的cells数组和最先赋值的as是同一个,代表没有被其他线程扩容过
                              Cell[] rs = new Cell[n << 1];//  左移1,扩容为之前容量的两倍
                              for (int i = 0; i < n; ++i)
                                  rs[i] = as[i];//  扩容之后再将之前数组的元素拷贝到新数组中
                              cells = rs;
                          }
                      } finally {
                          cellsBusy = 0;//  释放锁设置cellsBusy = 0,设置扩容状态,然后继续循环执行
                      }
                      collide = false;
                      continue;
                  }
                  h = advanceProbe(h);// 重新给线程生成一个hash值,降低hash冲突,减少映射到同一个Cell导致CAS竞争的情况
              }
              // CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组(Cell[]数组未初始化,首次新建)
              else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                  /**
                   如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 
                   表示创建一个新的Cell元素,value是x值,默认为1。
                   h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思
                   */
                  boolean init = false;
                  try {
                      if (cells == as) {//  双重检查
                          Cell[] rs = new Cell[2];
                          rs[h & 1] = new Cell(x);//  找到当前线程hash到数组中的位置并创建其对应的Cell
                          cells = rs;
                          init = true;
                      }
                  } finally {
                      cellsBusy = 0;
                  }
                  if (init)
                      break;
              }
              // CASE3:cells正在进行初始化操作,则尝试直接在基数base上进行累加操作(Cell[]数组正在初始化中)
              else if (casBase(v = base, ((fn == null) ? v + fn.applyAsLong(v, x))))
                  //  该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。
                  break;
          }
      }
    

    原子类、ThreadLocal - 图4

    sum()

    sum执行时,并没有限制对base和cells的更新。所以LongAdder不是强一致性的,它是最终一致性的。最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。

总结

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。
原子类、ThreadLocal - 图5

AtomicLong和LongAdder对比

AtomicLong

  • 原理:CAS + 自旋
  • 场景:低并发下的全局计算,AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。线程安全,可允许一些性能损耗,要求高精度时可使用
  • 缺陷:高并发情况下性能急剧下降,N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。

LongAdder

  • 原理:CAS + Base + Cell数组分散,空间换时间并分散了热点数据
  • 场景:高并发下的全局计算。当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用。LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作
  • 缺陷:sum求和后还有计算线程修改结果的话,最后结果不够准确

ThreadLocal


介绍

ThreadLocal提供线程局部变量。这些变量与正常的变量不同,因为每一个线程在访问ThreadLocal实例的时候(通过其get或set方法)都有自己的、独立初始化的变量副本。ThreadLocal实例通常是类中的私有静态字段,使用它的目的是希望将状态(例如,用户ID或事务ID)与线程关联起来。

作用

实现每一个线程都有自己专属的本地变量副本(自己用自己的变量不麻烦别人,不和其他人共享,人人有份,人各一份),主要解决了让每个线程绑定自己的值,通过使用get()和set()方法,获取默认值或将其值更改为当前线程所存的副本的值从而避免了线程安全问题。

class House {
    ThreadLocal threadLocal = ThreadLocal.withInitial(() -> 0);
    public void saleHouse() {
        Integer value = threadLocal.get();
        value++;
        threadLocal.set(value);
    }
}
public class ThreadLocalDemo {
    public static void main(String[] args) {
        House house = new House();
        new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    house.saleHouse();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + "---" + house.threadLocal.get());
            } finally {
                house.threadLocal.remove();// 如果不清理自定义的 ThreadLocal 变量,可能会影响后续业务逻辑和造成内存泄露等问题
            }
        }, "t1").start();
        new Thread(() -> {
            try {
                for (int i = 1; i <= 2; i++) {
                    house.saleHouse();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + "---" + house.threadLocal.get());
            } finally {
                house.threadLocal.remove();
            }
        }, "t2").start();
        new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    house.saleHouse();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + "---" + house.threadLocal.get());
            } finally {
                house.threadLocal.remove();
            }
        }, "t3").start();
        System.out.println(Thread.currentThread().getName() + "\t" + "---" + house.threadLocal.get());
    }
}

因为每个Thread内有自己的实例副本且该副本只由当前线程自己使用,既然其它Thread不可访问,那就不存在多线程间共享的问题。统一设置初始值,但是每个线程对这个值的修改都是各自线程互相独立的,加入synchronized或者Lock控制资源的访问顺序,人手一份,大家各自安好,没必要抢夺。

非线程安全的SimpleDateFormat

public class DateUtils {
    public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    /**
     * 模拟并发环境下使用SimpleDateFormat的parse方法将字符串转换成Date对象
     *
     * @param stringDate
     * @return
     * @throws Exception
     */
    public static Date parseDate(String stringDate) throws Exception {
        return sdf.parse(stringDate);
    }
    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 30; i++) {
            new Thread(() -> {
                try {
                    System.out.println(DateUtils.parseDate("2020-11-11 11:11:11"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}
  • SimpleDateFormat类内部有一个Calendar对象引用,它用来储存和这个SimpleDateFormat相关的日期信息,例如sdf.parse(dateStr),sdf.format(date)诸如此类的方法参数传入的日期相关String,Date等等,都是交由Calendar引用来储存的。这样就会导致一个问题如果你的SimpleDateFormat是个static的,那么多个thread之间就会共享这个SimpleDateFormat, 同时也是共享这个Calendar引用

    // 解决方案1
    public class DateUtils {
    
      public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
      /**
       * 模拟并发环境下使用SimpleDateFormat的parse方法将字符串转换成Date对象
       *
       * @param stringDate
       * @return
       * @throws Exception
       */
      public static Date parseDate(String stringDate) throws Exception {
          return sdf.parse(stringDate);
      }
      public static void main(String[] args) throws Exception {
          for (int i = 1; i <= 30; i++) {
              new Thread(() -> {
                  try {
                      // 每个线程创建一个sdf
                      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                      System.out.println(sdf.parse("2020-11-11 11:11:11"));
                      sdf = null;
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }, String.valueOf(i)).start();
          }
      }
    }
    
解决方案2
public class DateUtils {
    private static final ThreadLocal<SimpleDateFormat>  sdf_threadLocal = ThreadLocal.withInitial(()-> new  SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    /**
     * 模拟并发环境下使用SimpleDateFormat的parse方法将字符串转换成Date对象
     *
     * @param stringDate
     * @return
     * @throws Exception
     */
    public static Date parseDateTL(String stringDate)throws  Exception {
        return sdf_threadLocal.get().parse(stringDate);
    }
    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 30; i++) {
            new Thread(() -> {
                try {
                    System.out.println(DateUtils.parseDateTL("2020-11-11  11:11:11"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

Thread、ThreadLocal、ThreadLocalMap关系

ThreadLocalMap实际上就是一个以ThreadLocal实例为key,任意对象为value的Entry对象。当我们为ThreadLocal变量赋值,实际上就是以当前ThreadLocal实例为key,值为value的Entry往这个ThreadLocalMap中存放。JVM内部维护了一线程版的Map(通过ThreadLocal对象的set方法,结果把ThreadLocal对象自己当做key,放进了ThreadLoalMap中),每个线程要用到这个T的时候,用当前的线程去Map里面获取,通过这样让每个线程都拥有了自己独立的变量。

原子类、ThreadLocal - 图6

ThreadLocal的内存泄漏问题

  • 弱引用:ThreadLocalMap从字面上就可以看出这是一个保存ThreadLocal对象的map(其实是以它为Key),不过是经过了两层包装的ThreadLocal对象:
    • 第一层包装是使用 WeakReference> 将ThreadLocal对象变成一个弱引用的对象
    • 第二层包装是定义了一个专门的类 Entry 来扩展 WeakReference>

原子类、ThreadLocal - 图7
每个Thread对象维护着一个ThreadLocalMap的引用。 ThreadLocalMap是ThreadLocal的内部类,用Entry来进行存储。调用ThreadLocal的set()方法时,实际上就是往ThreadLocalMap设置值,key是ThreadLocal对象,值Value是传递进来的对象。调用ThreadLocal的get()方法时,实际上就是往ThreadLocalMap获取值,key是ThreadLocal对象。ThreadLocal本身并不存储值,它只是自己作为一个key来让线程从ThreadLocalMap获取value,正因为这个原理,所以ThreadLocal能够实现”数据隔离”,获取当前线程的局部变量值,不受其他线程影响。当在一个线程中创建多个ThreaLocal对象时,实际就是在同一个ThreadLocalMap的Entry[]中存放多个Entry,通过ThreadLocal作为不同的key来区分的。

当 function01方法执行完毕后,栈帧销毁强引用 tl 也就没有了。但此时线程的ThreadLocalMap里某个entry的key引用还指向这个对象。若这个key引用是强引用,就会导致key指向的ThreadLocal对象及v指向的对象不能被gc回收,造成内存泄漏;若这个key引用是弱引用就大概率会减少内存泄漏的问题(还有一个key为null的雷)。使用弱引用,就可以使ThreadLocal对象在方法执行完毕后顺利被回收且Entry的key引用指向为null。当ThreadLocal变量用static来修饰时,使用强弱引用是无所谓的,因为static会使变量的生命周期延长。只有当变量不用static修饰时弱引用才会规避掉内存泄漏的问题

  • key=null时

    • 当我们为ThreadLocal变量赋值,实际上就是当前的Entry(threadLocal实例为key,值为value)往这个threadLocalMap中存放。Entry中的key是弱引用,当ThreadLocal外部强引用被置为null(tl=null),那么系统 GC 的时候,根据可达性分析,这个threadLocal实例就没有任何一条链路能够引用到它,这个ThreadLocal势必会被回收。这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话,这些key为null的Entry的value就会一直存在一条强引用链:**Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value**永远无法回收,造成内存泄漏。
    • 如果当前Thread运行结束,ThreadLocal,ThreadLocalMap,Entry没有引用链可达,在垃圾回收的时候都会被系统进行回收。但在实际使用中我们有时候会用线程池去维护我们的线程,比如Executors.newFixedThreadPool()时创建线程的时候,为了复用线程是不会结束的,所以threadLocal内存泄漏就值得我们小心

       <br />ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用引用他,那么系统gc的时候,这个ThreadLocal势必会被回收,这样一来,ThreadLocalMap中就会出现 **key为null的Entry**,就没有办法访问这些key为null的Entry的value。如果当前线程再迟迟不结束的话(比如正好用在线程池),这些key为null的Entry的value就会一直存在一条强引用链。虽然弱引用保证了key指向的ThreadLocal对象能被及时回收,但是v指向的value对象是需要ThreadLocalMap调用get、set时发现key为null时才会去回收整个entry、value。因此弱引用不能100%保证内存不泄露。**我们要在不使用某个ThreadLocal对象后,手动调用remoev方法来删除它**,尤其是在线程池中,不仅仅是内存泄露的问题,因为线程池中的线程是重复使用的,意味着这个线程的ThreadLocalMap对象也是重复使用的,如果我们不手动调用remove方法,那么后面的线程就有可能获取到上个线程遗留下来的value值,造成bug。<br /> 从前面的set、getEntry、remove方法看出,在ThreadLocal的生命周期里,针对ThreadLocal存在的内存泄漏的问题,都会通过**expungeStaleEntry()、cleanSomeSlots()、replaceStaleEntry()**这三个方法清理掉key为null的脏entry。
      

ThreadLocal解决Hash冲突

ThreadLocalMap使用闭散列(开放地址法或者也叫线性探测法)解决哈希冲突,线性探测法的地址增量di = 1, 2, … 其中,i为探测次数。该方法一次探测下一个地址,直到有空的地址后插入,若整个空间都找不到空余的地址,则产生溢出。假设当前table长度为16,也就是说如果计算出来key的hash值为14,如果table[14]上已经有值,并且其key与当前key不一致,那么就发生了hash冲突,这个时候将14加1得到15,取table[15]进行判断,这个时候如果还是冲突会回到0,取table[0],以此类推,直到可以插入。
当我们要往哈希表中插入一个数据时,通过哈希函数计算该值的哈希地址,当我们找到哈希地址时却发现该位置已经被别的数据插入了,那么此时我们就找紧跟着这一位置的下一个位置,看是否能够插入,如果能则插入,不能则继续探测紧跟着当前位置的下一个位置。
image.png
假设要将key=y的元素存入哈希表,通过哈希函数求出哈希地址为7,比较哈希地址7的元素的key是否等于y,不相等,继续比对哈希地址为8的元素…直到找到哈希地址为2的位置,可以存储。

private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}
private static int prevIndex(int i, int len) {
    return ((i - 1 >= 0) ? i - 1 : len - 1);
}
private void set(ThreadLocal<?> key, Object value) {
    ThreadLocal.ThreadLocalMap.Entry[] tab = table;
    int len = tab.length;
    //计算索引,上面已经有说过。
    int i = key.threadLocalHashCode & (len-1);
    /**根据获取到的索引进行循环,如果当前索引上的table[i]不为空,在没有return的情况下,
    * 就使用nextIndex()获取下一个(上面提到到线性探测法)。*/
    for (ThreadLocal.ThreadLocalMap.Entry e = tab[i]; e != null;
        e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        //table[i]上key不为空,并且和当前key相同,更新value
        if (k == key) {
            e.value = value;
            return;
        }
        /**table[i]上的key为空,说明被回收了
         * 说明改table[i]可以重新使用,用新的key-value将其替换,并删除其他无效的entry*/
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    //不存在也没有旧元素就创建一个
    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();//扩容
}

ThreadLocal总结

  • ThreadLocal 并不解决线程间共享数据的问题
  • ThreadLocal 适用于变量在线程间隔离且在方法间共享的场景
  • ThreadLocal 通过隐式的在不同线程内创建独立实例副本避免了实例线程安全的问题
  • 每个线程持有一个只属于自己的专属ThreadLocalMap并维护了ThreadLocal对象与具体实例的映射,该Map由于只被持有它的线程访问,故不存在线程安全以及锁的问题
  • ThreadLocalMap的Entry对ThreadLocal的引用为弱引用,避免了ThreadLocal对象无法被回收的问题
  • 都会通过expungeStaleEntry()、cleanSomeSlots()、replaceStaleEntry()这三个方法回收键为Entry 对象的值(即为具体实例)为null以及 Entry 对象本身从而防止内存泄漏,属于安全加固的方法