一、AQS

1.概述

AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

特点:

  • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
    • getState - 获取 state 状态
    • setState - 设置 state 状态
    • compareAndSetState - cas 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

    AQS是一个抽象类,所以不能直接实例化,当我们需要实现一个自定义锁的时候可以去继承AQS然后重写以下一些方法(默认抛出 UnsupportedOperationException)

  • tryAcquire

  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

    2.实现不可重入锁

    自定义不可重入锁

    ```java package panw.AQS;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Slf4j //自定义不可重入锁 public class MyLock implements Lock{ public static void main(String[] args) { MyLock myLock = new MyLock(); Thread t1 = new Thread(() -> { myLock.lock(); try { while (true){ log.debug(“t1”); Thread.sleep(1000000000000L); }

  1. } catch (InterruptedException e) {
  2. throw new RuntimeException(e);
  3. } finally {
  4. myLock.unlock();
  5. }
  6. }, "t1");
  7. Thread t2 = new Thread(() -> {
  8. myLock.lock();
  9. try {
  10. log.debug("t1");
  11. Thread.sleep(1000000000000L);
  12. } catch (InterruptedException e) {
  13. throw new RuntimeException(e);
  14. } finally {
  15. myLock.unlock();
  16. }
  17. }, "t2");
  18. t1.start();
  19. t2.start();
  20. }
  21. static class MySync extends AbstractQueuedSynchronizer {
  22. @Override
  23. protected boolean tryAcquire(int acquires) {
  24. if (acquires==1){
  25. if (compareAndSetState(0,1)){
  26. setExclusiveOwnerThread(Thread.currentThread());
  27. return true;
  28. }
  29. }
  30. return false;
  31. }
  32. @Override
  33. protected boolean tryRelease(int acquires) {
  34. if (acquires==1){
  35. if (getState()==0){
  36. throw new IllegalMonitorStateException();
  37. }
  38. setExclusiveOwnerThread(null);
  39. setState(0);
  40. return true;
  41. }
  42. return false;
  43. }
  44. protected Condition newCondition() {
  45. return new ConditionObject();
  46. }
  47. @Override
  48. //是否持有独占锁
  49. protected boolean isHeldExclusively() {
  50. return getState() == 1;
  51. }
  52. }
  53. protected MySync mySync = new MySync();
  54. @Override
  55. public void lock() {
  56. mySync.acquire(1);
  57. }
  58. @Override
  59. public void lockInterruptibly() throws InterruptedException {
  60. mySync.acquireInterruptibly(1);
  61. }
  62. @Override
  63. public boolean tryLock() {
  64. return mySync.tryAcquire(1);
  65. }
  66. @Override
  67. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  68. return mySync.tryAcquireNanos(1,unit.toNanos(time));
  69. }
  70. @Override
  71. public void unlock() {
  72. mySync.release(0);
  73. }
  74. @Override
  75. public Condition newCondition() {
  76. return mySync.newCondition();
  77. }

}

  1. <a name="Y6bQC"></a>
  2. ## 二、AQS源码解析
  3. <a name="KU4ge"></a>
  4. ### 总结
  5. 这部分是我debug源码完来进行的总结,可以先让大家了解一下整个AQS的结构,方便大家理解。<br />AQS类中有state属性来表示锁是否被占,还有两个重要属性:head、tail。这两个节点类型是内部类的Node节点,Node类有一个waitState来作为节点状态的标识,Node节点则是用来构建一个**双向链表的阻塞队列**和一个**单向链表的等待队列**。head和tail是用来构建**阻塞队列的,即下图:**<br />![](https://cdn.nlark.com/yuque/0/2022/jpeg/28810082/1655179484404-7ba1a1dd-216b-424b-a983-34621c23a158.jpeg)<br />还有一个内部类是ConditionObject,该内部类复用了Node类,并构造了**单向链表的等待队列。**<br />![](https://cdn.nlark.com/yuque/0/2022/jpeg/28810082/1655179768730-a9432a10-a14b-4c6b-af11-f10a1a445b3d.jpeg)
  6. <a name="XMJCa"></a>
  7. ### WaitStatus
  8. | SIGNAL | 值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,那么就会通知后继节点,让后继节点的线程能够运行 |
  9. | --- | --- |
  10. | CONDITION | 值为-2,节点在等待队列中,节点线程等待在Condition上,不过当其他的线程对Condition调用了signal()方法后,该节点就会从等待队列转移到同步队列中,然后开始尝试对同步状态的获取 |
  11. | PROPAGATE | 值为-3,表示下一次的共享式同步状态获取将会无条件的被传播下去 |
  12. | CANCELLED | 值为1,由于超时或中断,该节点被取消。 节点进入该状态将不再变化。特别是具有取消节点的线程永远不会再次阻塞 |
  13. | INITIAL | 值为0,初始状态 |
  14. <a name="LRiuP"></a>
  15. ### 通过分析ReentrantLock来分析
  16. <a name="T1fM7"></a>
  17. #### 构造
  18. ```java
  19. public ReentrantLock() {
  20. sync = new NonfairSync();
  21. }
  22. public ReentrantLock(boolean fair) {
  23. sync = fair ? new FairSync() : new NonfairSync();
  24. }

