一.使用
/**
 * Demo driver for {@link SynchronousQueue}: starts two consumers and then two
 * producers, pausing between each start so the threads reach the queue in a
 * known order (c1, c2, p1, p2).
 */
public class Test {
    /** Pause inserted between consecutive thread starts, in milliseconds. */
    private static final long STAGGER_MILLIS = 1000;

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> channel = new SynchronousQueue<>();
        Producer p1 = new Producer("p1", channel, 1);
        Producer p2 = new Producer("p2", channel, 2);
        Consumer c1 = new Consumer("c1", channel);
        Consumer c2 = new Consumer("c2", channel);

        // Consumers go first so that both are already waiting when the producers offer.
        Thread[] startOrder = {c1, c2, p1, p2};
        for (int i = 0; i < startOrder.length; i++) {
            if (i > 0) {
                Thread.sleep(STAGGER_MILLIS);
            }
            startOrder[i].start();
        }
    }
}
/**
 * Demo producer thread: makes a single non-blocking {@code offer} of its value
 * and prints whether the hand-off succeeded.
 */
class Producer extends Thread {
    /** Queue the value is offered to. */
    private final SynchronousQueue<Integer> channel;
    /** Value offered by this producer. */
    private final int payload;

    /**
     * @param name  thread name, used as the prefix of the printed line
     * @param queue queue to offer the value to
     * @param n     value to offer
     */
    public Producer(String name, SynchronousQueue<Integer> queue, int n) {
        super(name);
        this.channel = queue;
        this.payload = n;
    }

    /** Offers the value once (without waiting) and prints the boolean result. */
    @Override
    public void run() {
        boolean accepted = channel.offer(payload);
        System.out.println(getName() + " offer " + accepted);
    }
}
/**
 * Demo consumer thread: blocks in {@code take()} until a producer hands over
 * a value, then prints it.
 */
class Consumer extends Thread {
    /** Queue this consumer takes from. */
    private final SynchronousQueue<Integer> queue;

    /**
     * @param name  thread name, used as the prefix of the printed line
     * @param queue queue to take a value from
     */
    public Consumer(String name, SynchronousQueue<Integer> queue) {
        super(name);
        this.queue = queue;
    }

    /**
     * Takes one value and prints it. If the thread is interrupted while
     * waiting, nothing is printed and the interrupt flag is restored so the
     * interruption stays visible to whoever inspects the thread afterwards.
     */
    @Override
    public void run() {
        try {
            System.out.println(getName() + " take " + queue.take());
        } catch (InterruptedException e) {
            // Handle the interrupt explicitly instead of relying on Lombok's
            // @SneakyThrows to smuggle the checked exception out of run().
            Thread.currentThread().interrupt();
        }
    }
}
输出(默认非公平模式使用 TransferStack,后进先出:后开始等待的 c2 位于栈顶,所以先与 p1 匹配;各线程打印的先后顺序可能略有不同):
p1 offer true
c2 take 1
p2 offer true
c1 take 2
二.源码
SynchronousQueue构造方法
/**
 * Creates a queue by delegating to {@code SynchronousQueue(boolean)} with
 * {@code fair == false}.
 */
public SynchronousQueue() {
this(false);
}
/**
 * Creates a queue with the requested fairness policy.
 *
 * @param fair if true the transferer is a {@code TransferQueue},
 *             otherwise it is a {@code TransferStack}
 */
