ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步 手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比 synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。

1.1 一种可靠的锁机制

直接上源码相信很多人都接受不了,包括本人当初学习的时候也是看得一脸懵。在了解了锁的工作机制后,在来推断ReentrantLock的实现会容易理解很多。

要实现线程安全,保证只有抢到锁之后才能执行业务代码,抢不到锁就一直停留在lock()方法里面出不来,直到抢到锁为止。

  1. lock.lock() //加锁
  2. 业务代码.....
  3. lock.unlock() //解锁

要想一个线程一直停留在lock()方法里面,最好的方法是设一个死循环,不断尝试获取锁,并保证每次只能有一个线程拿到锁,拿到锁就可以跳出循环,如果加锁不成功,则会不断重试。如果当前线程数比较大,所有线程都不断的重试,会极大浪费我们的CPU资源。此时有这么几种方案:

  • 线程让出当前CPU使用权
  • 线程睡眠一定的时间
  • 线程阻塞,待释放锁的线程唤醒

如果业务方法非常长,线程执行yeild()方法后,等到下次获得CPU使用权业务方法还没结束,继续让出CPU使用权,CPU不断执行yeild方法,也没有高效利用CPU资源;同理,不能正确预计业务执行时长,可能业务执行完了还在睡眠,白白浪费CPU资源。最合理的方法是等待释放锁的线程去唤醒阻塞线程。为此应该有一个数据结构(可以是数组、链表、队列)去保存当前的线程的引用。

  1. lock(){
  2. while(true){
  3. if(tryLock()){
  4. break; //加锁成功跳出循环
  5. }
  6. //Thread.yeild();//让出cpu使用权
  7. //Thread.seelp(time);//线程休眠
  8. Queue queue;
  9. queue.put(currentThread); //放到一个队列里面
  10. //阻塞
  11. LockSupport.park();
  12. }
  13. }

当得到锁的线程执行完业务代码后,就要释放锁资源同时去唤醒阻塞的线程。

  1. unlock(){
  2. Thread thread = queue.get();
  3. LockSupport.unpark(thread);
  4. }

总结:要实现上面锁机制的要素,自旋(死循环)、LockSupport.park() /LockSupport.unpark()(阻塞与唤醒)、CAS(保证只能有一条线程抢到锁)、队列(存放加锁失败的线程)

1.2 比较与交换(CAS)

CAS算法大概的流程是,当一个线程从主内存中读取一个变量到自己的工作内存中,当修改该变量值重新写回主内存中时,需要把当前线程当初读取该变量值与该变量当前在主内存的值进行比较,若相同则可以修改,不同则修改失败。

例如:线程A和线程B同时读取主内存中的同一个值微信截图_20210618162405.png线程B先修改了变量的值重新写回主内存中,线程A想修改该变量应重新读取变量新的值再进行修改微信截图_20210618162416.png

CAS中比较和交换这两个动作合在一起是原子的,底层依赖于汇编指令#cmpxchg#,具体可以查看各CPU厂商的指令手册。

在java里面Unsafe类提供了CAS的实现,里面有4个参数
微信截图_20210618163529.png
第一个参数是要改变的变量所属于的对象实例,例如要修改的属性是当前对象的属性,则传入this
第二个参数是属性在当前对象中的偏移量,即相对于当前对象在堆内存中的地址的偏移值
第三个参数是属性原来的值。
第四个参数是要修改的值。

1.3 AQS核心要素

到这里相信大家都对ReentrantLock的锁机制有所了解,那么它是如何基于AQS实现的呢?这就要弄清楚AQS里面的核心要素及作用。
微信截图_20210618175253.png
exclusiveOwnerThread:AQS上面还继承了一个父类,该父类中有一个这样的属性,该属性就是记录当前获得锁的线程

  1. private transient Thread exclusiveOwnerThread;

state:在AQS中有一个state变量,该变量记录着锁的状态,当state=0时,代表着锁没被任何线程获得,当state > 0时,代表锁被某个线程持有

  1. private volatile int state;

