上文中(Lock 与 队列同步器 AbstractQueueSynchronizer) 详细的描述了同步器的模式,自定义同步器的方式以及其底层实现-同步队列,本文将着重讲解独占式同步状态的获取与释放的原理。

1、同步状态的获取

1.1 从 acquire 方法说起

上文中定义的Metux锁中lock() 方法的实现使用的同步对象的 acquire() 方法,此方法是AQS的实现方法,其内部调用如下

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) && // 尝试获取锁
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取失败则构建独占式 Node 同时添加同步队列尾部
  4. selfInterrupt();
  5. }

可以看到acquire() 方法主要的逻辑是调用模板方法 tryAcquire() 方法,此方法是自定义同步组件中需要重写的方法,返回true标识获取成功,否则获取失败。如果获取同步状态失败,那么将会通过addWaiter() 方法构建新的节点,此节点会被添加到同步队列的尾部,最后调用acquireQueued方法,使得该节点以”死循环”的方式(其实本质是等待、通知) 的不断的尝试获取同步状态。

1.2 tryAcquire 方法

此方法是我们自定义同步组件需要实现的方法,在AQS中并未将此方法设置为抽象方法,仅仅抛出UnsupportedOperationException异常 ,这是为什么呢?其实很简单,上文中提到的需要重写的5个方法中同步状态的获取分为两类,分别独占式与共享式,如果我们需要独占式的锁,仅需要重写独占式相关的方法即可,反之亦然,如果将方法设置为抽象方法,那么在我们实现独占式同步状态器的时候也需要实现共享式同步状态的方法,这显然有点冗余的。

上文中实现 Metux 中的Sync组件实现的tryAcquire() 方法的代码如下

    /** 独占式获取同步状态,需要使用CAS 设置同步状态 */
    @Override
    protected boolean tryAcquire(int arg) {
      if (compareAndSetState(0, 1)) {
        Thread currentThread = Thread.currentThread();
        setExclusiveOwnerThread(currentThread);
        return true;
      }
      return false;
    }

1.3 addWaiter() 方法

addWaiter()方法的主要作用是创建一个新的Node, 然后将该节点通过CAS的方式添加到同步队列的尾部,该节点持有当前线程的引用。enq()方法则是不断的循环,确保将节点设置在同步队列的尾部。

image.png

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }


    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

1.4 acquireQueued() 方法

节点进入同步队列之后,就会进入一个自旋的过程,每个节点都在不断的循环进行自我检查,当条件满足的时候,就获取了同步状态,从自旋过程中退出,否则依旧保留在同步队列中,继续处于阻塞状态,直到线程中断或者前驱节点释放同步状态。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取到前驱结点
                final Node p = node.predecessor();
                // 如果前驱节点是头结点,并尝试获取同步状态成功,则将当前节点设置为头结点同时退出自旋状态
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

那么为什么设置只有其前驱节点是头节点才能尝试获取同步状态呢?原因有二,首先头结点是获取到同步状态的节点,头结点在释放同步状态之后,后继节点被唤醒时需要检查自己的前驱节点是否是头结点;其二是使得节点的释放规范符合FIFO,所以在获取同步状态的时候则需要检查自己的前驱节点是否是头结点,也可以避免其他节点过早地唤醒,并发的尝试获取同步状态。

image.png

可以看到,前驱节点为头结点并且获取到同步状态,则会将当前节点设置为头结点,然后从acquire(int arg) 方法中返回,当线程成功获取到同步状态,对于Lock而言,则是获取到锁。

2、同步状态的释放

当前线程获取到同步状态并且执行相应的逻辑之后,则需要时释放同步状态,是的后续节点能够继续尝试获取同步状态,通过调用AQS的release(int arg)方法可以释放同步状态,该方法的主要作用是释放同步状态并且通过 unparkSuccessor 方法唤醒后继节点获取通知状态。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

3、总结

在同步器尝试获取同步状态的时候,如果获取成功,则直接返回,否则会构建一个节点添加到AQS维护的同步队列中进行自旋获取,自旋结束的条件是前驱节点为头结点并且获取到同步状态。在释放同步状态的时候回通过同步器调用tryRelease() 方法释放同步状态,然后唤醒后继节点尝试获取同步状态。