当在构造参数中添加一个true的布尔值就使用公平锁,否则就是非公平锁。那么公平锁和非公平锁是如何实现的呢?
我们来具体分析一下ReentrantLock的结构
image.png
从图中可以看出:
ReentrantLock有三个静态内部类 Sync、NonfairSync、FairSync。其中Sync继承自AbstractQueuedSynchronizer,NonfairSync、FairSync则继承Sync。

当调用lock方法时:

  1. //ReentrantLock调用了Sync中的抽象方法lock
  2. public void lock() {
  3. sync.lock();
  4. }
  5. //Sync
  6. abstract static class Sync extends AbstractQueuedSynchronizer {
  7. abstract void lock();
  8. ...
  9. }

在NonfairSync、FairSync分别实现了该方法:

  1. // FairSync
  2. static final class FairSync extends Sync {
  3. final void lock() {
  4. acquire(1);
  5. }
  6. }
  7. // NonfairSync
  8. static final class NonfairSync extends Sync {
  9. final void lock() {
  10. //首先对state进行原子操作,如果成功就将owner线程设置为当前线程
  11. if (compareAndSetState(0, 1))
  12. setExclusiveOwnerThread(Thread.currentThread());
  13. else
  14. acquire(1);
  15. }
  16. }

其中都调用了acquire(1),这是AbstractQueuedSynchronizer中的方法:

  1. public final void acquire(int arg) {
  2. //如果没获取到锁则将其加入队列中,并打断该线程
  3. if (!tryAcquire(arg) &&
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  5. selfInterrupt();
  6. }

tryAcquire()方法在AbstractQueuedSynchronizer中并未实现,需要子类去进行重写:

  1. public abstract class AbstractQueuedSynchronizer
  2. extends AbstractOwnableSynchronizer
  3. implements java.io.Serializable {
  4. protected boolean tryAcquire(int arg) {
  5. throw new UnsupportedOperationException();
  6. }
  7. }

tryAcquire

分别来看看公平锁和非公平锁的实现:

公平锁
  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState(); //获取当前state值
  4. if (c == 0) { //锁没被其他线程获取
  5. if (!hasQueuedPredecessors() && //判断是否队列中是否有其他线程Node
  6. compareAndSetState(0, acquires)) { //原子操作将state置为1
  7. setExclusiveOwnerThread(current); //将owner线程设置为当前线程
  8. return true;
  9. }
  10. }
  11. //锁已被其他线程获取
  12. else if (current == getExclusiveOwnerThread()) { //owner线程与当前线程一致
  13. int nextc = c + acquires; //state++
  14. if (nextc < 0)
  15. throw new Error("Maximum lock count exceeded");
  16. setState(nextc); //将新state写入,用于可重入锁计数
  17. return true;
  18. }
  19. return false;
  20. }
  21. //这是AbstractQueuedSynchronizer中hasQueuedPredecessors方法
  22. public final boolean hasQueuedPredecessors() {
  23. Node t = tail;
  24. Node h = head;
  25. Node s;
  26. //判断是否队列中是否有其他线程Node
  27. return h != t &&
  28. ((s = h.next) == null || s.thread != Thread.currentThread());
  29. }

hasQueuedPredecessors方法:

  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail;
  3. Node h = head;
  4. Node s;
  5. return h != t && //判断是不是头尾为一个Node节点,是一个节点直接返回false
  6. ((s = h.next) == null || s.thread != Thread.currentThread());
  7. //如果不是一个节点再看头节点的下一个节点是不是null,是null直接返回true
  8. //如果下一个节点不是null,判断该节点的Thread是不是当前线程
  9. }

