阻塞队列模型

AQS.png

同步流程

AQS同步流程图.png

条件队列

Condition队列模型

Condition队列模型.png

公共方法(封装好的API)

acquire

  • 独占式获取(Acquires in exclusive mode, ignoring interrupts)
  1. /**
  2. * 先后调用tryAcquire()和acquireQueued()方法
  3. * tryAcquire()是模板方法,用户自定义扩展,自定义获取状态的实现
  4. * acquireQueued()是在状态获取失败情况下调用的,addWaiter()将节点追加到独占式队列尾部
  5. * 将获取失败的节点添加到等待队列中
  6. */
  7. public final void acquire(int arg) {
  8. if (!tryAcquire(arg) &&
  9. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  10. selfInterrupt();
  11. }

acquireShared

  • 共享式获取(Acquires in shared mode, ignoring interrupts.
  1. /**
  2. * 同样是自定义的模板方法tryAcquireShared()
  3. *
  4. */
  5. public final void acquireShared(int arg) {
  6. if (tryAcquireShared(arg) < 0)
  7. doAcquireShared(arg);
  8. }

hasQueuedPredecessors

  • 判断是否有前驱节点
  1. public final boolean hasQueuedPredecessors() {
  2. // The correctness of this depends on head being initialized
  3. // before tail and on head.next being accurate if the current
  4. // thread is first in queue.
  5. Node t = tail; // Read fields in reverse initialization order
  6. Node h = head;
  7. Node s;
  8. return h != t &&
  9. ((s = h.next) == null || s.thread != Thread.currentThread());
  10. }

getQueueLength

  • 获取同步队列长度
  • 从尾结点开始遍历,仅节点存在线程才有效
  1. public final int getQueueLength() {
  2. int n = 0;
  3. for (Node p = tail; p != null; p = p.prev) {
  4. if (p.thread != null)
  5. ++n;
  6. }
  7. return n;
  8. }

getQueuedThreads

  • 获取同步线程队列
  • 从尾结点开始遍历
  1. public final Collection<Thread> getQueuedThreads() {
  2. ArrayList<Thread> list = new ArrayList<Thread>();
  3. for (Node p = tail; p != null; p = p.prev) {
  4. Thread t = p.thread;
  5. if (t != null)
  6. list.add(t);
  7. }
  8. return list;
  9. }

getExclusiveQueuedThreads

  • 获取独占式同步线程队列
  1. public final Collection<Thread> getExclusiveQueuedThreads() {
  2. ArrayList<Thread> list = new ArrayList<Thread>();
  3. for (Node p = tail; p != null; p = p.prev) {
  4. if (!p.isShared()) {
  5. Thread t = p.thread;
  6. if (t != null)
  7. list.add(t);
  8. }
  9. }
  10. return list;
  11. }

getSharedQueuedThreads

  • 获取共享式同步线程队列
  1. public final Collection<Thread> getSharedQueuedThreads() {
  2. ArrayList<Thread> list = new ArrayList<Thread>();
  3. for (Node p = tail; p != null; p = p.prev) {
  4. if (p.isShared()) {
  5. Thread t = p.thread;
  6. if (t != null)
  7. list.add(t);
  8. }
  9. }
  10. return list;
  11. }

release

  • 独占式释放
  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

模板方法(自定义实现)

tryAcquire

  • 尝试独占式获取
  1. protected boolean tryAcquire(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

tryRelease

  • 尝试独占式释放
  1. protected boolean tryRelease(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

tryAcquireShared

  • 尝试共享获取
  1. protected int tryAcquireShared(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

tryReleaseShared

  • 尝试共享释放
  1. protected boolean tryReleaseShared(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

isHeldExclusively

  1. protected boolean isHeldExclusively() {
  2. throw new UnsupportedOperationException();
  3. }

私有方法(同步队列核心实现)

acquireQueued

  • 在队列中尝试获取状态(独占队列)
  • 成功获取状态的前提是前驱节点是头结点,然后执行自定义的tryAcquire方法去尝试获取状态
  • 注:在acquire方法中会优先调用tryAcquire方法在调用此方法的逻辑,非公平锁在这点上可以提前忽略p == head的条件而进行一次抢占式获取。
  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

doAcquireShared

  • 在队列中尝试获取状态(共享队列)
  • 在循环中,有个setHeadAndPropagate的方法,目的在于设置新的头结点,并传播给后续的Node.SHARED模式的节点
  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED);
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. int r = tryAcquireShared(arg);
  10. if (r >= 0) {
  11. setHeadAndPropagate(node, r);
  12. p.next = null; // help GC
  13. if (interrupted)
  14. selfInterrupt();
  15. failed = false;
  16. return;
  17. }
  18. }
  19. if (shouldParkAfterFailedAcquire(p, node) &&
  20. parkAndCheckInterrupt())
  21. interrupted = true;
  22. }
  23. } finally {
  24. if (failed)
  25. cancelAcquire(node);
  26. }
  27. }

enq

  • 添加到同步队列
  1. /**
  2. * 循环CAS
  3. */
  4. private Node enq(final Node node) {
  5. for (;;) {
  6. Node t = tail;
  7. if (t == null) { // Must initialize
  8. if (compareAndSetHead(new Node()))
  9. tail = head;
  10. } else {
  11. node.prev = t;
  12. if (compareAndSetTail(t, node)) {
  13. t.next = node;
  14. return t;
  15. }
  16. }
  17. }
  18. }
  19. private Node addWaiter(Node mode) {
  20. Node node = new Node(Thread.currentThread(), mode);
  21. // 以下代码段同enq()一样,这里只是尝试快速添加到尾结点,如若失败再通过enq循环CAS
  22. Node pred = tail;
  23. if (pred != null) {
  24. node.prev = pred;
  25. if (compareAndSetTail(pred, node)) {
  26. pred.next = node;
  27. return node;
  28. }
  29. }
  30. enq(node);
  31. return node;
  32. }

setHead

  • 替换头结点
  • 一般用在抢占队列中,抢夺锁成功的节点将成为新的头结点
  1. private void setHead(Node node) {
  2. head = node;
  3. node.thread = null;
  4. node.prev = null;
  5. }

unparkSuccessor

  • 唤醒后继节点,如果后继节点不存在或等待状态大于0,那么会尝试从尾结点开始往前遍历,直到找到合适的节点并将其唤醒。
  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * 将当前节点的等待状态设置为0
  4. */
  5. int ws = node.waitStatus;
  6. if (ws < 0)
  7. compareAndSetWaitStatus(node, ws, 0);
  8. /*
  9. * 如果后继节点不存在或等待状态大于0,那么会尝试从尾结点开始往前遍历
  10. * 直到找到合适的节点并将其唤醒。
  11. */
  12. Node s = node.next;
  13. if (s == null || s.waitStatus > 0) {
  14. s = null;
  15. for (Node t = tail; t != null && t != node; t = t.prev)
  16. if (t.waitStatus <= 0)
  17. s = t;
  18. }
  19. if (s != null)
  20. LockSupport.unpark(s.thread);
  21. }

