1.类图

image.png
从上面的UML图,观察到:

  1. **AbstractQueuedSynchronizer**继承了抽象类**AbstractOwnableSynchronizer****AbstractOwnableSynchronizer**是一个可以由线程以独占方式拥有的同步器。**AbstractQueuedSynchronizer**用虚拟队列的方式来管理线程中锁的获取与释放,同时也提供了各种情况下的线程中断。这个类虽然提供了默认的同步实现,但是获取锁和释放锁的实现被定义为抽象方法,由子类实现。这样做的目的是使开发人员可以自由定义锁的获取与释放方式。
  2. **ReentrantLock****ReentrantReadWriteLock****CountDownLatch****Semaphore****ThreadPoolExecutor**都有内部类继承了**AbstractQueuedSynchronizer**通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。
  3. ReentrantReadWriteLockLock的另一种实现方式,ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。与排他锁相比,能提供更高的并发性。

    2.AbstractOwnableSynchronizer

    AOS为创建锁和相关同步器提供了基础。只有一个属性和对应的get/set方法,用于设置和获取独占锁的拥有者线程。

    1. private transient Thread exclusiveOwnerThread;
    2. /**
    3. * Sets the thread that currently owns exclusive access.
    4. * A {@code null} argument indicates that no thread owns access.
    5. * This method does not otherwise impose any synchronization or
    6. * {@code volatile} field accesses.
    7. * @param thread the owner thread
    8. */
    9. protected final void setExclusiveOwnerThread(Thread thread) {
    10. exclusiveOwnerThread = thread;
    11. }
    12. /**
    13. * Returns the thread last set by {@code setExclusiveOwnerThread},
    14. * or {@code null} if never set. This method does not otherwise
    15. * impose any synchronization or {@code volatile} field accesses.
    16. * @return the owner thread
    17. */
    18. protected final Thread getExclusiveOwnerThread() {
    19. return exclusiveOwnerThread;
    20. }

    3.AbstractQueuedSynchronizer

    AQS 内部维护了一个 FIFO 队列来管理锁。线程首先会尝试获取锁,如果失败,则将当前线程以及等待状态等信息包成一个 Node 节点放入同步队列阻塞起来,当持有锁的线程释放锁时,就会唤醒队列中的后继线程。

总体来说,AQS提供的public方法有三类:

  • 实现锁的获取和释放(8个)
    • acquire
    • acquireInterruptibly
    • tryAcquireNanos
    • release
    • acquireShared
    • acquireSharedInterruptibly
    • tryAcquireSharedNanos
    • releaseShared
  • 提供队列检查方法
    • hasQueuedThreads
    • hasContended
    • getFirstQueuedThread
    • isQueued
    • hasQueuedPredecessors
  • 提供监控方法
    • getQueueLength
    • getQueuedThreads
    • getExclusiveQueuedThreads
    • getSharedQueuedThreads

**AbstractQueuedSynchronizer**提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,AQS内部类**Node**和内部类**ConditionObject**,其操作都是基于这2个内部类实现的。

3.1.内部类Node

AQS是通过内部类Node来实现FIFO队列。等待队列是“ CLH”(Craig,Landin和Hagersten)锁队列的变体。CLH锁通常用于自旋锁,在这里我们将它们用于阻塞同步器,但是使用相同的基本策略,将有关线程的一些控制信息保存在其前任节点中。