非公平锁
  1. protected final boolean tryAcquire(int acquires) {
  2. //调用Sync中的方法
  3. return nonfairTryAcquire(acquires);
  4. }
  5. //Sync中的方法
  6. final boolean nonfairTryAcquire(int acquires) {
  7. final Thread current = Thread.currentThread();
  8. int c = getState();
  9. if (c == 0) {
  10. if (compareAndSetState(0, acquires)) {
  11. setExclusiveOwnerThread(current);
  12. return true;
  13. }
  14. }
  15. else if (current == getExclusiveOwnerThread()) {
  16. int nextc = c + acquires;
  17. if (nextc < 0) // overflow
  18. throw new Error("Maximum lock count exceeded");
  19. setState(nextc);
  20. return true;
  21. }
  22. return false;
  23. }

从上可以看出,公平锁与非公平锁的acquire方法实现基本一样,只是公平锁在判断时调用了hasQueuedPredecessors方法,判断是否队列中是否有其他线程Node。

这里将上面的AbstractQueuedSynchronizer中的方法再次拿下来好看一些

  1. public final void acquire(int arg) {
  2. //如果没获取到锁则将其加入队列中,并打断该线程
  3. if (!tryAcquire(arg) &&
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  5. selfInterrupt();
  6. }

此时acquire方法如果执行成功则acquire方法结束,逐层返回,否则则执行 &&后的语句,先来看看addWaiter。

addWaiter
  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. Node pred = tail;
  4. //第一次执行方法需要进行执行enq进行初始化,之后则不需要
  5. if (pred != null) {
  6. node.prev = pred; //在链表尾节点添加元素
  7. if (compareAndSetTail(pred, node)) { //用cas将尾节点置为新节点
  8. pred.next = node; //将新节点放在队列尾部
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

enq

该方法是第一次进入进行队列初始化

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize 必须初始化
  5. if (compareAndSetHead(new Node())) //第一次循环,创建一个NULL节点为头尾节点
  6. tail = head;
  7. } else {
  8. //第二次循环,将该节点放在队列尾部,并且返回
  9. node.prev = t;
  10. if (compareAndSetTail(t, node)) {
  11. t.next = node;
  12. return t;
  13. }
  14. }
  15. }
  16. }

接下来就来看看acquireQueued方法把!

acquireQueued
  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor(); //得到前置节点
  7. if (p == head && tryAcquire(arg)) { //先判断前节点是不是头节点,是就再进行一次抢锁
  8. setHead(node); //枪锁成功就将该节点设置为头节点
  9. p.next = null; // help GC 将头节点删除
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) && //第一次执行将waitStatus置为SIGNAL即 -1,然后再进行循环
  14. parkAndCheckInterrupt()) //第二次执行 shouldParkAfterFailedAcquire返回true,
  15. //执行parkAndCheckInterrupt
  16. interrupted = true;
  17. }
  18. } finally {
  19. if (failed)
  20. cancelAcquire(node); //默认操作失败,进行处理,下面具体来看
  21. }
  22. }

shouldParkAfterFailedAcquire

该方法来判断是否需要park

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus; //第一次仅需该方法 waitStatus的默认值为0所以执行该代码块的12行
  3. if (ws == Node.SIGNAL) //第二次进入,条件成立,返回true
  4. return true;
  5. if (ws > 0) {
  6. do {
  7. node.prev = pred = pred.prev;
  8. } while (pred.waitStatus > 0);
  9. pred.next = node;
  10. } else {
  11. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 第一次将waitStatus设置为-1
  12. }
  13. return false;
  14. }

parkAndCheckInterrupt
  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

