AQS

简介

AQS全称为AbstractQueuedSynchronizer,这个类在concurrent包下,AQS是一个用来构建锁和同步器的框架,像ReentrantLock,Semaphore,FutureTask都是基于AQS,我们也可以使用AQS构造出符合我们需求的同步器。

原理

AQS的思想是如果被请求的共享资源空闲,则将请求资源的线程设置诶有效工作线程,把当前资源设置为Exclusive状态。

但如果被请求资源被占用,则需要将该线程放入阻塞队列并阻塞该线程,当锁释放时同时唤醒阻塞队列并进行锁分配。

这个队列是由CLH虚拟双向队列,之所以是虚拟表示不存在队列的实例,而仅仅是同过节点之间互相关联来实现队列。队列中的节点是由线程封装而来。

AQS模板方法

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方法为:

  1. 使用者继承AbstractQueuedSynchronizer并重写指定方法。
  2. 将AQS组合在自定义同步组件实现,并调用方法

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:

  1. isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
  2. tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
  3. tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
  4. tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  5. tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

AQS 常用组件:

Semaphore(信号量)

Semaphore用于线程获取某个资源的线程数量,也就是互斥量。执行acquire方法若semaphore里没有permit则阻塞等待,直到调用release方法增加一个permit这是会唤醒释放一个正在阻塞的线程。

Semaphore有两种模式,公平模式和非公平模式

  1. public Semaphore(int permits) {
  2. sync = new NonfairSync(permits);
  3. }
  4. public Semaphore(int permits, boolean fair) {
  5. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  6. }

CountDownLatch (倒计时器)

CountDownLatch 可以让一个线程等待,直至所有线程都执行了countdown方法,再全部往下执行。

CountDownLatch原理

CountDownLatch是共享锁的一种实现,它的构造AQS的state值为count个也就是我们期望计数的个数。当线程使用Countdown方法其实调用了内部的tryReleaseShared方法以CAS的操作使state减一。直到计数器为0,然后被阻塞的进程会继续执行。

CountDownLatch的两种典型用法

  1. 在启动一个服务时,主线程需要等待多个组件加载完毕,之后再运行。
  2. 多个线程共享一个countdownLatch对象,并且都调用await()方法,这样把多个线程放在起点,然后主线程再使用countdown让他们在同一时刻开始,以此来执行任务最大并发数。

CountDownLatch常见面试题

  1. CountDownLatch简介以及原理
  2. CountDownLatch和CyclicBarrier不同之处
  3. CountDownLatch使用的场景
  4. CountDownLatch类中的方法

CyclicBarrier(循环栅栏)

CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

CyclicBarrier构造函数

  1. public CyclicBarrier(int parties) {
  2. this(parties, null);
  3. }
  4. public CyclicBarrier(int parties, Runnable barrierAction) {
  5. if (parties <= 0) throw new IllegalArgumentException();
  6. this.parties = parties;
  7. this.count = parties;
  8. this.barrierCommand = barrierAction;
  9. }

其中,parties就代表了有拦截的线程的数量,当拦截的线程数量达到这个值就打开栅栏,让所有线程通过。

CyclicBarrier使用实例

  1. public class SemaphoreExample1 {
  2. // 请求的数量
  3. private static final int threadCount = 550;
  4. public static CyclicBarrier barrierAction = new CyclicBarrier(5, () -> {
  5. System.out.println("barrierAction");
  6. });
  7. public static void main(String[] args) throws InterruptedException {
  8. ExecutorService threadPool = Executors.newFixedThreadPool(100);
  9. for (int i = 0; i < threadCount; i++) {
  10. final int threadnum = i;
  11. Thread.sleep(100);
  12. threadPool.execute(() -> {
  13. try {
  14. test(threadnum);
  15. } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
  16. // TODO Auto-generated catch block
  17. e.printStackTrace();
  18. }
  19. });
  20. }
  21. threadPool.shutdown();
  22. }
  23. public static void test(int threadnum) throws InterruptedException, BrokenBarrierException, TimeoutException {
  24. System.out.println(threadnum+"ready");
  25. barrierAction.await();
  26. System.out.println(threadnum+"finish");
  27. }
  28. }

运行结果:

  1. 0ready
  2. 1ready
  3. 2ready
  4. 3ready
  5. 4ready
  6. barrierAction
  7. 4finish
  8. 3finish
  9. 2finish
  10. 1finish
  11. 0finish
  12. 5ready
  13. 6ready
  14. 7ready
  15. 8ready
  16. 9ready
  17. barrierAction
  18. 5finish
  19. 6finish
  20. 7finish
  21. 8finish
  22. 9finish
  23. ....

CyclicBarrier等待的原理

CyclicBarrier内部通过一个count变量作为计数器,count的初始值为parties属性的初始化值,每当一个线程到了栅栏就将计数器减一,如果count值为0表示这是这一轮最后一个线程到达栅栏,就尝试执行构造方法中输入的任务。

CyclicBarrier和CountdownLatch的区别

  1. CyclicBarrier能够循环使用,并且提供reset功能,而CountDownLatch只能使用一次。
  2. 对于CountDownLatch来说,重点是一个或多个等待,而其他N个线程负责计数,等待和计数是可以分开的,完成计数的线程可以根据需要继续运行也可以终止运行。而对于CyclicBarrier重点是多个线程一边计数一边等待,在满足计数条件后,多个线程继续执行。

