一.使用
/**
 * Demo of a {@code SynchronousQueue} hand-off: two consumers are started first and
 * block in {@code take()}, then two producers each try a non-blocking {@code offer()}.
 */
public class Test {

    /**
     * Starts c1, c2, p1 and p2 in that order, one second apart.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        Producer p1 = new Producer("p1", queue, 1);
        Producer p2 = new Producer("p2", queue, 2);
        Consumer c1 = new Consumer("c1", queue);
        Consumer c2 = new Consumer("c2", queue);

        // The pauses give each thread time to reach its queue call before the next
        // one starts, so the start order below is also the arrival order at the queue.
        c1.start();
        Thread.sleep(1000);
        c2.start();
        Thread.sleep(1000);
        p1.start();
        Thread.sleep(1000);
        p2.start();
    }
}

/** Thread that makes one non-blocking {@code offer} and prints whether it succeeded. */
class Producer extends Thread {
    private final SynchronousQueue<Integer> queue;
    private final int n;

    /**
     * @param name  thread name, used as the prefix of the printed line
     * @param queue queue to offer into
     * @param n     value to offer
     */
    public Producer(String name, SynchronousQueue<Integer> queue, int n) {
        super(name);
        this.queue = queue;
        this.n = n;
    }

    @Override
    public void run() {
        System.out.println(getName() + " offer " + queue.offer(n));
    }
}

/** Thread that blocks in {@code take} for one element and prints it. */
class Consumer extends Thread {
    private final SynchronousQueue<Integer> queue;

    /**
     * @param name  thread name, used as the prefix of the printed line
     * @param queue queue to take from
     */
    public Consumer(String name, SynchronousQueue<Integer> queue) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(getName() + " take " + queue.take());
        } catch (InterruptedException e) {
            // Interrupted while waiting: nothing was taken, so print nothing and
            // restore the flag so whoever owns this thread can still observe it.
            Thread.currentThread().interrupt();
        }
    }
}

// Sample output from the original write-up (c2, the consumer that started waiting
// last, is served first):
//   p1 offer true
//   c2 take 1
//   p2 offer true
//   c1 take 2
二.源码
SynchronousQueue构造方法
/**
 * Creates a queue with the non-fair policy; equivalent to {@code SynchronousQueue(false)}.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a queue and picks the transferer implementation from the fairness flag.
 *
 * @param fair if true a {@code TransferQueue} backs the queue, otherwise a
 *             {@code TransferStack}
 */
public SynchronousQueue(boolean fair) {
    if (fair) {
        transferer = new TransferQueue<E>();
    } else {
        transferer = new TransferStack<E>();
    }
}
take
public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;}E transfer(E e, boolean timed, long nanos) {SNode s = null;// REQUEST:消费,DATA:生产int mode = (e == null) ? REQUEST : DATA;for (;;) {SNode h = head;if (h == null || h.mode == mode) {// 入栈if (casHead(h, s = snode(s, e, h, mode))) {SNode m = awaitFulfill(s, timed, nanos);return (E) ((mode == REQUEST) ? m.item : s.item);}}}}SNode awaitFulfill(SNode s, boolean timed, long nanos) {Thread w = Thread.currentThread();int spins = (shouldSpin(s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (;;) {SNode m = s.match;if (m != null)// 4.唤醒返回return m;if (spins > 0)// 1.自旋spins = shouldSpin(s) ? (spins-1) : 0;else if (s.waiter == null)// 2.设置线程s.waiter = w;else if (!timed)// 3.阻塞LockSupport.park(this);else if (nanos > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanos);}}
offer
public boolean offer(E e) {if (e == null) throw new NullPointerException();return transferer.transfer(e, true, 0) != null;}E transfer(E e, boolean timed, long nanos) {SNode s = null;// REQUEST:消费,DATA:生产int mode = (e == null) ? REQUEST : DATA;for (;;) {SNode h = head;if (!isFulfilling(h.mode)) {// 入栈if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) {SNode m = s.next;SNode mn = m.next;if (m.tryMatch(s)) {// 设置headcasHead(s, mn);return (E) ((mode == REQUEST) ? m.item : s.item);}}}}}}boolean tryMatch(SNode s) {// this对象的match属性赋值if (match == null &&UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {Thread w = waiter;if (w != null) {waiter = null;// 唤醒LockSupport.unpark(w);}return true;}return match == s;}