Node:AQS中有一个Node内部类,通过node信息维护着CLH队列和条件队列。

  1. static final class Node {
  2. //标记该节点为共享模式
  3. static final Node SHARED = new Node();
  4. //标记该节点为独占模式
  5. static final Node EXCLUSIVE = null;
  6. //节点状态
  7. //表示线程等待超时或被中断,该节点需要从同步队列中移除,不能参与锁争夺
  8. static final int CANCELLED = 1;
  9. //处于SIGNAl状态的节点在锁被释放时可被唤醒
  10. static final int SIGNAL = -1;
  11. //表示该节点在条件等待队列中,当其他节点对该节点调用signal()方法,该节点会从
  12. //条件等待队列进入到CHL队列,在BlockingQueue中用到该条件
  13. static final int CONDITION = -2;
  14. //表示下一次共享同步状态获取将会告诉下一个节点,使其参与锁竞争
  15. static final int PROPAGATE = -3;
  16. //表示当前节点状态,即上面4种情况,刚初始化默认为0
  17. volatile int waitStatus;
  18. //指向前一个节点
  19. volatile Node prev;
  20. //指向下一个节点
  21. volatile Node next;
  22. //当前节点所代表的线程
  23. volatile Thread thread;
  24. //若当前节点位于条件等待队列,指向位于该节点的下一个节点
  25. Node nextWaiter;
  26. .......
  27. }

同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人 发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH 队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
微信截图_20210618181427.png

条件等待队列
Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁
微信截图_20210618205903.png

1.4 ReentrantLock源码解析

微信截图_20210618172813.png

由上图可知,ReentrantLock通过内部定义一个Sync类继承AQS,而Sync又有两个实现类,分别实现了公平锁与非公平锁。

下面我们以公平锁为例:
当我们调用lock.lock()方法,由于实现的是公平锁,会走到FairSync中的lock()方法

  1. final void lock() {
  2. acquire(1);
  3. }

接着执行acquire()方法,其中tryAcquire()方法是尝试获取锁,加锁失败后调用addWaiter()方法创建Node节点并加入等待队列,acquireQueued()方法会再次尝试获取锁,再次失败后把该线程阻塞。

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

lock —- tryAcquire()

该方法主要是为了加锁,通过修改状态量state的值来表示当前锁的状态。其中要判断CLH队列中有无节点,有节点则不能获取锁,要加入队列排队(公平锁的机制)。根据代码可知,ReentrantLock的锁是可重入的。

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. //获取当前状态量
  4. int c = getState();
  5. //若state为0则代表还没有线程获得锁
  6. if (c == 0) {
  7. if (!hasQueuedPredecessors() && //判断CLH队列中有没有节点,没有才可以接着修改state的值
  8. compareAndSetState(0, acquires)) { //通过CAS的方法修改state的值,acquires是我们一开始传入的值,为1
  9. setExclusiveOwnerThread(current); //成功修改state的值,即获得锁成功,把exclusiveOwnerThread的值设为当前线程
  10. return true;
  11. }
  12. }
  13. else if (current == getExclusiveOwnerThread()) { //若state不等于0,判断所得锁的线程是否是当前线程,有可能是当前线程重复加锁,这种情况是允许的
  14. int nextc = c + acquires;
  15. if (nextc < 0)
  16. throw new Error("Maximum lock count exceeded");
  17. setState(nextc); //判断是重复加锁的情况,则在现在state的基础上继续加1
  18. return true;
  19. }
  20. return false;
  21. }
  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail;
  3. Node h = head;
  4. Node s;
  5. return h != t && //判断队列是否为空
  6. ((s = h.next) == null || s.thread != Thread.currentThread());
  7. }

当只有一条线程,肯定可以获取锁成功
微信截图_20210619002552.png

lock —- addWaiter()

当线程获取锁失败,则会加入CHL队列。

  1. private Node addWaiter(Node mode) {
  2. //创建一个节点绑定当前线程,mode表示当前节点的模式,上面传入的是Node.EXCLUSIVE,代表是独占模式
  3. Node node = new Node(Thread.currentThread(), mode);
  4. //获取尾节点
  5. Node pred = tail;
  6. //当tail节点不为空
  7. if (pred != null) {
  8. node.prev = pred;
  9. // 新创建的节点入队并成为新的tail节点
  10. if (compareAndSetTail(pred, node)) {
  11. pred.next = node;
  12. return node;
  13. }
  14. }
  15. //当一次为队列创建节点则会走这个方法
  16. enq(node);
  17. return node;
  18. }