源码

  1. static final class Node {
  2. // 表明节点在共享模式下等待的标记
  3. static final Node SHARED = new Node();
  4. // 表明节点在独占模式下等待的标记
  5. static final Node EXCLUSIVE = null;
  6. // 表征等待线程已取消的
  7. static final int CANCELLED = 1;
  8. // 表征需要唤醒后续线程
  9. static final int SIGNAL = -1;
  10. // 表征线程正在等待触发条件(condition)
  11. static final int CONDITION = -2;
  12. // 表征下一个acquireShared应无条件传播
  13. static final int PROPAGATE = -3;
  14. /**
  15. * SIGNAL: 当前节点释放state或者取消后,将通知后续节点竞争state。
  16. * CANCELLED: 线程因timeout和interrupt而放弃竞争state,当前节点将与state彻底拜拜
  17. * CONDITION: 表征当前节点处于条件队列中,它将不能用作同步队列节点,直到其waitStatus被重置为0
  18. * PROPAGATE: 表征下一个acquireShared应无条件传播
  19. * 0: None of the above
  20. */
  21. volatile int waitStatus;
  22. // 前继节点
  23. volatile Node prev;
  24. // 后继节点
  25. volatile Node next;
  26. // 持有的线程
  27. volatile Thread thread;
  28. // 链接下一个等待条件触发的节点
  29. Node nextWaiter;
  30. // 返回节点是否处于Shared状态下
  31. final boolean isShared() {
  32. return nextWaiter == SHARED;
  33. }
  34. // 返回前继节点
  35. final Node predecessor() throws NullPointerException {
  36. Node p = prev;
  37. if (p == null)
  38. throw new NullPointerException();
  39. else
  40. return p;
  41. }
  42. // Shared模式下的Node构造函数
  43. Node() {
  44. }
  45. // 用于addWaiter
  46. Node(Thread thread, Node mode) {
  47. this.nextWaiter = mode;
  48. this.thread = thread;
  49. }
  50. // 用于Condition
  51. Node(Thread thread, int waitStatus) {
  52. this.waitStatus = waitStatus;
  53. this.thread = thread;
  54. }
  55. }

waitStatus

节点的内部属性**waitStatus**字段跟踪线程是否应阻塞。
CANCELLED(1):已取消,节点不可用。由于超时或中断,该节点被取消。节点永远不会离开此状态。特别是,具有取消节点的线程永远不会再次阻塞。
SIGNAL(-1):后继线程需要释放。当前节点释放或取消时必须unpark its successor
CONDITION(-2):线程正在条件等待。该节点当前在条件队列中。在传输之前,它不会用作同步队列节点,此时状态将设置为0。
PROPAGATE(-3):下一个acquireShared应该无条件传播。releaseShared应该传播到其他节点。在doReleaseShared中对此进行了设置(仅适用于头节点),以确保传播继续进行,即使此后进行了其他操作也是如此。
0:非以上状态时,设置为0。

waitStatu=1,代表不可用,所以waitStatus只需要检查其正负符号即可,不用太多关注特定值。

3.2.如何操作队列

head和tail为AQS的两个成员变量,也就是通过这两个成员变量来操作整个队列(双向链表)。

这个双向链表是如何表示的?

image.png

  1. private transient volatile Node head;
  2. private transient volatile Node tail;

入队(如何添加一个节点)

添加一个节点就是直接将该节点添加到尾部,并且变更AQS中tail节点的引用。

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. // 队列非空
  6. if (pred != null) {
  7. node.prev = pred;
  8. if (compareAndSetTail(pred, node)) {
  9. pred.next = node;
  10. return node;
  11. }
  12. }
  13. //当前尾节点为null,也就是说当前没有一个节点,那么就会调用enq(Node)方法。
  14. enq(node);
  15. return node;
  16. }
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法为什么返回的是前驱节点?如下图所示,该方法的调用仅3处,且仅1处关心方法返回值,判断前继节点的waitStatus是否是大于0,即CANCELLED
image.png
image.png

出队