cancelAcquire
  1. private void cancelAcquire(Node node) {
  2. if (node == null)
  3. return;
  4. node.thread = null; //将当前节点线程置为null
  5. Node pred = node.prev; //获取前节点
  6. while (pred.waitStatus > 0) //如果前节点的waitStatus是 CANCELLED 即 1
  7. node.prev = pred = pred.prev; //跳过前面已经为CANCELLED的节点
  8. Node predNext = pred.next;
  9. node.waitStatus = Node.CANCELLED; //将该节点waitStatus置为 1
  10. // 如果是尾节点,直接删除
  11. if (node == tail && compareAndSetTail(node, pred)) {
  12. compareAndSetNext(pred, predNext, null); //将前节点的下一个节点置为null
  13. } else {
  14. int ws;
  15. if (pred != head &&
  16. ((ws = pred.waitStatus) == Node.SIGNAL ||
  17. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  18. pred.thread != null) {
  19. Node next = node.next;
  20. if (next != null && next.waitStatus <= 0)
  21. compareAndSetNext(pred, predNext, next); //将要删除节点的后节点链接到该节点的前节点
  22. } else {
  23. unparkSuccessor(node);
  24. }
  25. node.next = node; // help GC
  26. }
  27. }

当调用unlock方法时:

  1. //ReentrantLock调用了Sync中的方法release
  2. public void unlock() {
  3. sync.release(1);
  4. }

release
  1. //Sync中的release
  2. public final boolean release(int arg) {
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. if (h != null && h.waitStatus != 0)
  6. unparkSuccessor(h); //如果头节点不为null,status不为0
  7. return true;
  8. }
  9. return false;
  10. }

tryRelease
  1. //ReentrantLock中的方法tryRelease
  2. protected final boolean tryRelease(int releases) {
  3. int c = getState() - releases; //state - 1
  4. if (Thread.currentThread() != getExclusiveOwnerThread())
  5. throw new IllegalMonitorStateException();
  6. boolean free = false;
  7. if (c == 0) {
  8. free = true;
  9. setExclusiveOwnerThread(null); //如果state为0设置owner为null,free为true
  10. }
  11. setState(c); //写回state,如果不为0则未解锁
  12. return free;
  13. }

unparkSuccessor
  1. private void unparkSuccessor(Node node) {
  2. int ws = node.waitStatus;
  3. if (ws < 0)
  4. compareAndSetWaitStatus(node, ws, 0); //把头结点的waitStatus置为0
  5. Node s = node.next;
  6. if (s == null || s.waitStatus > 0) {
  7. s = null;
  8. for (Node t = tail; t != null && t != node; t = t.prev) //如果头节点的下一个节点为null或者waitStatus为CANCELLED 即1
  9. //那么就一直往后找到waitStatus<0的节点
  10. if (t.waitStatus <= 0)
  11. s = t;
  12. }
  13. if (s != null)
  14. LockSupport.unpark(s.thread); //找到节点,将线程unpark
  15. }

当调用await方法时:

这时会执行AbstractQueuedSynchronizer中内部类ConditionObject中的await方法

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter(); //创建等待队列
  5. int savedState = fullyRelease(node); //将同步队列中的锁释放
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //进入阻塞队列
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }

addConditionWaiter
  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter; //获取尾节点
  3. if (t != null && t.waitStatus != Node.CONDITION) { //如果不为空且waitStatus不为-2
  4. unlinkCancelledWaiters(); //删除已取消的节点
  5. t = lastWaiter;
  6. }
  7. Node node = new Node(Thread.currentThread(), Node.CONDITION); // 创建新节点
  8. if (t == null)
  9. firstWaiter = node; //如果第一次创建则将节点加在头
  10. else
  11. t.nextWaiter = node; //之后则加在尾节点后
  12. lastWaiter = node; //将该节点置为尾节点
  13. return node;
  14. }

当调用signal方法时:

  1. public final void signal() {
  2. if (!isHeldExclusively()) //判断owner线程是不是当前线程
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. doSignal(first);
  7. }

doSignal
  1. private void doSignal(Node first) {
  2. do {
  3. if ( (firstWaiter = first.nextWaiter) == null)
  4. lastWaiter = null;
  5. first.nextWaiter = null; //上面三行是将首节点拿出等待队列
  6. } while (!transferForSignal(first) && //将节点从等待队列转移到同步队列
  7. (first = firstWaiter) != null);
  8. }

transferForSignal
  1. final boolean transferForSignal(Node node) {
  2. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  3. return false;
  4. Node p = enq(node); //enq上面已经分析过,用来对阻塞队列进行初始化,返回前节点
  5. int ws = p.waitStatus;
  6. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //如果前节点未被删除,把前节点waitStatus置为-1,让该节点线程unpark
  7. LockSupport.unpark(node.thread);
  8. return true;
  9. }

我们就对AQS源码讨论这么多,相信你已经把握大部分了,其他的就由自己研究把!

三、读写锁(待写)

3.1ReentrantReadWriteLock

当读操作远远高于写操作时候,这时候使用读写锁可以让读-读并发,提高性能。