ReentrantReadWriteLock

ReentrantReadWriteLock可以保证多个线程可以同时读,两个读操作之间不互斥,在读操作远大于写操作时,读写锁能发挥明显的作用。

AQS FairSync:公平可重入锁逻辑

  1. 进入直接调这条语句
  1. if (!tryAcquire(arg) &&
  2. // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
  4. selfInterrupt();
  5. }
  1. 先tryAcquire(),如果直接成功了,也就不需要进队列了```java protected final boolean tryAcquire(int acquires) {

    1. final Thread current = Thread.currentThread();
    2. int c = getState();
    3. if (c == 0) {
    4. //虽然是try一下但还是不能插队的,因为是公平锁吗
    5. //要先用hasQueuedPredecessors看一下有没有先来的
    6. if (!hasQueuedPredecessors() &&
    7. //没有先来的就用CAS改变一下状态
    8. compareAndSetState(0, acquires)) {
    9. //成功就申明一下互斥
    10. setExclusiveOwnerThread(current);
    11. return true;
    12. }
    13. }
    14. //可重入锁情况,需要操作:state=state+1
    15. else if (current == getExclusiveOwnerThread()) {
    16. int nextc = c + acquires;
    17. if (nextc < 0)
    18. throw new Error("Maximum lock count exceeded");
    19. setState(nextc);
    20. return true;
    21. }
    22. return false;
    23. }

    ```

  2. 若try一下失败了,则进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),先看addWaiter(Node.EXCLUSIVE)```java private Node addWaiter(Node mode) {

    1. Node node = new Node(Thread.currentThread(), mode);
    2. // Try the fast path of enq; backup to full enq on failure
    3. //先获取一下尾部的node,看看存不存在,注意tail和head均属于成员变量
    4. Node pred = tail;
    5. //尾部存在的情况就直接插入返回就行了
    6. if (pred != null) {
    7. node.prev = pred;
    8. if (compareAndSetTail(pred, node)) {
    9. pred.next = node;
    10. return node;
    11. }
    12. }
    13. //走到这里说明两种情况,1.队列为空,尾部不存在2.之前的CAS插入失败了(有竞争)
    14. //此时调用enq初始化队列,或自旋入队。
    15. enq(node);
    16. return node;

    } ```

  3. 调用enq函数自旋入队,或者初始化队列```java private Node enq(final Node node) {

     for (;;) {
         Node t = tail;
         //针对之前队列为空的情况,初始化头部
         if (t == null) { // Must initialize
             if (compareAndSetHead(new Node()))
                 tail = head;
         //这里是自旋尝试插入尾部
         //之前的情况可能为1.插入尾部有竞争2.刚初始化完
         } else {
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
    

    } ```

  4. 返回节点后,进入acquireQueued```java final boolean acquireQueued(final Node node, long arg) {

     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
             //predecessor方法获取前驱节点
             final Node p = node.predecessor();
             //前驱为head的话,说明是队列的第一个,再试一下tryAcquire
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null; // help GC
                 failed = false;
                 return interrupted;
             }
             //走到这里说明要么不是队列的第一个,要么tryAcquire的时候没有抢赢
             //失败了就要被挂起了
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 interrupted = true;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
    

    } ```

  5. 节点被挂起```java private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

     int ws = pred.waitStatus;
     //Node.SIGNAL为-1
     if (ws == Node.SIGNAL)
         /*
          * This node has already set status asking a release
          * to signal it, so it can safely park.
          * 前端点为-1,为正常,可以挂起
          */
         return true;
     //前端点取消了排队,说明可以插在前端点前面了
     if (ws > 0) {
         /*
          * Predecessor was cancelled. Skip over predecessors and
          * indicate retry.
          */
         do {
             node.prev = pred = pred.prev;
         } while (pred.waitStatus > 0);
         pred.next = node;
     } else {
         /*
          * waitStatus must be 0 or PROPAGATE.  Indicate that we
          * need a signal, but don't park yet.  Caller will need to
          * retry to make sure it cannot acquire before parking.
          */
        // 仔细想想,如果进入到这个分支意味着什么
         // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
         // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
         // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
         // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
         // 改好之后走一次for循环,再进来此方法,会进入第一个分支
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     }
     return false;
    

    } ```

  6. 开始挂起进程了```java private final boolean parkAndCheckInterrupt() {

     LockSupport.park(this);
     return Thread.interrupted();
    

    } ```

公平重入锁解锁

解锁操作

最后,就是还需要介绍下唤醒的动作了。我们知道,正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this); 挂起停止,等待被唤醒。

// 唤醒的代码还是比较简单的,你如果上面加锁的都看懂了,下面都不需要看就知道怎么回事了
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    // 往后看吧
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否完全释放锁
    boolean free = false;
    // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
// 唤醒后继节点
// 从上面调用处知道,参数node是head头结点
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    // 如果head节点当前waitStatus<0, 将其修改为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
    // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 刚刚线程被挂起在这里了
    return Thread.interrupted();
}
// 又回到这个方法了:acquireQueued(final Node node, int arg),这个时候,node的前驱是head了