Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,
比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

1.AQS原理

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。

这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
图片.png它维护了一个**volatile int state**(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列), 通过CAS完成对State值的修改

  • compareAndSetState()

1.1 AQS数据结构

AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点
图片.png
方法和属性值的含义 :

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出npe
nextWaiter 指向下一个处于CONDITION状态的节点
next 后继指针

线程两种锁的模式:

模式 含义
SHARED 共享,多个线程可同时执行,如Semaphore/CountDownLatch
EXCLUSIVE 独占,只有一个线程能执行,如ReentrantLock

waitStatus的枚举值 :

枚举 含义
0 当一个Node被初始化的时候的默认值
CANCELLED 为1,表示线程获取锁的请求已经取消了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了

1.2 同步状态State

AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况
图片.png
state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

方法名 描述
protected boolean isHeldExclusively() 该线程是否正在独占资源。
只有用到Condition才需要去实现它。
protected boolean tryAcquire(int arg) 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg) 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。
protected int tryAcquireShared(int arg) 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg) 共享方式。arg为释放锁的次数,尝试释放资源,
如果释放后允许唤醒后续等待结点返回True,否则返回False。

2.AQS源码

2.1 加锁

加锁的流程
并发基石--AQS - 图4

2.1.1 acquire

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响. 下面是acquire()的源码:

  1. /**
  2. * 获取独占锁
  3. */
  4. public final void acquire(int arg) {
  5. //尝试获取锁
  6. if (!tryAcquire(arg) &&
  7. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//独占模式
  8. selfInterrupt();
  9. }

函数流程如下:

  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待;
  2. addWaiter()如果获取锁失败, 将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(,将中断补上。

2.1.1 tryAcquire(int)

  1. protected boolean tryAcquire(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。

ReentrantLock中公平锁tryAcquire的实现

  1. /**
  2. * 公平锁
  3. */
  4. static final class FairSync extends Sync {
  5. private static final long serialVersionUID = -3000897897090466540L;
  6. final void lock() {
  7. acquire(1);
  8. }
  9. /**
  10. * 重写aqs中的方法逻辑
  11. * 尝试加锁,被AQS的acquire()方法调用
  12. */
  13. protected final boolean tryAcquire(int acquires) {
  14. final Thread current = Thread.currentThread();
  15. int c = getState();
  16. if (c == 0) {
  17. /**
  18. * 与非公平锁中的区别,需要先判断队列当中是否有等待的节点
  19. * 如果没有则可以尝试CAS获取锁
  20. */
  21. if (!hasQueuedPredecessors() &&
  22. compareAndSetState(0, acquires)) {
  23. //独占线程指向当前线程
  24. setExclusiveOwnerThread(current);
  25. return true;
  26. }
  27. } //判断同一个线程多次获取,体现可重入性
  28. else if (current == getExclusiveOwnerThread()) {
  29. int nextc = c + acquires;
  30. if (nextc < 0)
  31. throw new Error("Maximum lock count exceeded");
  32. setState(nextc);
  33. return true;
  34. }
  35. return false;
  36. }
  37. }

2.1.3 addWaiter(Node)

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点

  1. private Node addWaiter(Node mode) {
  2. // 1. 将当前线程构建成Node类型。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
  3. Node node = new Node(Thread.currentThread(), mode);
  4. //尝试快速方式直接放到队尾。
  5. Node pred = tail;
  6. if (pred != null) // 2. 1当前尾节点是否为null?
  7. node.prev = pred; // 2.2 将当前节点尾插入的方式
  8. if (compareAndSetTail(pred, node)) { // 2.3 CAS将节点插入同步队列的尾部
  9. pred.next = node;
  10. return node;
  11. }
  12. }
  13. //上一步失败则通过enq入队。
  14. enq(node);
  15. return node;
  16. }

2.1.4 enq(Node)

此方法用于将node加入队尾

  1. private Node enq(final Node node) {
  2. //CAS"自旋",直到成功加入队尾
  3. for (;;) {
  4. Node t = tail;
  5. if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
  6. if (compareAndSetHead(new Node()))
  7. tail = head;
  8. } else {//正常流程,放入队尾
  9. node.prev = t;
  10. if (compareAndSetTail(t, node)) { //当前节点置为尾部
  11. t.next = node; //前驱节点的next指针指向当前节点
  12. return t;
  13. }
  14. }
  15. }
  16. }

2.1.5 acquireQueued(Node, int)

进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。没错,就是这样!是不是跟医院排队拿号有点相似~~
acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回

acquireQueued源码

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;//标记是否成功拿到资源
  3. try {
  4. boolean interrupted = false;//标记等待过程中是否被中断过
  5. //又是一个“自旋”!
  6. for (;;) {
  7. final Node p = node.predecessor();//拿到前驱
  8. //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
  9. if (p == head && tryAcquire(arg)) {
  10. setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
  11. p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
  12. failed = false; // 成功获取资源
  13. return interrupted;//返回等待过程中是否被中断过
  14. }
  15. //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。
  16. //如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
  17. if (shouldParkAfterFailedAcquire(p, node) &&
  18. parkAndCheckInterrupt())
  19. interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
  20. }
  21. } finally {
  22. if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
  23. cancelAcquire(node);
  24. }
  25. }

