一、AQS(AbstractQueuedSynchronizer)

AQS是一个抽象同步框架,实现一个依赖状态的同步器。java.util.concurrent包中,比如等待队列、条件队列、独占获取、共享获取等,都依赖AQS。

AQS特性

阻塞等待队列、共享/独占、公平/非公平、可重入、允许中断。

AQS内部属性

volatile int state

  • state表示资源的可用状态

State三种访问方式:

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源共享方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

    AQS定义两种队列

    同步等待队列

    AQS当中的同步等待队列也称CLH队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
    AQS 依赖CLH同步队列来完成同步状态的管理:

    • 当前线程,如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
    • 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
    • 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)

image.png

条件等待队列

AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:

  1. - 调用await方法阻塞线程;
  2. - 当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)

image.png

  1. 1. 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列**尾部**添加一个节点,所以调用Condition#await方法的时候**必须持有锁**。
  2. 1. 调用Condition#signal方法会将Condition队列的**首节点**移动到同步等待队列**尾部**,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁),所以调用Condition#signal方法的时候**必须持有锁**,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。

二、ReentrantLock应用与原理

ReentrantLock是一种基于AQS框架的应用实现,功能类似于synchronized,是一种互斥锁,可以保证线程安全。
相对于 synchronized, ReentrantLock具备如下特点:

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量
  • 与 synchronized 一样,都支持可重入

对比区别

synchronized ReentrantLock
JVM层次的锁实现 JDK层次的锁实现
锁状态是无法在代码中直接判断 通过ReentrantLock#isLocked,获取锁定状态
非公平锁 可以是公平也可以是非公平
不可以被中断的 可以被中断
发生异常时,synchronized会自动释放锁 在finally块中显示释放锁
特定的情况下对于已经在等待的线程是后来的线程先获得锁 已经在等待的线程是先来的线程先获得锁

1.ReentrantLock应用测试

  1. public static void main(String[] args) {
  2. Lock lock = new ReentrantLock();
  3. Thread thread0 = new Thread(() -> {
  4. lock.lock();
  5. try {
  6. System.out.println("输出测试0");
  7. } finally {
  8. lock.unlock();
  9. }
  10. });
  11. thread0.start();
  12. Thread thread1 = new Thread(() -> {
  13. lock.lock();
  14. try {
  15. System.out.println("输出测试1");
  16. } finally {
  17. lock.unlock();
  18. }
  19. });
  20. thread1.start();
  21. Thread thread2 = new Thread(() -> {
  22. lock.lock();
  23. try {
  24. System.out.println("输出测试2");
  25. } finally {
  26. lock.unlock();
  27. }
  28. });
  29. thread2.start();
  30. }

