一、介绍

1.1 简介

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

1.2 应用场景

  • Exchanger可以用于遗传算法
  • Exchanger也可以用于校对工作

1.3 Exchange api

  1. //构造函数
  2. public Exchanger()
  3. //等待其他线程到达交换点(除非线程被打断,或者交换超时),
  4. //传输给定对象到exchanger,接收exchanger 返回的对象
  5. public V exchange(V x);
  6. //等待其他线程到达交换点(除非线程被打断,或者交换超时),
  7. //传输给定对象到exchanger,接收exchanger 返回的对象
  8. public V exchange(V x, long timeout, TimeUnit unit);

1.4 简单的使用例子

  1. public class ExchangerTest {
  2. private static final Exchanger<String> exgr = new Exchanger<String>();
  3. private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
  4. public static void main(String[] args) {
  5. threadPool.execute(() -> {
  6. String A = "银行流水A";// A录入银行流水数据
  7. try {
  8. String result = exgr.exchange(A);
  9. System.out.println("receive " + result);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. });
  14. threadPool.execute(() -> {
  15. String B = "银行流水B";// B录入银行流水数据
  16. String A = null;
  17. try {
  18. //交换数据
  19. A = exgr.exchange("B");
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:" + A + ",B录入是:" + B);
  24. });
  25. threadPool.shutdown();
  26. }
  27. }
  28. //输出
  29. AB数据是否一致:false,A录入的是:银行流水A,B录入是:银行流水B
  30. receive B

1.5 类UML 图

Exchanger.png

二、源码分析

2.1 Exchanger 的成员变量

  1. static final class Participant extends ThreadLocal<Node> {
  2. public Node initialValue() { return new Node(); }
  3. }
  4. // @sun.misc.Contended 消除伪共享
  5. @sun.misc.Contended
  6. static final class Node {
  7. int index; // 当前节点 在数组 arena 的下标
  8. int bound; // 交换器的最后记录值
  9. int collides; // 在当前 bound 下 CAS 的失败次数
  10. int hash; // 伪随机的自旋数
  11. Object item; // A线程交换的数据项 如 Object match = A.exchange(item)
  12. volatile Object match; // 与该A线程交换数据对象的目标B线程 的数据项
  13. volatile Thread parked; // 当阻塞时,设置此线程,不阻塞的话就不必了(因为会自旋)
  14. }
  15. //用于保存每个线程的状态, 实现于ThreadLocal
  16. private final Participant participant;
  17. //交换器隔离区域,启动时为空(在slotExchange 方法内初始化)
  18. private volatile Node[] arena;
  19. //在同步点交换数据时,插槽用于保存线程的数据节点
  20. private volatile Node slot;
  21. //记录arena 最后合法的位置索引
  22. private volatile int bound;

2.2 构造方法

  1. public Exchanger() {
  2. participant = new Participant();
  3. }

在实例化时,创建了一个 ThreadLocal 对象,并设置了初始值,一个 Node 对象。

2.3 exchange 方法

  1. public V exchange(V x) throws InterruptedException {
  2. Object v;
  3. Object item = (x == null) ? NULL_ITEM : x; // translate null args
  4. if ((
  5. //1. 如果 arena 为空,则执行slotExchange方法;
  6. //2. slotExchange返回值 不为空 ,则返回
  7. arena != null || (v = slotExchange(item, false, 0L)) == null
  8. )
  9. &&
  10. (
  11. //3. arena 不为空 或者 slotExchange返回值 == null, 执行以下语句
  12. (
  13. //4. 线程被打断,返回
  14. //5. 线程没有被打断, 执行arenaExchange方法;
  15. Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null
  16. )
  17. ))throw new InterruptedException();
  18. return (v == NULL_ITEM) ? null : (V)v;
  19. }

2.4 slotExchange 的源码分析

  1. //item: 线程交换的数据项
  2. //timed 是否有超时
  3. //ns 超时毫秒
  4. private final Object slotExchange(Object item, boolean timed, long ns) {
  5. //拿出存在ThreadLocal 中的 node
  6. Node p = participant.get();
  7. //获取调用线程
  8. Thread t = Thread.currentThread();
  9. //判断是否被打断
  10. if (t.isInterrupted()) // preserve interrupt status so caller can recheck
  11. return null;
  12. //自旋
  13. for (Node q;;) {
  14. //判断 slot 节点是否为空
  15. if ((q = slot) != null) {
  16. //第二个线程执行 slotExchange 进入, 假设slot 不为空
  17. //设置插槽slot 为null, 唤醒等待线程, 返回数据项
  18. if (U.compareAndSwapObject(this, SLOT, q, null)) {
  19. //获取 q.item
  20. Object v = q.item;
  21. //自己的 item 赋值给 match,以让对方线程获取
  22. q.match = item;
  23. //唤醒与之匹配的线程,进行交换数据
  24. Thread w = q.parked;
  25. if (w != null)
  26. U.unpark(w);
  27. return v;
  28. }
  29. // cas 修改slot 失败,证明有线程在占用slot
  30. //如果arena 还没有初始化,则创建arena数组
  31. //直到slot 使用完置空后,其他线程执行exchange 方法后,就使用arena数组
  32. if (NCPU > 1 && bound == 0 &&
  33. U.compareAndSwapInt(this, BOUND, 0, SEQ)) // SEQ == 256; 默认 BOUND == 0
  34. arena = new Node[(FULL + 2) << ASHIFT];// length = (2 + 2) << 7 == 512
  35. }
  36. else if (arena != null)
  37. //多线程占用slot,使用arenaExchange方法
  38. return null; // caller must reroute to arenaExchange
  39. else {
  40. //第一个线程执行exchange 方法时
  41. //会创建一个Node 作为 slot
  42. p.item = item;
  43. if (U.compareAndSwapObject(this, SLOT, null, p))
  44. //跳出循环,等待释放
  45. break;
  46. // 如果 CAS 失败,将 p 的值清空,重来
  47. p.item = null;
  48. }
  49. }
  50. //执行这里,说明线程已经把需要交换的数据放到了slot中,当前线程阻塞自己
  51. //等待另一个线程在同一同步点时,唤醒当前线程交换数据
  52. // 伪随机数
  53. int h = p.hash;
  54. //超时时间
  55. long end = timed ? System.nanoTime() + ns : 0L;
  56. //自旋次数 SPINS: 1024
  57. int spins = (NCPU > 1) ? SPINS : 1;
  58. Object v;
  59. //获取 p.match 数据项,是否为空
  60. //match 使用了volatile 修饰,具有线程间可见性
  61. while ((v = p.match) == null) {
  62. //如果为空,仍需阻塞等待
  63. //判断自旋次数 是否大于0
  64. if (spins > 0) {
  65. // 计算伪随机数
  66. h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
  67. if (h == 0)
  68. h = SPINS | (int)t.getId();
  69. // 如果不是0,就将自旋数减一
  70. //自旋次数减为0,则让出 CPU 时间片
  71. else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
  72. Thread.yield();
  73. }
  74. //如果自旋数不够了,且 slot 还没有得到,就重置自旋数
  75. else if (slot != p)
  76. spins = SPINS;
  77. // 如果线程没有中断 && arena数组不是 null && 没有超时限制
  78. else if (!t.isInterrupted() && arena == null &&
  79. (!timed || (ns = end - System.nanoTime()) > 0L)) {
  80. //自旋次数用完了
  81. //采用阻塞的手段
  82. U.putObject(t, BLOCKER, this);
  83. p.parked = t; //记录阻塞的线程
  84. // 如果这个数据还没有被拿走,阻塞自己
  85. if (slot == p)
  86. U.park(false, ns);
  87. //线程被唤醒,清空阻塞标识
  88. p.parked = null;
  89. // 将当前线程的 parkBlocker 属性设置成 null
  90. U.putObject(t, BLOCKER, null);
  91. }
  92. // 如果有超时限制,使用 CAS 将 slot 从 p 变成 null,取消这次交换
  93. else if (U.compareAndSwapObject(this, SLOT, p, null)) {
  94. v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
  95. break;
  96. }
  97. }
  98. //将节点p 的match 属性设置为空
  99. //表示初始化状态,没有任何匹配 >>> putOrderedObject是putObjectVolatile的内存非立即可见版本.
  100. U.putOrderedObject(p, MATCH, null);
  101. // 重置 item
  102. p.item = null;
  103. // 保留伪随机数,供下次种子数字
  104. p.hash = h;
  105. //返回
  106. return v;
  107. }

slotExchage 方法逻辑

  1. 当第一个线程进入slotExchage 方法,会从ThreadLocal 中 获取p (Node)节点 ;没有则创建并保存在ThreadLocal中
  2. 线程把数据项item 保存到节点p 的item 中, 并把Exchanger 中的slot 指向 节点p
  3. 为了等待与之匹配交换数据的线程,需要进行等待阻塞 (为了性能考虑,按照优先级 自旋 > Thread.yield > 阻塞挂起)
  4. 当第二个线程进入的时候,会拿出存储在 slot item 中的值, 然后对 slot 的 match 赋值,并唤醒上次阻塞的线程.
  5. 当第一个线程阻塞被唤醒后,说明对方取到值了,就获取 slot 的 match (因为属性被volatile修饰,可以立即感知)值, 并重置 slot 的数据和ThreadLocal的数据,并返回自己的数据.
  6. 最后,如果超时了,就返回 Time_out 对象。如果线程中断了,就返回 null。在该方法中,会返回 2 种结果,一是有效的 item, 二是 null—- 要么是线程竞争使用 slot 了,创建了 arena 数组,要么是线程中断了.

2.5 使用slot 插槽节点交换数据图示

image.png

2.6 arenaExchange 方法 源码分析

  1. private final Object arenaExchange(Object item, boolean timed, long ns) {
  2. //获取arena 数组
  3. Node[] a = arena;
  4. //拿出存在ThreadLocal 中的 node
  5. Node p = participant.get();
  6. for (int i = p.index;;) { p.index 初始化时为0
  7. int b, m, c; long j;
  8. //(i << ASHIFT) + ABASE) 计算第i个元素的在数组arena 的下标
  9. //通过下标 j 获取数组a 中的元素节点 q
  10. Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
  11. ////如果q不为null,则将对应的数组元素置为null,表示当前线程和该元素对应的线程匹配了
  12. if (q != null && U.compareAndSwapObject(a, j, q, null)) {
  13. //获取 q.item
  14. Object v = q.item;
  15. //自己的 item 赋值给 match,以让对方线程获取
  16. q.match = item;
  17. //唤醒与之匹配的线程,进行交换数据
  18. if (w != null)
  19. U.unpark(w);
  20. return v;
  21. }
  22. //SEQ = 256
  23. //MMASK = 255
  24. //q为null 或者q不为null,cas抢占q失败了
  25. //bound初始化时是SEQ,SEQ & MMASK就是0,即m的初始值就是0,m为0时,i肯定为0
  26. else if (i <= (m = (b = bound) & MMASK) && q == null) {
  27. //把当前线程的数据放到p节点的item 中
  28. p.item = item; // offer
  29. //对应的数组元素修改为p
  30. if (U.compareAndSwapObject(a, j, null, p)) {
  31. //计算超时时间
  32. long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
  33. //获取当前线程
  34. Thread t = Thread.currentThread(); // wait
  35. //自旋(等待匹配)
  36. for (int h = p.hash, spins = SPINS;;) {
  37. Object v = p.match;
  38. if (v != null) { //已经跟某个线程交换成功
  39. //把p.match 置空
  40. U.putOrderedObject(p, MATCH, null);
  41. //把p.item 置空
  42. p.item = null; // clear for next use
  43. //保存hash 随机数,下次自旋使用
  44. p.hash = h;
  45. return v;
  46. }
  47. //还没匹配, 判断自旋次数是否大于0
  48. else if (spins > 0) {
  49. h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
  50. if (h == 0) // initialize hash
  51. h = SPINS | (int)t.getId();
  52. else if (h < 0 && // approx 50% true
  53. (--spins & ((SPINS >>> 1) - 1)) == 0)
  54. //当自旋次数等于0, 采用 yield 让出CPU 时间片
  55. Thread.yield(); // two yields per wait
  56. }
  57. else if (U.getObjectVolatile(a, j) != p)
  58. //索引j处的数组元素发生变更
  59. //重置自旋次数
  60. spins = SPINS; // releaser hasn't set match yet
  61. else if (!t.isInterrupted() && m == 0 &&
  62. (!timed ||
  63. (ns = end - System.nanoTime()) > 0L)) {
  64. //1. 线程没有被打断
  65. //2. 没有超时
  66. //3. m == 0
  67. // 阻塞当前线程
  68. U.putObject(t, BLOCKER, this); // emulate LockSupport
  69. p.parked = t; // minimize window
  70. if (U.getObjectVolatile(a, j) == p)
  71. U.park(false, ns);
  72. //线程被唤醒,因为中断或者等待超时
  73. p.parked = null;
  74. U.putObject(t, BLOCKER, null);
  75. }
  76. else if (U.getObjectVolatile(a, j) == p &&
  77. U.compareAndSwapObject(a, j, p, null)) {
  78. //线程被中断了或者等待超时或者m不等于0,进入此分支
  79. //m !=0
  80. if (m != 0) // try to shrink
  81. //修改bound,下一次跟MMASK求且的结果会减1,此时可能导致i大于m
  82. U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
  83. p.item = null;
  84. p.hash = h;
  85. //索引无符号右移(除以2)
  86. i = p.index >>>= 1; // descend
  87. //如果线程被打断,返回null
  88. if (Thread.interrupted())
  89. return null;
  90. //如果超时,返回超时对象
  91. if (timed && m == 0 && ns <= 0L)
  92. return TIMED_OUT;
  93. //m不等于0的情形,下一次for循环继续抢占
  94. //终止内层for循环,继续外层的for循环,会尝试抢占其他的数组元素,然后自旋等待
  95. break; // expired; restart
  96. }//内层for循环结束
  97. }
  98. }
  99. else //数组元素修改失败,下一次for循环重试,q不为null,与该元素对应的线程匹配成功
  100. p.item = null; // clear offer
  101. }
  102. else {
  103. //i>m 或者q不为null,cas抢占q失败了 会进入此分支
  104. //bound的初始值也是0
  105. if (p.bound != b) { // stale; reset
  106. p.bound = b;
  107. p.collides = 0;
  108. //如果i等于m且m不等于0,则i=m-1,否则i=m
  109. i = (i != m || m == 0) ? m : m - 1;
  110. }
  111. //修改bound,下一次跟MMASK求且的结果即m会加1,下一次进入此else分支p.bound != b
  112. //如果m小于FULL,会尝试最多m次,即进入下面的逻辑最多m次,
  113. //如果还失败则增加m,然后继续尝试直到m等于FUll为止
  114. else if ((c = p.collides) < m || m == FULL ||
  115. !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
  116. //计数加1
  117. p.collides = c + 1;
  118. //i不等于0的时候,i等于i减1,等于0则i等于m,即循环的从m往后遍历arena数组的元素了
  119. i = (i == 0) ? m : i - 1; // cyclically traverse
  120. }
  121. //上一个else if三个条件都为false,即p.collides >m 且m不等于FULL且cas 修改bound成功
  122. else
  123. //修改bound成功会导致下一次计算m时,m加1,
  124. //此处i等于m+1,下一次for循环i等于m,会抢占m+1对应的rena数组的元素
  125. i = m + 1; // grow
  126. //重置i
  127. p.index = i;
  128. }
  129. }
  130. }

arenaExchange 方法的执行逻辑

  1. 当第一个线程进入arenaExchange 方法,会从ThreadLocal 中 获取p (Node)节点 ;没有则创建并保存在ThreadLocal中
  2. 遍历arena 数组,m的初始值就是0,index的初始值也是0,两个都是大于等于0且i不大于m。 如果下标j对应的元素为空,把当前线程的数据放到p节点的item 中,并cas 设置下标j对应 元素值为 p节点。
  3. 为了等待与之匹配交换数据的线程,需要进行等待阻塞 (为了性能考虑,按照优先级 自旋 > Thread.yield > 阻塞挂起)
  4. 当第二个线程进入的时候, 也会从0开始遍历 arena, 如果当前索引j 对应的元素不为空,cas 置空成功后,进行元素交换;会拿出存储在 q.item 中的值, 然后对 q.match 赋值,并唤醒与之匹配阻塞的线程.
  5. 如果当某个线程多次尝试抢占index对应数组元素的Node都失败的情形下则尝试将m加1,然后抢占m加1对应的新数组元素,将其由null修改成当前线程关联的Node,然后自旋等待匹配;如果自旋结束,没有匹配的线程,则将m加1对应的新数组元素重新置为null,将m减1,然后再次for循环抢占其他为null的数组元素。极端并发下m会一直增加直到达到最大值FULL为止,达到FULL后只能通过for循环不断尝试与其他线程匹配或者抢占为null的数组元素,然后随着并发减少,m会一直减少到0。

2.7 areanExchange 图示

image.png

参考