之前的队列没有节点,当第一次创建节点时,走的是enq(Node node)方法,node节点到底如何加入队列?

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. //当队列第一次创建,head和tail肯定为空
  5. if (t == null) {
  6. if (compareAndSetHead(new Node()))
  7. //创建一个新节点,并且使head = tail
  8. tail = head;
  9. } else {
  10. //使我们传进来的节点成为新的tail
  11. node.prev = t;
  12. if (compareAndSetTail(t, node)) {
  13. t.next = node;
  14. return t;
  15. }
  16. }
  17. }
  18. }

第一次循环先创建一个新的节点并让head和tail同时指向该节点
微信截图_20210619143830.png
第二次循环使我们传进来的节点成为新的tail节点,为了防止多个线程节点同时入队,设置tail节点时采用了CAS的方法,保证每个线程节点都能入队成功
微信截图_20210619145649.png

lock —- acquireQueued()

直到现在,线程还没有被阻塞,该方法就是去阻塞入队的线程

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. //若入队的节点前一个节点是head节点,则调用tryAcquire()重新尝试获取锁
  8. if (p == head && tryAcquire(arg)) {
  9. //获取锁成功,重新设置head节点
  10. setHead(node);
  11. p.next = null; //断开的节点则等待GC垃圾回收
  12. failed = false;
  13. return interrupted;
  14. }
  15. //去阻塞该节点的线程
  16. if (shouldParkAfterFailedAcquire(p, node) &&
  17. parkAndCheckInterrupt())
  18. interrupted = true;
  19. }
  20. } finally {
  21. if (failed)
  22. cancelAcquire(node);
  23. }
  24. }

重新获取锁成功
微信截图_20210619175517.png

若重新获取锁失败则开始阻塞,阻塞前调用shouldParkAfterFailedAcquire(),先把该节点的前驱节点(pred)的waitStatus变为SIGNAL状态(因为只有SIGNAL状态的节点才能被唤醒)。

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. //若该节点的前一个节点(pred)的waitStatus为SIGNAL,直接返回
  4. if (ws == Node.SIGNAL)
  5. return true;
  6. //若pred节点的waitStatus大于0,即为CANCELLED状态,则要把该pred节点移出队列
  7. if (ws > 0) {
  8. do {
  9. node.prev = pred = pred.prev;
  10. } while (pred.waitStatus > 0);
  11. pred.next = node;
  12. } else {
  13. //其他waitStatus值统一都设为SIGNAL
  14. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  15. }
  16. return false;
  17. }

AQS的设计是通过前驱节点的状态来判断其下一个节点是否能够被唤醒,即head节点其实是没有绑定线程的,真正要唤醒的节点是从队列的第二个节点开始。
微信截图_20210619160301.png

因为head节点初始化后没有对waitStatus有任何操作,所以初始值为0,所以acquireQueued()方法里面要走两次循环,第一次调用parkAndCheckInterrupt把head节点waitStatus置为-1,第二次循环才能调用parkAndCheckInterrupt()方法进行阻塞。
当被唤醒是会返回当前线程有无被中断的判断,(本人测过,当线程调用park()方法被阻塞住,在调用interrupt()方法线程会被唤醒,可以继续执行park()方法后面的逻辑),AQS中有对应的逻辑处理被中断的线程。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this); //直接调用LockSupport.park()方法阻塞线程
  3. return Thread.interrupted();
  4. }

lock()和lockInterruptibly()的区别

lock()支持中断,lockInterruptibly()不支持中断

  1. public void lockInterruptibly() throws InterruptedException {
  2. sync.acquireInterruptibly(1);
  3. }
  4. -----------------------------------------
  5. public final void acquireInterruptibly(int arg) throws InterruptedException {
  6. //检测到线程中断,则抛出异常
  7. if (Thread.interrupted())
  8. throw new InterruptedException();
  9. if (!tryAcquire(arg))
  10. doAcquireInterruptibly(arg);
  11. }

当获取锁失败,进入doAcquireInterruptibly()方法,如果线程被中断,直接抛出异常并把并调用cancelAcquire()方法

  1. private void doAcquireInterruptibly(int arg) throws InterruptedException {
  2. //将节点加入到CLH队列
  3. final Node node = addWaiter(Node.EXCLUSIVE);
  4. boolean failed = true;
  5. try {
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head && tryAcquire(arg)) {
  9. setHead(node);
  10. p.next = null; // help GC
  11. failed = false;
  12. return;
  13. }
  14. if (shouldParkAfterFailedAcquire(p, node) &&
  15. parkAndCheckInterrupt())
  16. //检测到中断,直接抛出异常
  17. throw new InterruptedException();
  18. }
  19. } finally {
  20. //如果是抛出异常,failed为true
  21. if (failed)
  22. //将节点状态设置为CANCELLED并出队
  23. cancelAcquire(node);
  24. }
  25. }

