重新回到这个acquire加锁的方法,如果tryAcquire加锁失败了就走第2步操作
image.png
第二步操作又可以分成两步,一个是addWaiter(Node.EXCLUSIVE),另一个是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

addWaiter,线程入队

addWaiter(Node.EXCLUSIVE)
这个EXCLUSIVE代表的就是排他性,独占锁,同一时间只能有一个线程获取到锁,此时是排他锁,独占锁。

  1. private Node addWaiter(Node mode) {
  2. //1.封装node
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. //2.
  6. Node pred = tail;
  7. if (pred != null) {
  8. node.prev = pred;
  9. if (compareAndSetTail(pred, node)) {
  10. pred.next = node;
  11. return node;
  12. }
  13. }
  14. //3
  15. enq(node);
  16. return node;
  17. }

1.Node node = new Node(Thread.currentThread(), mode);

  1. 将当前线程(抢锁的线程)封装成了一个Node,mode=EXCLUSIVE
  2. 具体看一下Node
    1. volatile int waitStatus; :一个线程无法获取到锁的时候,就会阻塞住,当作线程挂起,但是阻塞状态又细分为很多不同的阻塞状态CANCELLED、SIGNAL、CONDITION、PROPAGATE
    2. volatile Node prev;:一个节点可以有上一个节点、prev指针
    3. volatile Node next;:一个节点可以有下一个节点、next指针
    4. volatile Thread thread;:保存当前node的Thread
    5. Node nextWaiter;:可以认为是下一个等待线程

获取不到锁,处于等待状态的线程,会封装为一个Node,而且有指针,最后多个处于阻塞等待状态的线程可以封装为一个Node双向链表。

  1. static final class Node {
  2. /** Marker to indicate a node is waiting in shared mode */
  3. static final Node SHARED = new Node();
  4. /** Marker to indicate a node is waiting in exclusive mode */
  5. static final Node EXCLUSIVE = null;
  6. /** waitStatus value to indicate thread has cancelled */
  7. static final int CANCELLED = 1;
  8. /** waitStatus value to indicate successor's thread needs unparking */
  9. static final int SIGNAL = -1;
  10. /** waitStatus value to indicate thread is waiting on condition */
  11. static final int CONDITION = -2;
  12. /**
  13. * waitStatus value to indicate the next acquireShared should
  14. * unconditionally propagate
  15. */
  16. static final int PROPAGATE = -3;
  17. volatile int waitStatus;
  18. volatile Node prev;
  19. volatile Node next;
  20. /**
  21. * The thread that enqueued this node. Initialized on
  22. * construction and nulled out after use.
  23. */
  24. volatile Thread thread;
  25. Node nextWaiter;
  26. /**
  27. * The synchronization state.
  28. */
  29. private volatile int state;

image.png

3.enq(node);队列初始化

  1. private Node enq(final Node node(这个node就是封装线程2node对象)) {
  2. for (;;) {
  3. //1
  4. Node t = tail;
  5. //2
  6. if (t == null) { // Must initialize
  7. if (compareAndSetHead(new Node()))
  8. tail = head;
  9. } else {
  10. node.prev = t;
  11. if (compareAndSetTail(t, node)) {
  12. t.next = node;
  13. return t;
  14. }
  15. }
  16. }
  17. }

第一轮循环

  1. Node t = tail; tail此时还是null,t=null
  2. if (t == null)这一步成立,随后通过cas操作unsafe.compareAndSwapObject(this, headOffset, null, update);,判断一下head变量是否为null,如果是null就new 了一个Node()并放到head指针上,执行tail = head,头尾相连。

image.png

第二轮循环

  1. Node t = tail; 此时tail指向HeadNode,t也就是指向了HeadNode

image.png

  1. node.prev = t; 线程2node的prev指针指向t指向的那个headNode

image.png

  1. compareAndSetTail(t, node);尝试比较tail的变量是否是t节点,如果是tail就指向线程2的node

image.png

  1. t.next = node;把那个创建的HeadNode节点的next指针和Node关联上,这俩形成一个双向链表

image.png

  1. return t;这个t实际上就是图中的HeadNode。到此,双向队列初始化完成

addWaiter结束后,线程2的Node就已经被加入到队列里面去了

acquireQueued,第二个线程尝试加锁或阻塞

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. //1
  7. final Node p = node.predecessor();
  8. //2
  9. if (p == head && tryAcquire(arg)) {
  10. setHead(node);
  11. p.next = null; // help GC
  12. failed = false;
  13. return interrupted;
  14. }
  15. //3
  16. if (shouldParkAfterFailedAcquire(p, node) &&
  17. parkAndCheckInterrupt())
  18. interrupted = true;
  19. }
  20. } finally {
  21. if (failed)
  22. cancelAcquire(node);
  23. }
  24. }

