一.使用

  // Demo driver for SynchronousQueue: builds one shared queue, two Producer threads
  // (carrying 1 and 2) and two Consumer threads, then starts them in the order
  // c1, c2, p1, p2 with a 1000 ms sleep between consecutive starts.
  1. public class Test {
  2. public static void main(String[] args) throws InterruptedException {
  // The shared queue is created with the no-arg constructor.
  3. SynchronousQueue<Integer> queue = new SynchronousQueue<>();
  4. Producer p1 = new Producer("p1", queue, 1);
  5. Producer p2 = new Producer("p2", queue, 2);
  6. Consumer c1 = new Consumer("c1", queue);
  7. Consumer c2 = new Consumer("c2", queue);
  // Both consumers are started before either producer.
  8. c1.start();
  9. Thread.sleep(1000);
  10. c2.start();
  11. Thread.sleep(1000);
  12. p1.start();
  13. Thread.sleep(1000);
  14. p2.start();
  15. }
  16. }
  // Thread that makes a single offer(n) call on the shared queue and prints
  // the thread name, the text " offer " and the boolean result of that call.
  17. class Producer extends Thread {
  18. private SynchronousQueue<Integer> queue;
  19. private int n;
  // name is passed to the Thread constructor; queue is the shared queue; n is the value offered.
  20. public Producer(String name, SynchronousQueue<Integer> queue, int n) {
  21. super(name);
  22. this.queue = queue;
  23. this.n = n;
  24. }
  25. @Override
  26. public void run() {
  // offer(n) is called exactly once; its return value is printed, not retried.
  27. System.out.println(getName() + " offer " + queue.offer(n));
  28. }
  29. }
  // Thread that makes a single take() call on the shared queue and prints
  // the thread name, the text " take " and the value returned.
  30. class Consumer extends Thread {
  31. private SynchronousQueue<Integer> queue;
  // name is passed to the Thread constructor; queue is the shared queue to take from.
  32. public Consumer(String name, SynchronousQueue<Integer> queue) {
  33. super(name);
  34. this.queue = queue;
  35. }
  // NOTE(review): @SneakyThrows is presumably Lombok's annotation, used so that run()
  // can call take() (declared "throws InterruptedException") without a try/catch - confirm.
  36. @SneakyThrows
  37. @Override
  38. public void run() {
  39. System.out.println(getName() + " take " + queue.take());
  40. }
  41. }
  42. 输出:
  43. p1 offer true
  44. c2 take 1
  45. p2 offer true
  46. c1 take 2

二.源码

SynchronousQueue构造方法

  // No-arg constructor: delegates to SynchronousQueue(false), i.e. the non-fair variant.
  1. public SynchronousQueue() {
  2. this(false);
  3. }
  // fair == true selects a TransferQueue as the transferer; fair == false selects a
  // TransferStack. The no-arg constructor above therefore uses TransferStack.
  4. public SynchronousQueue(boolean fair) {
  5. transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
  6. }

