1. public class LongAdder extends Striped64 implements Serializable {
    2. private static final long serialVersionUID = 7249069246863182397L;
    3. /**
    4. * Creates a new adder with initial sum of zero.
    5. */
    6. public LongAdder() {
    7. }
    8. /**
    9. * Adds the given value.
    10. *
    11. * @param x the value to add
    12. */
    13. public void add(long x) {
    14. //as 表示cells引用
    15. //b 表示获取的base值
    16. //v 表示 期望值
    17. //m 表示 cells 数组的长度
    18. //a 表示当前线程命中的cell单元格
    19. Cell[] as; long b, v; int m; Cell a;
    20. //条件一:true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
    21. // false->表示cells未初始化,当前所有线程应该将数据写到base中
    22. //条件二:false->表示当前线程cas替换数据成功,
    23. // true->表示发生竞争了,可能需要重试 或者 扩容
    24. if ((as = cells) != null || !casBase(b = base, b + x)) {
    25. //什么时候会进来?
    26. //1.true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
    27. //2.true->表示发生竞争了,可能需要重试 或者 扩容
    28. //true -> 未竞争 false->发生竞争
    29. boolean uncontended = true;
    30. //条件一:true->说明 cells 未初始化,也就是多线程写base发生竞争了
    31. // false->说明 cells 已经初始化了,当前线程应该是 找自己的cell 写值
    32. //条件二:getProbe() 获取当前线程的hash值 m表示cells长度-1 cells长度 一定是2的次方数 15= b1111
    33. // true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
    34. // false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值 添加到cell中。
    35. //条件三:true->表示cas失败,意味着当前线程对应的cell 有竞争
    36. // false->表示cas成功
    37. if (as == null || (m = as.length - 1) < 0 ||
    38. (a = as[getProbe() & m]) == null ||
    39. !(uncontended = a.cas(v = a.value, v + x)))
    40. //都有哪些情况会调用?
    41. //1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
    42. //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
    43. //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]
    44. longAccumulate(x, null, uncontended);
    45. }
    46. }
    47. /**
    48. * Equivalent to {@code add(1)}.
    49. */
    50. //+1
    51. public void increment() {
    52. add(1L);
    53. }
    54. /**
    55. * Equivalent to {@code add(-1)}.
    56. */
    57. //-1
    58. public void decrement() {
    59. add(-1L);
    60. }
    61. /**
    62. * Returns the current sum. The returned value is <em>NOT</em> an
    63. * atomic snapshot; invocation in the absence of concurrent
    64. * updates returns an accurate result, but concurrent updates that
    65. * occur while the sum is being calculated might not be
    66. * incorporated.
    67. *
    68. * @return the sum
    69. */
    70. public long sum() {
    71. Cell[] as = cells; Cell a;
    72. long sum = base;
    73. if (as != null) {
    74. for (int i = 0; i < as.length; ++i) {
    75. if ((a = as[i]) != null)
    76. sum += a.value;
    77. }
    78. }
    79. return sum;
    80. }
    81. /**
    82. * Resets variables maintaining the sum to zero. This method may
    83. * be a useful alternative to creating a new adder, but is only
    84. * effective if there are no concurrent updates. Because this
    85. * method is intrinsically racy, it should only be used when it is
    86. * known that no threads are concurrently updating.
    87. */
    88. public void reset() {
    89. Cell[] as = cells; Cell a;
    90. base = 0L;
    91. if (as != null) {
    92. for (int i = 0; i < as.length; ++i) {
    93. if ((a = as[i]) != null)
    94. a.value = 0L;
    95. }
    96. }
    97. }
    98. /**
    99. * Equivalent in effect to {@link #sum} followed by {@link
    100. * #reset}. This method may apply for example during quiescent
    101. * points between multithreaded computations. If there are
    102. * updates concurrent with this method, the returned value is
    103. * <em>not</em> guaranteed to be the final value occurring before
    104. * the reset.
    105. *
    106. * @return the sum
    107. */
    108. public long sumThenReset() {
    109. Cell[] as = cells; Cell a;
    110. long sum = base;
    111. base = 0L;
    112. if (as != null) {
    113. for (int i = 0; i < as.length; ++i) {
    114. if ((a = as[i]) != null) {
    115. sum += a.value;
    116. a.value = 0L;
    117. }
    118. }
    119. }
    120. return sum;
    121. }
    122. /**
    123. * Returns the String representation of the {@link #sum}.
    124. * @return the String representation of the {@link #sum}
    125. */
    126. public String toString() {
    127. return Long.toString(sum());
    128. }
    129. /**
    130. * Equivalent to {@link #sum}.
    131. *
    132. * @return the sum
    133. */
    134. public long longValue() {
    135. return sum();
    136. }
    137. /**
    138. * Returns the {@link #sum} as an {@code int} after a narrowing
    139. * primitive conversion.
    140. */
    141. public int intValue() {
    142. return (int)sum();
    143. }
    144. /**
    145. * Returns the {@link #sum} as a {@code float}
    146. * after a widening primitive conversion.
    147. */
    148. public float floatValue() {
    149. return (float)sum();
    150. }
    151. /**
    152. * Returns the {@link #sum} as a {@code double} after a widening
    153. * primitive conversion.
    154. */
    155. public double doubleValue() {
    156. return (double)sum();
    157. }
    158. /**
    159. * Serialization proxy, used to avoid reference to the non-public
    160. * Striped64 superclass in serialized forms.
    161. * @serial include
    162. */
    163. private static class SerializationProxy implements Serializable {
    164. private static final long serialVersionUID = 7249069246863182397L;
    165. /**
    166. * The current value returned by sum().
    167. * @serial
    168. */
    169. private final long value;
    170. SerializationProxy(LongAdder a) {
    171. value = a.sum();
    172. }
    173. /**
    174. * Return a {@code LongAdder} object with initial state
    175. * held by this proxy.
    176. *
    177. * @return a {@code LongAdder} object with initial state
    178. * held by this proxy.
    179. */
    180. private Object readResolve() {
    181. LongAdder a = new LongAdder();
    182. a.base = value;
    183. return a;
    184. }
    185. }
    186. /**
    187. * Returns a
    188. * <a href="../../../../serialized-form.html#java.util.concurrent.atomic.LongAdder.SerializationProxy">
    189. * SerializationProxy</a>
    190. * representing the state of this instance.
    191. *
    192. * @return a {@link SerializationProxy}
    193. * representing the state of this instance
    194. */
    195. private Object writeReplace() {
    196. return new SerializationProxy(this);
    197. }
    198. /**
    199. * @param s the stream
    200. * @throws java.io.InvalidObjectException always
    201. */
    202. private void readObject(java.io.ObjectInputStream s)
    203. throws java.io.InvalidObjectException {
    204. throw new java.io.InvalidObjectException("Proxy required");
    205. }
    206. }
    1. @SuppressWarnings("serial")
    2. abstract class Striped64 extends Number {
    3. @sun.misc.Contended static final class Cell {
    4. volatile long value;
    5. Cell(long x) { value = x; }
    6. final boolean cas(long cmp, long val) {
    7. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    8. }
    9. // Unsafe mechanics
    10. private static final sun.misc.Unsafe UNSAFE;
    11. private static final long valueOffset;
    12. static {
    13. try {
    14. UNSAFE = sun.misc.Unsafe.getUnsafe();
    15. Class<?> ak = Cell.class;
    16. valueOffset = UNSAFE.objectFieldOffset
    17. (ak.getDeclaredField("value"));
    18. } catch (Exception e) {
    19. throw new Error(e);
    20. }
    21. }
    22. }
    23. /** Number of CPUS, to place bound on table size */
    24. //表示当前计算机CPU数量,什么用? 控制cells数组长度的一个关键条件
    25. static final int NCPU = Runtime.getRuntime().availableProcessors();
    26. /**
    27. * Table of cells. When non-null, size is a power of 2.
    28. */
    29. transient volatile Cell[] cells;
    30. /**
    31. * Base value, used mainly when there is no contention, but also as
    32. * a fallback during table initialization races. Updated via CAS.
    33. * 没有发生过竞争时,数据会累加到 base上 | 当cells扩容时,需要将数据写到base中
    34. */
    35. transient volatile long base;
    36. /**
    37. * Spinlock (locked via CAS) used when resizing and/or creating Cells.
    38. * 初始化cells或者扩容cells都需要获取锁,0 表示无锁状态,1 表示其他线程已经持有锁了
    39. */
    40. transient volatile int cellsBusy;
    41. /**
    42. * Package-private default constructor
    43. */
    44. Striped64() {
    45. }
    46. /**
    47. * CASes the base field.
    48. */
    49. final boolean casBase(long cmp, long val) {
    50. return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    51. }
    52. /**
    53. * CASes the cellsBusy field from 0 to 1 to acquire lock.
    54. * 通过CAS方式获取锁
    55. */
    56. final boolean casCellsBusy() {
    57. return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    58. }
    59. /**
    60. * Returns the probe value for the current thread.
    61. * Duplicated from ThreadLocalRandom because of packaging restrictions.
    62. *
    63. * 获取当前线程的Hash值
    64. */
    65. static final int getProbe() {
    66. return UNSAFE.getInt(Thread.currentThread(), PROBE);
    67. }
    68. /**
    69. * Pseudo-randomly advances and records the given probe value for the
    70. * given thread.
    71. * Duplicated from ThreadLocalRandom because of packaging restrictions.
    72. *
    73. * 重置当前线程的Hash值
    74. */
    75. static final int advanceProbe(int probe) {
    76. probe ^= probe << 13; // xorshift
    77. probe ^= probe >>> 17;
    78. probe ^= probe << 5;
    79. UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    80. return probe;
    81. }
    82. /**
    83. * Handles cases of updates involving initialization, resizing,
    84. * creating new Cells, and/or contention. See above for
    85. * explanation. This method suffers the usual non-modularity
    86. * problems of optimistic retry code, relying on rechecked sets of
    87. * reads.
    88. *
    89. * @param x the value
    90. * @param fn the update function, or null for add (this convention
    91. * avoids the need for an extra field or function in LongAdder).
    92. * @param wasUncontended false if CAS failed before call
    93. */
    94. //都有哪些情况会调用?
    95. //1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
    96. //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
    97. //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]
    98. // wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
    99. final void longAccumulate(long x, LongBinaryOperator fn,
    100. boolean wasUncontended) {
    101. //h 表示线程hash值
    102. int h;
    103. //条件成立:说明当前线程 还未分配hash值
    104. if ((h = getProbe()) == 0) {
    105. //给当前线程分配hash值
    106. ThreadLocalRandom.current(); // force initialization
    107. //取出当前线程的hash值 赋值给h
    108. h = getProbe();
    109. //为什么? 因为默认情况下 当前线程 肯定是写入到了 cells[0] 位置。 不把它当做一次真正的竞争
    110. wasUncontended = true;
    111. }
    112. //表示扩容意向 false 一定不会扩容,true 可能会扩容。
    113. boolean collide = false; // True if last slot nonempty
    114. //自旋
    115. for (;;) {
    116. //as 表示cells引用
    117. //a 表示当前线程命中的cell
    118. //n 表示cells数组长度
    119. //v 表示 期望值
    120. Cell[] as; Cell a; int n; long v;
    121. //CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
    122. if ((as = cells) != null && (n = as.length) > 0) {
    123. //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
    124. //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]
    125. //CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell
    126. if ((a = as[(n - 1) & h]) == null) {
    127. //true->表示当前锁 未被占用 false->表示锁被占用
    128. if (cellsBusy == 0) { // Try to attach new Cell
    129. //拿当前的x创建Cell
    130. Cell r = new Cell(x); // Optimistically create
    131. //条件一:true->表示当前锁 未被占用 false->表示锁被占用
    132. //条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败..
    133. if (cellsBusy == 0 && casCellsBusy()) {
    134. //是否创建成功 标记
    135. boolean created = false;
    136. try { // Recheck under lock
    137. //rs 表示当前cells 引用
    138. //m 表示cells长度
    139. //j 表示当前线程命中的下标
    140. Cell[] rs; int m, j;
    141. //条件一 条件二 恒成立
    142. //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置
    143. //导致丢失数据
    144. if ((rs = cells) != null &&
    145. (m = rs.length) > 0 &&
    146. rs[j = (m - 1) & h] == null) {
    147. rs[j] = r;
    148. created = true;
    149. }
    150. } finally {
    151. cellsBusy = 0;
    152. }
    153. if (created)
    154. break;
    155. continue; // Slot is now non-empty
    156. }
    157. }
    158. //扩容意向 强制改为了false
    159. collide = false;
    160. }
    161. // CASE1.2:
    162. // wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
    163. else if (!wasUncontended) // CAS already known to fail
    164. wasUncontended = true; // Continue after rehash
    165. //CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空
    166. //true -> 写成功,退出循环
    167. //false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次
    168. else if (a.cas(v = a.value, ((fn == null) ? v + x :
    169. fn.applyAsLong(v, x))))
    170. break;
    171. //CASE 1.4:
    172. //条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了 false-> 说明cells数组还可以扩容
    173. //条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可
    174. else if (n >= NCPU || cells != as)
    175. //扩容意向 改为false,表示不扩容了
    176. collide = false; // At max size or stale
    177. //CASE 1.5:
    178. //!collide = true 设置扩容意向 为true 但是不一定真的发生扩容
    179. else if (!collide)
    180. collide = true;
    181. //CASE 1.6:真正扩容的逻辑
    182. //条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁
    183. //条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑
    184. // false->表示当前时刻有其它线程在做扩容相关的操作。
    185. else if (cellsBusy == 0 && casCellsBusy()) {
    186. try {
    187. //cells == as
    188. if (cells == as) { // Expand table unless stale
    189. Cell[] rs = new Cell[n << 1];
    190. for (int i = 0; i < n; ++i)
    191. rs[i] = as[i];
    192. cells = rs;
    193. }
    194. } finally {
    195. //释放锁
    196. cellsBusy = 0;
    197. }
    198. collide = false;
    199. continue; // Retry with expanded table
    200. }
    201. //重置当前线程Hash值
    202. h = advanceProbe(h);
    203. }
    204. //CASE2:前置条件cells还未初始化 as 为null
    205. //条件一:true 表示当前未加锁
    206. //条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells
    207. //条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁
    208. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    209. boolean init = false;
    210. try { // Initialize table
    211. //cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据
    212. if (cells == as) {
    213. Cell[] rs = new Cell[2];
    214. rs[h & 1] = new Cell(x);
    215. cells = rs;
    216. init = true;
    217. }
    218. } finally {
    219. cellsBusy = 0;
    220. }
    221. if (init)
    222. break;
    223. }
    224. //CASE3:
    225. //1.当前cellsBusy加锁状态,表示其它线程正在初始化cells,所以当前线程将值累加到base
    226. //2.cells被其它线程初始化后,当前线程需要将数据累加到base
    227. else if (casBase(v = base, ((fn == null) ? v + x :
    228. fn.applyAsLong(v, x))))
    229. break; // Fall back on using base
    230. }
    231. }
    232. /**
    233. * Same as longAccumulate, but injecting long/double conversions
    234. * in too many places to sensibly merge with long version, given
    235. * the low-overhead requirements of this class. So must instead be
    236. * maintained by copy/paste/adapt.
    237. */
    238. final void doubleAccumulate(double x, DoubleBinaryOperator fn,
    239. boolean wasUncontended) {
    240. int h;
    241. if ((h = getProbe()) == 0) {
    242. ThreadLocalRandom.current(); // force initialization
    243. h = getProbe();
    244. wasUncontended = true;
    245. }
    246. boolean collide = false; // True if last slot nonempty
    247. for (;;) {
    248. Cell[] as; Cell a; int n; long v;
    249. if ((as = cells) != null && (n = as.length) > 0) {
    250. if ((a = as[(n - 1) & h]) == null) {
    251. if (cellsBusy == 0) { // Try to attach new Cell
    252. Cell r = new Cell(Double.doubleToRawLongBits(x));
    253. if (cellsBusy == 0 && casCellsBusy()) {
    254. boolean created = false;
    255. try { // Recheck under lock
    256. Cell[] rs; int m, j;
    257. if ((rs = cells) != null &&
    258. (m = rs.length) > 0 &&
    259. rs[j = (m - 1) & h] == null) {
    260. rs[j] = r;
    261. created = true;
    262. }
    263. } finally {
    264. cellsBusy = 0;
    265. }
    266. if (created)
    267. break;
    268. continue; // Slot is now non-empty
    269. }
    270. }
    271. collide = false;
    272. }
    273. else if (!wasUncontended) // CAS already known to fail
    274. wasUncontended = true; // Continue after rehash
    275. else if (a.cas(v = a.value,
    276. ((fn == null) ?
    277. Double.doubleToRawLongBits
    278. (Double.longBitsToDouble(v) + x) :
    279. Double.doubleToRawLongBits
    280. (fn.applyAsDouble
    281. (Double.longBitsToDouble(v), x)))))
    282. break;
    283. else if (n >= NCPU || cells != as)
    284. collide = false; // At max size or stale
    285. else if (!collide)
    286. collide = true;
    287. else if (cellsBusy == 0 && casCellsBusy()) {
    288. try {
    289. if (cells == as) { // Expand table unless stale
    290. Cell[] rs = new Cell[n << 1];
    291. for (int i = 0; i < n; ++i)
    292. rs[i] = as[i];
    293. cells = rs;
    294. }
    295. } finally {
    296. cellsBusy = 0;
    297. }
    298. collide = false;
    299. continue; // Retry with expanded table
    300. }
    301. h = advanceProbe(h);
    302. }
    303. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    304. boolean init = false;
    305. try { // Initialize table
    306. if (cells == as) {
    307. Cell[] rs = new Cell[2];
    308. rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
    309. cells = rs;
    310. init = true;
    311. }
    312. } finally {
    313. cellsBusy = 0;
    314. }
    315. if (init)
    316. break;
    317. }
    318. else if (casBase(v = base,
    319. ((fn == null) ?
    320. Double.doubleToRawLongBits
    321. (Double.longBitsToDouble(v) + x) :
    322. Double.doubleToRawLongBits
    323. (fn.applyAsDouble
    324. (Double.longBitsToDouble(v), x)))))
    325. break; // Fall back on using base
    326. }
    327. }
    328. // Unsafe mechanics
    329. private static final sun.misc.Unsafe UNSAFE;
    330. private static final long BASE;
    331. private static final long CELLSBUSY;
    332. private static final long PROBE;
    333. static {
    334. try {
    335. UNSAFE = sun.misc.Unsafe.getUnsafe();
    336. Class<?> sk = Striped64.class;
    337. BASE = UNSAFE.objectFieldOffset
    338. (sk.getDeclaredField("base"));
    339. CELLSBUSY = UNSAFE.objectFieldOffset
    340. (sk.getDeclaredField("cellsBusy"));
    341. Class<?> tk = Thread.class;
    342. PROBE = UNSAFE.objectFieldOffset
    343. (tk.getDeclaredField("threadLocalRandomProbe"));
    344. } catch (Exception e) {
    345. throw new Error(e);
    346. }
    347. }
    348. }