第二个线程加锁成功过程

  1. final Node p = node.predecessor();获取线程2Node的上一个节点,
  2. p == head && tryAcquire(arg);根据图中已经创建好的队列来看,Node上一个节点就是head指向的Node,此时p==head成立,接下来就要再次尝试获取一下锁tryAcquire(arg);
    1. 如果加锁成功,将线程2对应的node从队列中移除。

image.png
重新构造队列,
setHead(node);该方法把head的指针指向线程2的Node,同时把线程2的thread干掉设置为null,同时线程2Node的prev指针也与之前的HeadNode取消关联。
p.next = null; 原来的HeadNode的next指针与原来的线程2Node取消关联,这个时候HeadNode就孤立了,不被任何变量引用,因此就能被GC掉(也就是为什么注释里写了help GC的原因)
return interrupted;此时这个interrupted一定是一个false,因为已经被唤醒了线程的interrupt肯定不能是true
(疑问:执行了LockSupport.park代码是不会往下走了吗?同时这个park执行了后都改了线程的什么东西)

  1. setHead(node);
  2. private void setHead(Node node) {
  3. head = node;
  4. node.thread = null;
  5. node.prev = null;
  6. }
  7. p.next = null; // help GC
  8. failed = false;
  9. return interrupted;

第二个线程加锁失败过程

  1. 加锁失败进入shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()判断。判断一下,是否需要将当前线程挂起,阻塞等待。如果需要,使用park操作挂起当前线程

    shouldParkAfterFailedAcquire(p, node)

    1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2. int ws = pred.waitStatus;
    3. if (ws == Node.SIGNAL)
    4. /*
    5. * This node has already set status asking a release
    6. * to signal it, so it can safely park.
    7. */
    8. return true;
    9. if (ws > 0) {
    10. /*
    11. * Predecessor was cancelled. Skip over predecessors and
    12. * indicate retry.
    13. */
    14. do {
    15. node.prev = pred = pred.prev;
    16. } while (pred.waitStatus > 0);
    17. pred.next = node;
    18. } else {
    19. /*
    20. * waitStatus must be 0 or PROPAGATE. Indicate that we
    21. * need a signal, but don't park yet. Caller will need to
    22. * retry to make sure it cannot acquire before parking.
    23. */
    24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    25. }
    26. return false;
    27. }
  2. int ws = pred.waitStatus;,pred就是Head指向的node就是头节点,头节点这个watiStatus一开始的时候是个0

  3. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);进入这个else代码块,把这个pred的也就是这个头节点的waitStatus设置成SIGNAL。

image.png

  1. 返回false
  2. 又跳回上层acquireQueued的代码,这个代码中是个for死循环

    1. final Node p = node.predecessor();又拿一次那个headNode
    2. p == head && tryAcquire(arg);尝试加锁,又失败
    3. shouldParkAfterFailedAcquire(p, node)进入该方法做判断,这个时候ws == Node.SIGNAL判断成立,因此就返回true
      1. final boolean acquireQueued(final Node node, int arg) {
      2. boolean failed = true;
      3. try {
      4. boolean interrupted = false;
      5. for (;;) {
      6. final Node p = node.predecessor();
      7. if (p == head && tryAcquire(arg)) {
      8. setHead(node);
      9. p.next = null; // help GC
      10. failed = false;
      11. return interrupted;
      12. }
      13. if (shouldParkAfterFailedAcquire(p, node) &&
      14. parkAndCheckInterrupt())
      15. interrupted = true;
      16. }
      17. } finally {
      18. if (failed)
      19. cancelAcquire(node);
      20. }
      21. }

      parkAndCheckInterrupt(),挂起线程

      1. private final boolean parkAndCheckInterrupt() {
      2. //1
      3. LockSupport.park(this);
      4. //2
      5. return Thread.interrupted();
      6. }
  3. LockSupport.park(this);将线程进行挂起,必须有另外一个线程来对当前线程执行unpark操作,唤醒当前挂起的线程(为什么叫异步阻塞,这里的park就必须被别的线程所唤醒,这个过程就是异步的)

  4. Thread.interrupted(); 就是修改线程的interrupte标记位改为true,不让当前线程再运行了。