public SynchronousQueue(boolean fair) {
    if (fair) {
        transferer = new TransferQueue<E>();
    } else {
        transferer = new TransferStack<E>();
    }
}
take
/**
 * Retrieves an element, waiting for a producer to hand one over.
 *
 * Calls {@code transferer.transfer} with a null item (a consumer request),
 * untimed.
 *
 * @return the element handed over by a producer
 * @throws InterruptedException if the transfer yields no element; in the JDK
 *         implementation that is how an interrupted wait is reported
 */
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    // transfer returned null: no element was received. Clear the thread's
    // interrupt status and report the failure as InterruptedException
    // (without these two lines the method has a path with no return).
    Thread.interrupted();
    throw new InterruptedException();
}
/**
 * Stack-based transfer, showing only the branch where the calling thread
 * pushes its own node and waits for a match.
 * NOTE(review): this looks like an abridged excerpt; when the head has a
 * different mode the loop shown here simply retries.
 *
 * @param e     item to hand over, or null when the caller wants to receive
 * @param timed passed through to awaitFulfill
 * @param nanos passed through to awaitFulfill
 * @return the matched node's item for a REQUEST, otherwise this node's own item
 */
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
// REQUEST: consumer (wants an item), DATA: producer (supplies an item)
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// Empty stack, or the top node has the same mode as this call
if (h == null || h.mode == mode) {
// Push a node for this call onto the stack via CAS on head
if (casHead(h, s = snode(s, e, h, mode))) {
// Wait until another thread matches the pushed node
SNode m = awaitFulfill(s, timed, nanos);
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
}
}
/**
 * Waits until node {@code s} has been matched and returns the matching node.
 * The numbered comments give the order in which a waiting thread normally
 * passes through the branches: 1 spin, 2 record the waiter, 3 park,
 * 4 return once a match is visible.
 * NOTE(review): this excerpt contains no interrupt or timeout-expiry handling,
 * and nanos is never updated here; presumably those parts were left out.
 *
 * @param s     the node to wait on
 * @param timed whether the timed branch (parkNanos) is used instead of park
 * @param nanos wait time handed to parkNanos
 * @return the node stored in {@code s.match}
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
Thread w = Thread.currentThread();
// Initial spin budget: zero unless shouldSpin(s) says spinning is worthwhile
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
SNode m = s.match;
if (m != null)
// 4. Matched (woken up): return the match
return m;
if (spins > 0)
// 1. Spin
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
// 2. Record the current thread as the node's waiter
s.waiter = w;
else if (!timed)
// 3. Block
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
// Timed variant of step 3: park only when nanos exceeds the threshold
LockSupport.parkNanos(this, nanos);
}
}
offer
/**
 * Offers an element through a timed transfer with a zero timeout.
 *
 * @param e the element to hand over; must not be null
 * @return true if the transfer returned a non-null result, false otherwise
 * @throws NullPointerException if {@code e} is null
 */
public boolean offer(E e) {
    if (e == null) {
        throw new NullPointerException();
    }
    E handedOff = transferer.transfer(e, true, 0);
    return handedOff != null;
}
/**
 * Stack-based transfer, showing only the branch where the calling thread
 * fulfills a node that is already waiting on the stack.
 * NOTE(review): this excerpt dereferences {@code h} without a null check and
 * never uses timed/nanos; presumably the empty-stack / same-mode branch that
 * precedes this one in the full method was left out.
 *
 * @param e     item to hand over, or null when the caller wants to receive
 * @param timed unused in this excerpt
 * @param nanos unused in this excerpt
 * @return the matched node's item for a REQUEST, otherwise this node's own item
 */
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
// REQUEST: consumer (wants an item), DATA: producer (supplies an item)
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// Proceed only if the current head is not itself a fulfilling node
if (!isFulfilling(h.mode)) {
// Push a node marked FULFILLING on top of the waiting node
if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
// m is the node directly below s; mn is the node below m
SNode m = s.next;
SNode mn = m.next;
if (m.tryMatch(s)) {
// Set head to mn, removing both s and m from the stack
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
}
}
}
}
/**
 * Tries to match node {@code s} to this node and, on success, wakes the
 * thread recorded as this node's waiter.
 *
 * @param s the node to record as this node's match
 * @return true if this call set the match, or if the match is already {@code s}
 */
boolean tryMatch(SNode s) {
// CAS this node's match field from null to s
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) {
// Clear the waiter reference before waking the thread
waiter = null;
// Wake the waiting thread
LockSupport.unpark(w);
}
return true;
}
// The CAS was not performed or failed: succeed only if the match is already s
return match == s;
}