doReleaseShared

  • 发出释放信号并确保传播
  • 用于共享队列(非独占式队列)
  • 应用于共享队列的操作中
  • @see setHeadAndPropagate(Node node, int propagate)
  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  8. continue; // loop to recheck cases
  9. //唤醒后继节点
  10. unparkSuccessor(h);
  11. }
  12. else if (ws == 0 &&
  13. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  14. continue; // loop on failed CAS
  15. }
  16. if (h == head) // loop if head changed
  17. break;
  18. }
  19. }

setHeadAndPropagate

  • 设置头结点并传播
  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head; // Record old head for check below
  3. setHead(node);
  4. /*
  5. * 如果propagate大于0(表示传播)或者头结点(此时为node)是空或等待状态小于0,
  6. * 且后继节点不存在或是共享队列
  7. * 那么进行释放信号的传播
  8. */
  9. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  10. (h = head) == null || h.waitStatus < 0) {
  11. Node s = node.next;
  12. if (s == null || s.isShared())
  13. doReleaseShared();
  14. }
  15. }

shouldParkAfterFailedAcquire

  • 在将线程(失败获取状态的node节点对应的线程)park前需要判断前驱节点是否为SIGNAL状态
  • pred.waitStatus == Node.SIGNAL表明前驱节点的后继节点(也就是当前节点)是阻塞的,在前驱节点释放的时候可以根据waitStatus判断是否有阻塞节点,如果前驱节点的状态不是Node.SIGNAL,表明当前节点的位置不是安全的(下一次抢夺状态的时候无法被唤醒),同时如果当前节点的前驱节点的状态是CANCELLED(中断或超时),应该循环判断将前驱节点前移,直到大于0(CANCELLED是大于0的一种)
  • 要么就尝试将前驱节点状态更改为Node.SIGNAL,然后才可以”safely park”
  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. if (ws == Node.SIGNAL)
  4. /*
  5. * This node has already set status asking a release
  6. * to signal it, so it can safely park.
  7. */
  8. return true;
  9. if (ws > 0) {
  10. /*
  11. * Predecessor was cancelled. Skip over predecessors and
  12. * indicate retry.
  13. */
  14. do {
  15. node.prev = pred = pred.prev;
  16. } while (pred.waitStatus > 0);
  17. pred.next = node;
  18. } else {
  19. /*
  20. * waitStatus must be 0 or PROPAGATE. Indicate that we
  21. * need a signal, but don't park yet. Caller will need to
  22. * retry to make sure it cannot acquire before parking.
  23. */
  24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  25. }
  26. return false;
  27. }