take

  // Consumer entry point: calls transferer.transfer with a null element, timed == false
  // and nanos == 0, and returns the result when it is non-null.
  // NOTE(review): this excerpt shows no statement for the e == null case, so it appears
  // to be abridged - confirm against the JDK source for the handling of that path.
  1. public E take() throws InterruptedException {
  2. E e = transferer.transfer(null, false, 0);
  3. if (e != null)
  4. return e;
  5. }
  // Stack-based transfer, as shown for the take() path. Only the "push and wait" branch
  // appears in this excerpt: when the stack is empty or its head has the same mode as
  // the caller, the caller pushes its own node and waits in awaitFulfill for a match.
  // Returns the matched node's item in REQUEST mode, otherwise the caller's own item.
  // NOTE(review): no branch is shown for a head whose mode differs; in this excerpt the
  // loop simply retries - presumably abridged.
  6. E transfer(E e, boolean timed, long nanos) {
  7. SNode s = null;
  8. // REQUEST: consumer side, DATA: producer side
  9. int mode = (e == null) ? REQUEST : DATA;
  10. for (;;) {
  11. SNode h = head;
  12. if (h == null || h.mode == mode) {
  13. // push onto the stack
  14. if (casHead(h, s = snode(s, e, h, mode))) {
  // Wait until another node has been recorded as this node's match.
  15. SNode m = awaitFulfill(s, timed, nanos);
  16. return (E) ((mode == REQUEST) ? m.item : s.item);
  17. }
  18. }
  19. }
  20. }
  // Waits until s.match becomes non-null and returns that match node.
  // Each loop iteration performs exactly one of the numbered steps below; the initial
  // spin budget is maxTimedSpins or maxUntimedSpins when shouldSpin(s) is true, else 0.
  21. SNode awaitFulfill(SNode s, boolean timed, long nanos) {
  22. Thread w = Thread.currentThread();
  23. int spins = (shouldSpin(s) ?
  24. (timed ? maxTimedSpins : maxUntimedSpins) : 0);
  25. for (;;) {
  26. SNode m = s.match;
  27. if (m != null)
  28. // 4. matched (after being woken up): return the match
  29. return m;
  30. if (spins > 0)
  31. // 1. spin
  32. spins = shouldSpin(s) ? (spins-1) : 0;
  33. else if (s.waiter == null)
  34. // 2. record the current thread as the node's waiter
  35. s.waiter = w;
  36. else if (!timed)
  37. // 3. block (park) the current thread
  38. LockSupport.park(this);
  // Timed wait: parks with a timeout only when nanos exceeds spinForTimeoutThreshold.
  39. else if (nanos > spinForTimeoutThreshold)
  40. LockSupport.parkNanos(this, nanos);
  41. }
  42. }

offer

  // Producer entry point: rejects a null element with NullPointerException, then calls
  // transferer.transfer with timed == true and nanos == 0 and returns true exactly when
  // that call returns a non-null value.
  1. public boolean offer(E e) {
  2. if (e == null) throw new NullPointerException();
  3. return transferer.transfer(e, true, 0) != null;
  4. }
  // Stack-based transfer, as shown for the offer() path. Only the "fulfill" branch
  // appears in this excerpt: when the head is not itself a fulfilling node, the caller
  // pushes a node marked FULFILLING|mode, tries to match the node below it (s.next),
  // and on success moves head past both nodes.
  // Returns the matched node's item in REQUEST mode, otherwise the caller's own item.
  // NOTE(review): h.mode is read without a null check here and no other branch is
  // shown - presumably abridged; confirm against the JDK source.
  5. E transfer(E e, boolean timed, long nanos) {
  6. SNode s = null;
  7. // REQUEST: consumer side, DATA: producer side
  8. int mode = (e == null) ? REQUEST : DATA;
  9. for (;;) {
  10. SNode h = head;
  11. if (!isFulfilling(h.mode)) {
  12. // push onto the stack
  13. if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
  14. for (;;) {
  // m is the node directly below the fulfilling node; mn is the node after m.
  15. SNode m = s.next;
  16. SNode mn = m.next;
  17. if (m.tryMatch(s)) {
  18. // set head to the node after the matched one
  19. casHead(s, mn);
  20. return (E) ((mode == REQUEST) ? m.item : s.item);
  21. }
  22. }
  23. }
  24. }
  25. }
  26. }
  // Tries to record s as this node's match. If the match field is still null and the
  // CAS succeeds, the recorded waiter thread (if any) is cleared and unparked and the
  // method returns true. Otherwise it returns whether the existing match is already s.
  27. boolean tryMatch(SNode s) {
  28. // CAS this node's match field from null to s
  29. if (match == null &&
  30. UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
  31. Thread w = waiter;
  32. if (w != null) {
  33. waiter = null;
  34. // wake up the waiting thread
  35. LockSupport.unpark(w);
  36. }
  37. return true;
  38. }
  39. return match == s;
  40. }