add流程

image.png

  1. public void add(long x) {
  2. // as 为累加单元数组
  3. // b 为基础值
  4. // x 为累加值
  5. Cell[] as; long b, v; int m; Cell a;
  6. // 进入 if 的两个条件
  7. // 1. as 有值, 表示已经发生过竞争, 进入 if
  8. // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
  9. if ((as = cells) != null || !casBase(b = base, b + x)) {
  10. // uncontended 表示 cell 没有竞争
  11. boolean uncontended = true;
  12. if (
  13. // as 还没有创建
  14. as == null || (m = as.length - 1) < 0 ||
  15. // 当前线程对应的 cell 还没有
  16. (a = as[getProbe() & m]) == null ||
  17. // cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
  18. !(uncontended = a.cas(v = a.value, v + x))
  19. ) {
  20. // 进入 cell 数组创建、cell 创建的流程
  21. longAccumulate(x, null, uncontended);
  22. }
  23. }
  24. }

longAccumulate 流程

image.png
image.png
每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)
image.png

  1. final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
  2. int h;
  3. // 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell
  4. if ((h = getProbe()) == 0) {
  5. // 初始化 probe
  6. ThreadLocalRandom.current();
  7. // h 对应新的 probe 值, 用来对应 cell
  8. h = getProbe();
  9. wasUncontended = true;
  10. }
  11. // collide 为 true 表示需要扩容
  12. boolean collide = false;
  13. for (; ; ) {
  14. Cell[] as; Cell a; int n; long v;
  15. // 已经有了 cells
  16. if ((as = cells) != null && (n = as.length) > 0) {
  17. // 还没有 cell
  18. if ((a = as[(n - 1) & h]) == null) {
  19. // 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x
  20. // 成功则 break, 否则继续 continue 循环
  21. }
  22. // 有竞争, 改变线程对应的 cell 来重试 cas
  23. else if (!wasUncontended)
  24. wasUncontended = true;
  25. // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
  26. else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
  27. break;
  28. // 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas
  29. else if (n >= NCPU || cells != as)
  30. collide = false;
  31. // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
  32. else if (!collide)
  33. collide = true;
  34. // 加锁
  35. else if (cellsBusy == 0 && casCellsBusy()) {
  36. // 加锁成功, 扩容
  37. continue;
  38. }
  39. // 改变线程对应的 cell
  40. h = advanceProbe(h);
  41. }
  42. // 还没有 cells, 尝试给 cellsBusy 加锁
  43. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  44. // 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell
  45. // 成功则 break;
  46. }
  47. // 上两种情况失败, 尝试给 base 累加
  48. else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
  49. break;
  50. }
  51. }

sum方法

  1. public long sum() {
  2. Cell[] as = cells; Cell a;
  3. long sum = base;
  4. if (as != null) {
  5. for (int i = 0; i < as.length; ++i) {
  6. if ((a = as[i]) != null)
  7. sum += a.value;
  8. }
  9. }
  10. return sum;
  11. }