Condition

一个同步器可以构造多个ConditionObject,用于不同场景下的条件阻塞。

await

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();
  5. int savedState = fullyRelease(node);
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }

signal

  1. public final void signal() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. doSignal(first);
  7. }
  8. private void doSignal(Node first) {
  9. do {
  10. if ( (firstWaiter = first.nextWaiter) == null)
  11. lastWaiter = null;
  12. first.nextWaiter = null;
  13. } while (!transferForSignal(first) &&
  14. (first = firstWaiter) != null);
  15. }
  16. final boolean transferForSignal(Node node) {
  17. /*
  18. * If cannot change waitStatus, the node has been cancelled.
  19. */
  20. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  21. return false;
  22. /*
  23. * Splice onto queue and try to set waitStatus of predecessor to
  24. * indicate that thread is (probably) waiting. If cancelled or
  25. * attempt to set waitStatus fails, wake up to resync (in which
  26. * case the waitStatus can be transiently and harmlessly wrong).
  27. */
  28. Node p = enq(node);
  29. int ws = p.waitStatus;
  30. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  31. LockSupport.unpark(node.thread);
  32. return true;
  33. }

节点属性的操作

  1. private static final Unsafe unsafe = Unsafe.getUnsafe();
  2. private static final long stateOffset;
  3. private static final long headOffset;
  4. private static final long tailOffset;
  5. private static final long waitStatusOffset;
  6. private static final long nextOffset;
  7. static {
  8. try {
  9. stateOffset = unsafe.objectFieldOffset
  10. (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  11. headOffset = unsafe.objectFieldOffset
  12. (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  13. tailOffset = unsafe.objectFieldOffset
  14. (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  15. waitStatusOffset = unsafe.objectFieldOffset
  16. (Node.class.getDeclaredField("waitStatus"));
  17. nextOffset = unsafe.objectFieldOffset
  18. (Node.class.getDeclaredField("next"));
  19. } catch (Exception ex) { throw new Error(ex); }
  20. }
  21. /**
  22. * CAS head field. Used only by enq.
  23. */
  24. private final boolean compareAndSetHead(Node update) {
  25. return unsafe.compareAndSwapObject(this, headOffset, null, update);
  26. }
  27. /**
  28. * CAS tail field. Used only by enq.
  29. */
  30. private final boolean compareAndSetTail(Node expect, Node update) {
  31. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  32. }
  33. /**
  34. * CAS waitStatus field of a node.
  35. */
  36. private static final boolean compareAndSetWaitStatus(Node node,
  37. int expect,
  38. int update) {
  39. return unsafe.compareAndSwapInt(node, waitStatusOffset,
  40. expect, update);
  41. }
  42. /**
  43. * CAS next field of a node.
  44. */
  45. private static final boolean compareAndSetNext(Node node,
  46. Node expect,
  47. Node update) {
  48. return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
  49. }