image.png

设置interrupted

第三个线程加锁失败过程

tryAcquire失败仍然是做,addWaiter(Node.EXCLUSIVE)和acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

addWaiter

  1. private Node addWaiter(Node mode) {
  2. //1.封装node
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. //2.
  6. Node pred = tail;
  7. if (pred != null) {
  8. node.prev = pred;
  9. if (compareAndSetTail(pred, node)) {
  10. pred.next = node;
  11. return node;
  12. }
  13. }
  14. //3
  15. enq(node);
  16. return node;
  17. }
  1. 封装node
  2. Node pred = tail;

image.png

  1. pred != null 成立,执行node.prev = pred;

image.png

  1. compareAndSetTail(pred, node),由于pred保存的tail的引用,cas操作后就把tail指向了线程3的Node

image.png

  1. pred.next = node;在第2步的时候pred保存的tail指针,而原来tail指针指向的线程2的Node,因此pred就是线程2的Node,此时线程2Node的next指针指向线程3的Node指针

image.png

  1. return node;直接把线程3Node对象返回出去

    acquireQueued

    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean failed = true;
    3. try {
    4. boolean interrupted = false;
    5. for (;;) {
    6. final Node p = node.predecessor();
    7. if (p == head && tryAcquire(arg)) {
    8. setHead(node);
    9. p.next = null; // help GC
    10. failed = false;
    11. return interrupted;
    12. }
    13. if (shouldParkAfterFailedAcquire(p, node) &&
    14. parkAndCheckInterrupt())
    15. interrupted = true;
    16. }
    17. } finally {
    18. if (failed)
    19. cancelAcquire(node);
    20. }
    21. }

    第一次循环

  2. final Node p = node.predecessor();拿到上一个节点,线程3Node的上一个节点就是线程2Node,此时p指向线程2Node

image.png

  1. p == head && tryAcquire(arg);首先p ==head就不成立,意味着如果上一个节点不是head那就意味着还有别的线程在等待加锁,因此后面的tryAcquire压根就不让他走
  2. shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()

    1. shouldParkAfterFailedAcquire(p, node) 过程是一样的,pred指向线程2的Node,直接走的else块,把ws设置成signal
      1. image.png
        1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        2. int ws = pred.waitStatus;
        3. if (ws == Node.SIGNAL)
        4. /*
        5. * This node has already set status asking a release
        6. * to signal it, so it can safely park.
        7. */
        8. return true;
        9. if (ws > 0) {
        10. /*
        11. * Predecessor was cancelled. Skip over predecessors and
        12. * indicate retry.
        13. */
        14. do {
        15. node.prev = pred = pred.prev;
        16. } while (pred.waitStatus > 0);
        17. pred.next = node;
        18. } else {
        19. /*
        20. * waitStatus must be 0 or PROPAGATE. Indicate that we
        21. * need a signal, but don't park yet. Caller will need to
        22. * retry to make sure it cannot acquire before parking.
        23. */
        24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        25. }
        26. return false;
        27. }
  3. 又跳回上层acquireQueued的代码,这个代码中是个for死循环,shouldParkAfterFailedAcquire(p, node)此时由于线程2Node的ws是SIGNAL因此这个时候这个方法会返回true

    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean failed = true;
    3. try {
    4. boolean interrupted = false;
    5. for (;;) {
    6. final Node p = node.predecessor();
    7. if (p == head && tryAcquire(arg)) {
    8. setHead(node);
    9. p.next = null; // help GC
    10. failed = false;
    11. return interrupted;
    12. }
    13. if (shouldParkAfterFailedAcquire(p, node) &&
    14. parkAndCheckInterrupt())
    15. interrupted = true;
    16. }
    17. } finally {
    18. if (failed)
    19. cancelAcquire(node);
    20. }
    21. }
  4. parkAndCheckInterrupt();直接挂起当前线程3。