一、介绍
1.1 简介
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
1.2 应用场景
- Exchanger可以用于遗传算法
- Exchanger也可以用于校对工作
1.3 Exchange api
//构造函数
public Exchanger()
//等待其他线程到达交换点(除非线程被打断,或者交换超时),
//传输给定对象到exchanger,接收exchanger 返回的对象
public V exchange(V x);
//等待其他线程到达交换点(除非线程被打断,或者交换超时),
//传输给定对象到exchanger,接收exchanger 返回的对象
public V exchange(V x, long timeout, TimeUnit unit);
1.4 简单的使用例子
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(() -> {
String A = "银行流水A";// A录入银行流水数据
try {
String result = exgr.exchange(A);
System.out.println("receive " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPool.execute(() -> {
String B = "银行流水B";// B录入银行流水数据
String A = null;
try {
//交换数据
A = exgr.exchange("B");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:" + A + ",B录入是:" + B);
});
threadPool.shutdown();
}
}
//输出
A和B数据是否一致:false,A录入的是:银行流水A,B录入是:银行流水B
receive B
1.5 类UML 图
二、源码分析
2.1 Exchanger 的成员变量
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
// @sun.misc.Contended 消除伪共享
@sun.misc.Contended
static final class Node {
int index; // 当前节点 在数组 arena 的下标
int bound; // 交换器的最后记录值
int collides; // 在当前 bound 下 CAS 的失败次数
int hash; // 伪随机的自旋数
Object item; // A线程交换的数据项 如 Object match = A.exchange(item)
volatile Object match; // 与该A线程交换数据对象的目标B线程 的数据项
volatile Thread parked; // 当阻塞时,设置此线程,不阻塞的话就不必了(因为会自旋)
}
//用于保存每个线程的状态, 实现于ThreadLocal
private final Participant participant;
//交换器隔离区域,启动时为空(在slotExchange 方法内初始化)
private volatile Node[] arena;
//在同步点交换数据时,插槽用于保存线程的数据节点
private volatile Node slot;
//记录arena 最后合法的位置索引
private volatile int bound;
2.2 构造方法
public Exchanger() {
participant = new Participant();
}
在实例化时,创建了一个 ThreadLocal 对象,并设置了初始值,一个 Node 对象。
2.3 exchange 方法
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((
//1. 如果 arena 为空,则执行slotExchange方法;
//2. slotExchange返回值 不为空 ,则返回
arena != null || (v = slotExchange(item, false, 0L)) == null
)
&&
(
//3. arena 不为空 或者 slotExchange返回值 == null, 执行以下语句
(
//4. 线程被打断,返回
//5. 线程没有被打断, 执行arenaExchange方法;
Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null
)
))throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
2.4 slotExchange 的源码分析
//item: 线程交换的数据项
//timed 是否有超时
//ns 超时毫秒
private final Object slotExchange(Object item, boolean timed, long ns) {
//拿出存在ThreadLocal 中的 node
Node p = participant.get();
//获取调用线程
Thread t = Thread.currentThread();
//判断是否被打断
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
//自旋
for (Node q;;) {
//判断 slot 节点是否为空
if ((q = slot) != null) {
//第二个线程执行 slotExchange 进入, 假设slot 不为空
//设置插槽slot 为null, 唤醒等待线程, 返回数据项
if (U.compareAndSwapObject(this, SLOT, q, null)) {
//获取 q.item
Object v = q.item;
//自己的 item 赋值给 match,以让对方线程获取
q.match = item;
//唤醒与之匹配的线程,进行交换数据
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
// cas 修改slot 失败,证明有线程在占用slot
//如果arena 还没有初始化,则创建arena数组
//直到slot 使用完置空后,其他线程执行exchange 方法后,就使用arena数组
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ)) // SEQ == 256; 默认 BOUND == 0
arena = new Node[(FULL + 2) << ASHIFT];// length = (2 + 2) << 7 == 512
}
else if (arena != null)
//多线程占用slot,使用arenaExchange方法
return null; // caller must reroute to arenaExchange
else {
//第一个线程执行exchange 方法时
//会创建一个Node 作为 slot
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
//跳出循环,等待释放
break;
// 如果 CAS 失败,将 p 的值清空,重来
p.item = null;
}
}
//执行这里,说明线程已经把需要交换的数据放到了slot中,当前线程阻塞自己
//等待另一个线程在同一同步点时,唤醒当前线程交换数据
// 伪随机数
int h = p.hash;
//超时时间
long end = timed ? System.nanoTime() + ns : 0L;
//自旋次数 SPINS: 1024
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
//获取 p.match 数据项,是否为空
//match 使用了volatile 修饰,具有线程间可见性
while ((v = p.match) == null) {
//如果为空,仍需阻塞等待
//判断自旋次数 是否大于0
if (spins > 0) {
// 计算伪随机数
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
// 如果不是0,就将自旋数减一
//自旋次数减为0,则让出 CPU 时间片
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
//如果自旋数不够了,且 slot 还没有得到,就重置自旋数
else if (slot != p)
spins = SPINS;
// 如果线程没有中断 && arena数组不是 null && 没有超时限制
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
//自旋次数用完了
//采用阻塞的手段
U.putObject(t, BLOCKER, this);
p.parked = t; //记录阻塞的线程
// 如果这个数据还没有被拿走,阻塞自己
if (slot == p)
U.park(false, ns);
//线程被唤醒,清空阻塞标识
p.parked = null;
// 将当前线程的 parkBlocker 属性设置成 null
U.putObject(t, BLOCKER, null);
}
// 如果有超时限制,使用 CAS 将 slot 从 p 变成 null,取消这次交换
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
//将节点p 的match 属性设置为空
//表示初始化状态,没有任何匹配 >>> putOrderedObject是putObjectVolatile的内存非立即可见版本.
U.putOrderedObject(p, MATCH, null);
// 重置 item
p.item = null;
// 保留伪随机数,供下次种子数字
p.hash = h;
//返回
return v;
}
slotExchage 方法逻辑
- 当第一个线程进入slotExchage 方法,会从ThreadLocal 中 获取p (Node)节点 ;没有则创建并保存在ThreadLocal中
- 线程把数据项item 保存到节点p 的item 中, 并把Exchanger 中的slot 指向 节点p
- 为了等待与之匹配交换数据的线程,需要进行等待阻塞 (为了性能考虑,按照优先级 自旋 > Thread.yield > 阻塞挂起)
- 当第二个线程进入的时候,会拿出存储在 slot item 中的值, 然后对 slot 的 match 赋值,并唤醒上次阻塞的线程.
- 当第一个线程阻塞被唤醒后,说明对方取到值了,就获取 slot 的 match (因为属性被volatile修饰,可以立即感知)值, 并重置 slot 的数据和ThreadLocal的数据,并返回自己的数据.
- 最后,如果超时了,就返回 Time_out 对象。如果线程中断了,就返回 null。在该方法中,会返回 2 种结果,一是有效的 item, 二是 null—- 要么是线程竞争使用 slot 了,创建了 arena 数组,要么是线程中断了.
2.5 使用slot 插槽节点交换数据图示
2.6 arenaExchange 方法 源码分析
private final Object arenaExchange(Object item, boolean timed, long ns) {
//获取arena 数组
Node[] a = arena;
//拿出存在ThreadLocal 中的 node
Node p = participant.get();
for (int i = p.index;;) { p.index 初始化时为0
int b, m, c; long j;
//(i << ASHIFT) + ABASE) 计算第i个元素的在数组arena 的下标
//通过下标 j 获取数组a 中的元素节点 q
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
////如果q不为null,则将对应的数组元素置为null,表示当前线程和该元素对应的线程匹配了
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
//获取 q.item
Object v = q.item;
//自己的 item 赋值给 match,以让对方线程获取
q.match = item;
//唤醒与之匹配的线程,进行交换数据
if (w != null)
U.unpark(w);
return v;
}
//SEQ = 256
//MMASK = 255
//q为null 或者q不为null,cas抢占q失败了
//bound初始化时是SEQ,SEQ & MMASK就是0,即m的初始值就是0,m为0时,i肯定为0
else if (i <= (m = (b = bound) & MMASK) && q == null) {
//把当前线程的数据放到p节点的item 中
p.item = item; // offer
//对应的数组元素修改为p
if (U.compareAndSwapObject(a, j, null, p)) {
//计算超时时间
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
//获取当前线程
Thread t = Thread.currentThread(); // wait
//自旋(等待匹配)
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
if (v != null) { //已经跟某个线程交换成功
//把p.match 置空
U.putOrderedObject(p, MATCH, null);
//把p.item 置空
p.item = null; // clear for next use
//保存hash 随机数,下次自旋使用
p.hash = h;
return v;
}
//还没匹配, 判断自旋次数是否大于0
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
//当自旋次数等于0, 采用 yield 让出CPU 时间片
Thread.yield(); // two yields per wait
}
else if (U.getObjectVolatile(a, j) != p)
//索引j处的数组元素发生变更
//重置自旋次数
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
//1. 线程没有被打断
//2. 没有超时
//3. m == 0
// 阻塞当前线程
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
//线程被唤醒,因为中断或者等待超时
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
//线程被中断了或者等待超时或者m不等于0,进入此分支
//m !=0
if (m != 0) // try to shrink
//修改bound,下一次跟MMASK求且的结果会减1,此时可能导致i大于m
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
//索引无符号右移(除以2)
i = p.index >>>= 1; // descend
//如果线程被打断,返回null
if (Thread.interrupted())
return null;
//如果超时,返回超时对象
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
//m不等于0的情形,下一次for循环继续抢占
//终止内层for循环,继续外层的for循环,会尝试抢占其他的数组元素,然后自旋等待
break; // expired; restart
}//内层for循环结束
}
}
else //数组元素修改失败,下一次for循环重试,q不为null,与该元素对应的线程匹配成功
p.item = null; // clear offer
}
else {
//i>m 或者q不为null,cas抢占q失败了 会进入此分支
//bound的初始值也是0
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
//如果i等于m且m不等于0,则i=m-1,否则i=m
i = (i != m || m == 0) ? m : m - 1;
}
//修改bound,下一次跟MMASK求且的结果即m会加1,下一次进入此else分支p.bound != b
//如果m小于FULL,会尝试最多m次,即进入下面的逻辑最多m次,
//如果还失败则增加m,然后继续尝试直到m等于FUll为止
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
//计数加1
p.collides = c + 1;
//i不等于0的时候,i等于i减1,等于0则i等于m,即循环的从m往后遍历arena数组的元素了
i = (i == 0) ? m : i - 1; // cyclically traverse
}
//上一个else if三个条件都为false,即p.collides >m 且m不等于FULL且cas 修改bound成功
else
//修改bound成功会导致下一次计算m时,m加1,
//此处i等于m+1,下一次for循环i等于m,会抢占m+1对应的rena数组的元素
i = m + 1; // grow
//重置i
p.index = i;
}
}
}
arenaExchange 方法的执行逻辑
- 当第一个线程进入arenaExchange 方法,会从ThreadLocal 中 获取p (Node)节点 ;没有则创建并保存在ThreadLocal中
- 遍历arena 数组,m的初始值就是0,index的初始值也是0,两个都是大于等于0且i不大于m。 如果下标j对应的元素为空,把当前线程的数据放到p节点的item 中,并cas 设置下标j对应 元素值为 p节点。
- 为了等待与之匹配交换数据的线程,需要进行等待阻塞 (为了性能考虑,按照优先级 自旋 > Thread.yield > 阻塞挂起)
- 当第二个线程进入的时候, 也会从0开始遍历 arena, 如果当前索引j 对应的元素不为空,cas 置空成功后,进行元素交换;会拿出存储在 q.item 中的值, 然后对 q.match 赋值,并唤醒与之匹配阻塞的线程.
- 如果当某个线程多次尝试抢占index对应数组元素的Node都失败的情形下则尝试将m加1,然后抢占m加1对应的新数组元素,将其由null修改成当前线程关联的Node,然后自旋等待匹配;如果自旋结束,没有匹配的线程,则将m加1对应的新数组元素重新置为null,将m减1,然后再次for循环抢占其他为null的数组元素。极端并发下m会一直增加直到达到最大值FULL为止,达到FULL后只能通过for循环不断尝试与其他线程匹配或者抢占为null的数组元素,然后随着并发减少,m会一直减少到0。