cancelAcquire()方法主要就是将节点状态置为 CANCELLED

  1. private void cancelAcquire(Node node) {
  2. if (node == null)
  3. return;
  4. node.thread = null;
  5. Node pred = node.prev;
  6. //找到该节点前面第一个非CANCELLED节点
  7. while (pred.waitStatus > 0)
  8. node.prev = pred = pred.prev;
  9. Node predNext = pred.next;
  10. //将节点状态设置为CANCELLED
  11. node.waitStatus = Node.CANCELLED;
  12. //如果该节点是尾节点,直接删除自己
  13. if (node == tail && compareAndSetTail(node, pred)) {
  14. compareAndSetNext(pred, predNext, null);
  15. } else {
  16. int ws;
  17. if (pred != head &&
  18. ((ws = pred.waitStatus) == Node.SIGNAL ||
  19. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  20. pred.thread != null) {
  21. //找到要删除节点的next节点
  22. Node next = node.next;
  23. //将第一个非CANCELLED节点的next节点设置为要删除节点的next节点
  24. if (next != null && next.waitStatus <= 0)
  25. compareAndSetNext(pred, predNext, next);
  26. } else {
  27. unparkSuccessor(node);
  28. }
  29. node.next = node;
  30. }
  31. }

微信截图_20210731161910.png微信截图_20210731162026.png

unlock —- tryRelease()

上面就是线程从抢锁到阻塞的全部过程,接下来就要进行唤醒了,执行unlock()方法

  1. public void unlock() {
  2. sync.release(1);
  3. }

走到了AQS中的release()方法,这里最重要的就是tryRelease()方法

  1. public final boolean release(int arg) {
  2. //尝试释放锁
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. if (h != null && h.waitStatus != 0)
  6. //唤醒队列中的线程
  7. unparkSuccessor(h);
  8. return true;
  9. }
  10. return false;
  11. }

tryRelease()方法主要就是为了把state变量重新置为0,且把exclusiveOwnerThread的值置为null,让其他线程可以去争夺锁

  1. protected final boolean tryRelease(int releases) {
  2. // 上面传入releases的值为1,所以把state减1
  3. int c = getState() - releases;
  4. if (Thread.currentThread() != getExclusiveOwnerThread())
  5. throw new IllegalMonitorStateException();
  6. boolean free = false;
  7. //如果减完1后state的值为0
  8. if (c == 0) {
  9. free = true;
  10. //把绑定当前获取锁的线程的属性exclusiveOwnerThread置为null
  11. setExclusiveOwnerThread(null);
  12. }
  13. //重新设置state的值
  14. setState(c);
  15. return free;
  16. }

unparkSuccessor()方法就是去唤醒队列中的线程,前提条件是head节点的waitStatus值不为0(前面我们已经把head的waitStatus置为-1,即SIGNAL状态),最后调用LockSupport.unpark()方法唤醒线程。

  1. private void unparkSuccessor(Node node) {
  2. int ws = node.waitStatus;
  3. //把head节点的waitStatus置为0
  4. if (ws < 0)
  5. compareAndSetWaitStatus(node, ws, 0);
  6. //获取head节点的下一个节点
  7. Node s = node.next;
  8. //若要唤醒的节点waitStatus > 0,即为CANCELD状态,则要把该节点出队,重新调整CHL队列
  9. if (s == null || s.waitStatus > 0) {
  10. s = null;
  11. for (Node t = tail; t != null && t != node; t = t.prev)
  12. if (t.waitStatus <= 0)
  13. s = t;
  14. }
  15. if (s != null)
  16. //唤醒线程
  17. LockSupport.unpark(s.thread);
  18. }

微信截图_20210619173005.png

被唤醒的线程会重新获取锁,重新走一次循环,这次t1线程执行完业务逻辑并释放锁,在公平锁机制下t2可以获得锁。
微信截图_20210619173450.png
微信截图_20210619175517.png
若有更多的线程来争夺锁,逻辑与前面一样,ReentrantLock的加锁流程就是这么简单。
微信截图_20210619162311.png