/**
     * Wakes up node's successor, if one exists.(唤醒后继者)
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

需要unpark的线程是被后继节点持有的。从尾结点开始找到non-cancelled successor,将其持有的thread unpark。

3.3.线程的阻塞和唤醒

AbstractQueuedSynchronizer,就是通过调用LockSupport.park()和LockSupport.unpark()实现线程的阻塞和唤醒的。LockSupport很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继续执行;如果许可已经被占用,当前线程阻塞,等待获取许可。

public class LockSupportDemo {
    public static void main(String[] args) {
        LockSupport.park();
        System.out.println("block.");

    }
}

运行这段代码,主线程一直阻塞。因为许可默认是被占用的,调用park()时获取不到许可,所以进入阻塞状态。

public class LockSupportDemo {
    public static void main(String[] args) {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park();
        System.out.println("block.");

    }
}

先释放许可,再获取许可,主线程能够正常终止。

LockSupport是不可重入的,如果一个线程连续2次调用LockSupport.park(),那么该线程一定会一直阻塞下去。

线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException。

3.4.内部类ConditionObject

实现了java.util.concurrent.locks.Condition接口。Condition的目的主要是替代Object的wait,notify,notifyAll方法的,它是基于Lock实现的.
Condition.png
ConditionObject维护两个队列:

  • Condition队列,表示等待的队列,其waitStatus=Node.Condition,由firstWaiter和lastWaiter两个属性操控.
  • Sync队列,表示可以竞争锁的队列,这个跟AQS一致,waitStatus=0;

await()方法就是把当前线程创建一个Node加入Condition队列,接着就一直循环查其在不在Sync队列,如果当前节点在Sync队列里了,就可以竞争锁,恢复运行了。
signal()方法就是把某个节点的nextWaiter设为null,再把其从Condition队列转到Sync队列。

3.5.同步状态State

AQS维护了一个volatile语义(支持多线程下的可见性)的共享资源变量state和一个FIFO线程等待队列(多线程竞争state被阻塞时会进入此队列)。

共享资源变量state,它是int数据类型的,其访问方式有3种:

  • getState()
  • setState(int newState)
  • compareAndSetState(int expect, int update)

上述3种方式均是原子操作,其中compareAndSetState()的实现依赖于Unsafe的compareAndSwapInt()方法。

    private volatile int state;

    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    // 如果当前状态值等于预期值,则原子地将同步状态设置为给定的更新值
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

....

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
    private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

一个 Java 对象可以看成是一段内存,每个字段都得按照一定的顺序放在这段内存里,通过这个方法可以准确地告诉你某个字段相对于对象的起始内存地址的字节偏移。用于在后面的 compareAndSwapInt中,去根据偏移量找到对象在内存中的具体位置,所以 stateOffset 表示 state 这个字段在 AQS 类的内存中相对于该类首地址的偏移量

state=0表示没有被占用
image.png

资源的共享方式分为2种:

  • 独占式(Exclusive):只有单个线程能够成功获取资源并执行,如ReentrantLock。
  • 共享式(Shared):多个线程可成功获取资源并执行,如Semaphore/CountDownLatch等。

    3.7.如何自定义同步器

    AQS将大部分的同步逻辑均已经实现好,自定义同步器只需要实现state的获取(acquire)和释放(release)的逻辑代码就可以,主要包括下面方法:

  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

AQS需要子类复写的方法均没有声明为abstract,目的是避免子类需要强制性覆写多个方法,因为一般自定义同步器要么是独占方法,要么是共享方法,只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。当然,AQS也支持子类同时实现独占和共享两种模式,如ReentrantReadWriteLock。

4.锁的获取和释放

4.1.独占模式

4.1.1.获取锁

  • public final void acquire(int arg)
  • public final void acquireInterruptibly(int arg)
  • public final boolean tryAcquireNanos(int arg, long nanosTimeout)

image.png
步骤:

  1. tryAcquire:尝试获取一次锁,如果成功,返回。
  2. addWaiter:当前线程包装成Node插入到队列中。
  3. acquireQueued:会在队列中会检测是否为head的直接后继,并尝试获取锁,如果获取失败,则阻塞当前线程,直至被”释放锁的线程”唤醒或者被中断,随后再次尝试获取锁,如此反复

    �4.1.2.释放锁

    • public final boolean release(int arg)

image.png
步骤:

  1. 调用 tryRelease;
  2. 如果 tryRelease 返回 true 也就是独占锁被完全释放,唤醒后继线程。

    4.2.共享模式

    4.2.1.获取锁

    • public final void acquireShared(int arg)
    • public final void acquireSharedInterruptibly(int arg)
    • public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

image.png
步骤:

  1. 尝试获取锁,如果返回值为负值,代表获取失败;返回0代表获取成功,但是后续的获取共享锁会失败;返回正值,代表获取成功,且后续线程获取共享锁大概率也会成功。
  2. doAcquireShared获取共享锁。

image.png

image.png

�4.2.2.释放锁

  • public final boolean releaseShared(int arg)

image.png
重点看下doReleaseShared方法:
image.png

5.自旋锁

AQS是自旋锁:在等待唤醒的时候,经常会使用自旋(while(!cas()))的方式,不停地尝试获取锁,直到被其他线程获取成功

自旋锁(spinlock):是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。

获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。

6.CLH算法

6.1.CLH锁

CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

7.参考文章

AbstractQueuedSynchronizer 源码分析 (基于Java 8)