1.1.可中断

  1. @Slf4j
  2. public class ReentrantLockDemo3 {
  3. public static void main(String[] args) {
  4. ReentrantLock lock = new ReentrantLock();
  5. Thread t1 = new Thread(() -> {
  6. log.debug("t1启动...");
  7. try {
  8. lock.lockInterruptibly();
  9. try {
  10. log.debug("t1获得了锁");
  11. } finally {
  12. lock.unlock();
  13. }
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. log.debug("t1等锁的过程中被中断");
  17. }
  18. }, "t1");
  19. lock.lock();
  20. try {
  21. log.debug("main线程获得了锁");
  22. t1.start();
  23. //先让线程t1执行
  24. try {
  25. Thread.sleep(1000);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. t1.interrupt();
  30. log.debug("线程t1执行中断");
  31. } finally {
  32. lock.unlock();
  33. }
  34. }

1.2.超时:立即失败

  1. @Slf4j
  2. public class ReentrantLockDemo4 {
  3. public static void main(String[] args) {
  4. ReentrantLock lock = new ReentrantLock();
  5. Thread t1 = new Thread(() -> {
  6. log.debug("t1启动...");
  7. // 注意: 即使是设置的公平锁,此方法也会立即返回获取锁成功或失败,公平策略不生效
  8. if (!lock.tryLock()) {
  9. log.debug("t1获取锁失败,立即返回false");
  10. return;
  11. }
  12. try {
  13. log.debug("t1获得了锁");
  14. } finally {
  15. lock.unlock();
  16. }
  17. }, "t1");
  18. lock.lock();
  19. try {
  20. log.debug("main线程获得了锁");
  21. t1.start();
  22. //先让线程t1执行
  23. try {
  24. Thread.sleep(1000);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. } finally {
  29. lock.unlock();
  30. }
  31. }

1.2超时指定时间失败

  1. @Slf4j
  2. public class ReentrantLockDemo4 {
  3. public static void main(String[] args) {
  4. ReentrantLock lock = new ReentrantLock();
  5. Thread t1 = new Thread(() -> {
  6. log.debug("t1启动...");
  7. //超时
  8. try {
  9. if (!lock.tryLock(1, TimeUnit.SECONDS)) {
  10. log.debug("等待 1s 后获取锁失败,返回");
  11. return;
  12. }
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. return;
  16. }
  17. try {
  18. log.debug("t1获得了锁");
  19. } finally {
  20. lock.unlock();
  21. }
  22. }, "t1");
  23. lock.lock();
  24. try {
  25. log.debug("main线程获得了锁");
  26. t1.start();
  27. //先让线程t1执行
  28. try {
  29. Thread.sleep(2000);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. } finally {
  34. lock.unlock();
  35. }
  36. }

2.ReentrantLock获取锁流程

AQS与ReentrantLock - 图3

三、ReentrantLock源码解析

1.重要变量

  1. /** 标记节点为共享模式 */
  2. static final Node SHARED = new Node();
  3. /** 标记节点为独占模式 */
  4. static final Node EXCLUSIVE = null;
  5. /** 标记在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待(即从队列中移除) */
  6. static final int CANCELLED = 1;
  7. /** waitStatus value to indicate successor's thread needs unparking
  8. * 标记后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消
  9. * 将会唤醒后继节点,使后继节点的线程得以运行。
  10. */
  11. static final int SIGNAL = -1;
  12. /** waitStatus value to indicate thread is waiting on condition
  13. * 标记节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后
  14. * 该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
  15. */
  16. static final int CONDITION = -2;
  17. /**
  18. * waitStatus value to indicate the next acquireShared should
  19. * unconditionally propagate
  20. * 标识下一次共享式同步状态获取将会被无条件地传播下去
  21. */
  22. static final int PROPAGATE = -3;
  23. /**
  24. * 标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态
  25. * 使用CAS更改状态,volatile保证线程可见性,高并发场景下,
  26. * 即被一个线程修改后,状态会立马让其他线程可见。
  27. */
  28. volatile int waitStatus;

2.获取锁逻辑

加锁:
阻塞需要for循环执行两次
for{
第一次循环:把阻塞元素前一个元素waitstatus设置为-1
第二次循环:阻塞当前线程(park)
}

  1. /**
  2. * 默认为非公平锁
  3. */
  4. public void lock() {
  5. sync.lock();
  6. }

2.1.非公平式获取锁

  1. /**
  2. * Sync object for non-fair locks
  3. */
  4. static final class NonfairSync extends Sync {
  5. private static final long serialVersionUID = 7316153563782823691L;
  6. /**
  7. * 1.立即执行获取锁操作。
  8. * 2.失败,则执行常规获取锁操作
  9. */
  10. final void lock() {
  11. //立即尝试获取锁,如果当前state=0,则可以获取锁
  12. if (compareAndSetState(0, 1))
  13. setExclusiveOwnerThread(Thread.currentThread());
  14. else
  15. //常规获取锁操作
  16. acquire(1);
  17. }
  18. protected final boolean tryAcquire(int acquires) {
  19. return nonfairTryAcquire(acquires);
  20. }
  21. }
  22. 采用非公平方式,可以提高效率。可以直接由活跃线程获取锁,避免unpark唤醒耗时。
  23. 解锁过程
  24. 1.设置state=0
  25. 非公平模式下,可在此处插队
  26. 2.唤醒线程
  27. 3.刚唤醒线程,执行尝试获取锁逻辑
  28. 4.从等待队列中,移除刚唤醒的节点。

2.2.常规获取锁

  1. /**
  2. * 1.尝试获取锁
  3. * 2.第一步失败,则创建等待节点,并入队
  4. */
  5. public final void acquire(int arg) {
  6. if (!tryAcquire(arg) &&
  7. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  8. selfInterrupt();
  9. }
  10. //调用NonfairSync中获取锁实现
  11. protected final boolean tryAcquire(int acquires) {
  12. return nonfairTryAcquire(acquires);
  13. }
  14. /**
  15. * 调用Sync非公平获取锁
  16. */
  17. final boolean nonfairTryAcquire(int acquires) {
  18. final Thread current = Thread.currentThread();
  19. int c = getState();
  20. if (c == 0) {
  21. if (compareAndSetState(0, acquires)) {
  22. setExclusiveOwnerThread(current);
  23. return true;
  24. }
  25. }
  26. //处理锁重入场景
  27. else if (current == getExclusiveOwnerThread()) {
  28. int nextc = c + acquires;
  29. if (nextc < 0) // overflow
  30. throw new Error("Maximum lock count exceeded");
  31. setState(nextc);
  32. return true;
  33. }
  34. return false;
  35. }

2.3.创建等待节点

  1. //创建等待节点,并插入CHL(双向)队列尾部
  2. private Node addWaiter(Node mode) {
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. //将尾部节点暂存到pred中
  6. Node pred = tail;
  7. //队列为空时,则tail和head节点都为null,如果perd不为null,则同步队列不为空。
  8. if (pred != null) {
  9. //队列不为空,执行插入操作
  10. node.prev = pred;//设置当前节点前驱为原队列尾节点
  11. if (compareAndSetTail(pred, node)) {//通过cas设置当前节点为尾节点
  12. pred.next = node;//之前尾节点指向当前节点
  13. return node;
  14. }
  15. }
  16. //当前同步队列为null,执行队列初始化,并将当前节点加入共同队列
  17. enq(node);
  18. return node;
  19. }

2.4.初始化等待队列,并插入第一个元素

  1. //1.第一次循环,判断队列是否为空,为空则初始化
  2. //2.第二次循环,则将节点,插入刚初始化好的队列中
  3. private Node enq(final Node node) {
  4. for (;;) {
  5. Node t = tail;
  6. if (t == null) { // Must initialize
  7. if (compareAndSetHead(new Node()))//通过cas初始化CHL队列
  8. tail = head;//头、尾节点指向同一个位置。
  9. } else {
  10. node.prev = t;
  11. if (compareAndSetTail(t, node)) {//通过cas设置当前节点为尾节点
  12. t.next = node;
  13. return t;
  14. }
  15. }
  16. }
  17. }

2.5.阻塞加入队列中线程节点

  1. //阻塞队列中的节点,将当前线程阻塞,等待获取锁(阻塞前如果是第一个节点,则再次尝试加锁)
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();//获取当前节点前驱节点
  8. //如果当前节点前驱节点为head(则当前节点是队列中第一个有效节点),则再次尝试获取锁。(该节点是同步队列中的第一个节点,避免阻塞性能损耗,再次获取锁)
  9. if (p == head && tryAcquire(arg)) {
  10. //执行到此处,表示待阻塞节点,已经获取锁不必再阻塞,故将当前节点设置为同步队列的head(头)节点
  11. setHead(node);
  12. p.next = null; // help GC
  13. failed = false;
  14. return interrupted;
  15. }
  16. /**
  17. * 第一次循环,(shouldParkAfterFailedAcquire方法)将当前【待阻塞】节点,前一个节点的waitStatus设置为-1,返回false
  18. * 第二次循环,当前待阻塞节点的前一个节点waitStatus=-1,则返回true。
  19. * 继续执行parkAndCheckInterrupt方法(1.执行线程阻塞,2.并检查是否有中断标识【可能在阻塞期间被设置了中断】)
  20. **/
  21. if (shouldParkAfterFailedAcquire(p, node) &&
  22. parkAndCheckInterrupt())
  23. //***线程interrupt方法,会唤醒park阻塞的线程**/
  24. interrupted = true;
  25. }
  26. } finally {
  27. //发生异常时,failed才会true,然后标记队列中节点为Cancel状态
  28. if (failed)
  29. cancelAcquire(node);
  30. }
  31. }

2.5.1.设置前驱节点信号量并清理无效节点

  1. //判断当前节点是否可以被阻塞
  2. //第一轮循环时,当前待阻塞节点的【前一个节点】waitStatus修改为-1.signal=-1可被唤醒。为唤醒做准备,唤醒时,进行判断!=0
  3. //第二次循环,当前待阻塞节点的前一个节点waitStatus=-1,则返回true。
  4. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  5. int ws = pred.waitStatus;
  6. if (ws == Node.SIGNAL)
  7. /*
  8. * This node has already set status asking a release
  9. * to signal it, so it can safely park.
  10. */
  11. return true;
  12. if (ws > 0) {//大于0,表示失效节点
  13. /*
  14. * 循环清理等待队列中的失效节点
  15. */
  16. do {
  17. node.prev = pred = pred.prev;
  18. } while (pred.waitStatus > 0);
  19. pred.next = node;
  20. } else {
  21. /*
  22. * 将前驱节点信号量设置为SIGNAL(-1)
  23. */
  24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  25. }
  26. return false;
  27. }

3.解锁逻辑

解锁
1.设置state=0
非公平模式下,可在此处插队
2.唤醒线程
3.刚唤醒线程,执行尝试获取锁逻辑
4.从等待队列中,移除刚唤醒的节点。

  1. public void unlock() {
  2. sync.release(1);
  3. }

3.1.解锁

  1. //调用AQS方法
  2. public final boolean release(int arg) {
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. if (h != null && h.waitStatus != 0)
  6. unparkSuccessor(h);
  7. return true;
  8. }
  9. return false;
  10. }

3.1.1.尝试解锁

  1. /*调用ReentrantLock方法
  2. * 解锁:即还原lock初始状态
  3. * 返回:解锁结果。成功:true;失败:false
  4. **/
  5. protected final boolean tryRelease(int releases) {
  6. int c = getState() - releases;
  7. //必须获取锁的线程解锁
  8. if (Thread.currentThread() != getExclusiveOwnerThread())
  9. throw new IllegalMonitorStateException();
  10. boolean free = false;
  11. //处理锁重入情况
  12. if (c == 0) {
  13. free = true;
  14. setExclusiveOwnerThread(null);
  15. }
  16. setState(c);
  17. return free;
  18. }

3.1.2.唤醒线程

  1. /*
  2. * 1.解锁场景,入参为head节点,修改head,waitStatus=0
  3. *
  4. **/
  5. private void unparkSuccessor(Node node) {
  6. int ws = node.waitStatus;
  7. if (ws < 0)
  8. compareAndSetWaitStatus(node, ws, 0);
  9. //如果后继节点不符合唤醒条件,则从队列尾部,查询队列最前端符合条件的待唤醒节点
  10. Node s = node.next;
  11. if (s == null || s.waitStatus > 0) {
  12. s = null;
  13. //从队列尾部遍历,获取队列头部第一个waitStatus<0的节点
  14. for (Node t = tail; t != null && t != node; t = t.prev)
  15. if (t.waitStatus <= 0)
  16. s = t;
  17. }
  18. //唤醒阻塞线程
  19. if (s != null)
  20. LockSupport.unpark(s.thread);
  21. }

3.2.唤醒线程继续执行

  1. //继续执行循环操作
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. /*
  8. * 唤醒节点为队列第一个节点,故尝试获取锁。
  9. * 公平锁场景:肯定可以获取锁
  10. * 非公平锁场景:获取锁可能失败,重新阻塞
  11. */
  12. final Node p = node.predecessor();
  13. if (p == head && tryAcquire(arg)) {
  14. //获取锁成功,头结点设置为-1。(解锁时,会将头节点waitStatus设置为0)
  15. setHead(node);
  16. p.next = null; // help GC
  17. failed = false;
  18. return interrupted;
  19. }
  20. //获取锁失败,头结点设置为-1。(解锁时,会将头节点waitStatus设置为0)
  21. if (shouldParkAfterFailedAcquire(p, node) &&
  22. parkAndCheckInterrupt())
  23. interrupted = true;
  24. }
  25. } finally {
  26. //发生异常时,failed才会true
  27. if (failed)
  28. cancelAcquire(node);
  29. }
  30. }

网络总结图
ReentrantLock中的AbstractQueuedSynchronized的流程.png

图.png