到这里了,我们先不急着总结acquireQueued()的函数流程,先看看shouldParkAfterFailedAcquire()parkAndCheckInterrupt()具体干些什么。

2.1.6 shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了只是瞎站着,那也说不定,对吧!

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
        return true;
    if (ws > 0) {
        /*
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
         * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

2.1.7 parkAndCheckInterrupt()

如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//调用park()使线程进入waiting状态
    return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}

park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
1)被unpark()
2)被interrupt();

需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

2.1.8 小结

acquireQueued()分析完之后,我们接下来再回到acquire()!再贴上它的源码吧

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

2.2 解锁

2.2.1 release(int)

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0)
它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。

下面是release()的源码:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,**

2.2.2 tryRelease(int)

此方法尝试去释放指定量的资源。下面是tryRelease()的源码:

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

2.2.3 unparkSuccessor(Node)

此方法用于唤醒等待队列中下一个线程。下面是源码:

private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程
这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),
然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!

2.2.4 小结

release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结点会怎么办?是不是没法再被唤醒了?
答案是YES!!!这时,队列中等待锁的线程将永远处于park状态,无法再被唤醒!!!但是我们再回头想想,获取锁的线程在什么情形下会release抛出异常呢??

  1. 线程突然死掉了?可以通过thread.stop来停止线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程安全的,已经标明Deprecated;
  2. 线程被interupt了?线程在运行态是不响应中断的,所以也不会抛出异常;
  3. release代码有bug,抛出异常了?目前来看,Doug Lea的release方法还是比较健壮的,没有看出能引发异常的情形(如果有,恐怕早被用户吐槽了)。除非自己写的tryRelease()有bug,那就没啥说的,自己写的bug只能自己含着泪去承受了

3.ASQ应用

3.1 ReentrantLock的可重入应用

ReentrantLock的可重入性是AQS很好的应用之一,在了解完上述知识点以后,我们很容易得知ReentrantLock实现可重入的方法。在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。
公平锁:

if (c == 0) {
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}
else if (current == getExclusiveOwnerThread()) {  //可重入,
    int nextc = c + acquires;  //同一个线程多次获得锁, 就会多次+1,这里就是可重入的概念
    if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
}

3.2 JUC中的应用场景

AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,
大体介绍一下AQS的应用场景:

同步工具 同步工具与AQS的关联
ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。
CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。
ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。
ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。

3.3 自定义同步工具

了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具

public class LeeLock  {

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire (int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease (int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively () {
            return getState() == 1;
        }
    }

    private Sync sync = new Sync();

    public void lock () {
        sync.acquire(1);
    }

    public void unlock () {
        sync.release(1);
    }
}

通过我们自己定义的Lock完成一定的同步功能。

public class LeeMain {

    static int count = 0;
    static LeeLock leeLock = new LeeLock();

    public static void main (String[] args) throws InterruptedException {

        Runnable runnable = new Runnable() {
            @Override
            public void run () {
                try {
                    leeLock.lock();
                    for (int i = 0; i < 10000; i++) {
                        count++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    leeLock.unlock();
                }

            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(count);
    }
}

3.4 Mutex(互斥锁)

Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0表示未锁定,1表示锁定。
下边是Mutex的核心源码:

class Mutex implements Lock, java.io.Serializable {
    // 自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判断是否锁定状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 尝试获取资源,立即返回。成功则返回true,否则false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 这里限定只能为1个量
            if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
                return true;
            }
            return false;
        }

        // 尝试释放资源,立即返回。成功则为true,否则false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定为1个量
            if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//释放资源,放弃占有状态
            return true;
        }
    }

    // 真正同步类的实现都依赖继承于AQS的自定义同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。两者语文一样:释放资源。
    public void unlock() {
        sync.release(1);
    }

    //锁是否占有状态
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}