参考
https://masterwangzx.com/2021/01/11/java-aqs/
队列同步器 AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,在其基础上实现了 Reentrantlock, ReentrantReadWriteLock, CountDownLatch 等。
AQS
队列同步器 AbstractQueuedSynchronizer,AQS,使用一个整型的 volatile 修饰的变量 state 来维护同步状态。
AQS 的设计是基于模板方法模式的,子类通过继承 AQS 并实现它的抽象方法和使用 getState(), setState(), compareAndSetState() 方法进行同步状态管理。
锁是面向使用者的,定义了使用者与锁交互的接口,隐藏了实现细节。 AQS 是面向的是锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和 AQS 很好地隔离使用者和实现者所需关注的领域。
可重写方法
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行 CAS 设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于 0 的值,表示获取成功,反之获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldexClusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被 当前线程所独占 |
模板方法
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态。如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待。该方法将会调用重写的 tryAcquire() 方法 |
void acquirelnterruptibly(int arg) | 与 acquire(int arg) 相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出 InterruptedException |
boolean tryAcquireNanos(int arg, long nanos) | 在 acquirelnterruptibly(int arg) 基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回 false,如果获取到了返回 true |
void acquireShared(int arg) | 共享式的获取同步状态、如果当前线程未获取到同步状态、将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态。该方法将会调用重写的 tryAcquireShared() 方法 |
void acquireSharedIntemuptibly(int arg) | 与 acquireShared(int arg) 相同,该方法响应中断 |
boolean tryAcquireSharedNanos(in arg, long nanos) | 在 acquireSharedIntemuptibly(int arg) 基础上增加了超时限制 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第 一个节点包含的线程唤醒。该方法将会调用重写的 tryRelease() 方法 |
boolean releaseShared(int arg) | 共享式的释放同步状态。该方法将会调用重写的 tryReleaseShared() 方法 |
Collection |
获取等待在同步队列上的线程集合 |
同步队列
AQS 依赖内部的同步队列来完成同步状态的管理,这是一个 FIPO 双向队列。如下图所示,当前线程获取同步状态失败时,AQS 会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,同时会阻塞当前线程。当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
- 由于线程获取同步状态存在并发情况,所以使用基于 CAS 的设置尾结点方法
首节点释放同步状态时会唤醒其后继结点,后继结点获取同步状态成功后将自己设置为首节点。由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用 CAS 来保证
acquire()
AQS 中的模板方法,用于独占式获取同步状态。如下所示
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }调用重写方法 tryAcquire() 获取同步状态
- 如果获取同步状态失败,则调用 addWaiter() 方法使用 CAS 方法向同步队列中添加结点
- acquireQueued() 方法内是一个死循环,当其前驱结点是头结点且通过调用 tryAcquire() 成功获取同步状态时设置当前结点为头结点并返回,否则通过 LockSupport.park(this) 阻塞
由于存在以上流程,所以等待队列中的节点都在自旋获得同步状态 (阻塞)
release()
AQS 中的模板方法,用于独占式的释放同步状态,如下所示
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
- 调用重写方法 tryRelease() 释放同步状态
如果成功释放同步状态,则调用 unparkSuccessor() 通过 LockSupport.unpark(object) 唤醒头结点的后继结点
acquireShared()
AQS 中的模板方法,用于共享式的获取同步状态,如下所示
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }调用重写方法 tryAcquireShared() 获取同步状态
如果获取同步状态失败,则调用 doAcquireShared() 方法,与 acquireQueued() 类似,采用自旋与 park 阻塞的方式
releaseShared()
AQS 中的模板方法,用于共享式的释放同步状态。如下所示,与 release() 方法类似,区别是存在并发的情况,需要通过循环和 CAS 保证线程安全
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }ReentrantLock
ReentrantLock 的实现依赖于 AQS,该锁是可重入的。通过继承 AQS 的静态内部类来实现公平锁 FairSync 和非公平锁 NonfairSync,将 ReentrantLock 的 api 代理到 FairSync 或 NonfairSync 上。无论是公平锁还是非公平锁,ReentrantLock::tryLock() 调用的是 Sync.nonfairTryAcquire()。
无论是公平锁还是非公平锁,释放锁的实现都是一致的,调用栈为 ReentrantLock::unlock()->AQS::release(int arg)->Sync::tryRelease(int releases),下面分析一下 Sync::tryRelease(int releases) 的源码
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }如果不是持有锁的线程不是当前线程,则抛出异常
修改 state 后,若为 0 才能释放锁,处理了重入的情况,返回 true 并设置持有锁线程为 null
公平锁
公平锁保证先获取锁的线程一定先被满足,相比于非公平锁效率较低,但能减少饥饿的情况。加锁的调用栈为 ReentrantLock::lock()->FairSync::lock()->AQS::acquire(int arg)->FairSync::tryAcquire()。下面分析一下 FairSync::tryAcquire() 的源码
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error(“Maximum lock count exceeded”); setState(nextc); return true; } return false; } public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }如果 state == 0,说明没有线程获得锁,CAS 修改 state 并设置持有锁的线程为当前线程。期间会调用 hasQueuedPredecessors() 方法,如果有当前结点在同步队列中存在前驱结点,则返回 true 让出资源
- 如果获得锁的线程与当前线程一致,修改 state 以标记重入
-
非公平锁
非公平锁不能保证先获取锁的线程一定先被满足,相比于公平锁效率更高,但存在饥饿的情况。加锁的调用栈为 ReentrantLock::lock()->NonfairSync::lock()->AQS::compareAndSetState(), setExclusiveOwnerThread(), acquire()->NonFairSync::tryAcquire()->Sync.nonfairTryAcquire()。
下面分析一下 NonfairSync::lock() 的源码,如果 state == 0 说明没有线程获得锁,通过 CAS 更新 state 并设置锁持有线程为当前线程并返回。否则调用 AQS::acquire()
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
Sync.nonfairTryAcquire() 负责处理锁被其他线程获得的情况。相比于公平锁的对应方法 FairSync::tryAcquire(),少了 hasQueuedPredecessors() 的调用ReentrantReadWriteLock
ReentrantReadWriteLock 的实现依赖于 AQS,读锁是共享式的,写锁是独占式的。state 的高 16 位表示读锁状态,低 16 位表示写锁状态。下面以公平锁来分析读锁和写锁的实现
写锁
写锁是独占式锁且与读锁也是互斥的。释放锁的过程与 ReentrantLock 类似。加锁的调用栈为 WriteLock::lock()->FairSync::lock()->AQS::acquire(int arg)->Sync::tryAcquire()。下面分析一下 Sync::tryAcquire() 的源码
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); // c: 读锁状态+w int c = getState(); // w: 写锁状态 int w = exclusiveCount(c); if (c != 0) { if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAXCOUNT) throw new Error(“Maximum lock count exceeded”); // 可重入_ setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } 如果 c!=0 且 w=0,说明存在读锁并且当前线程没有获得读锁,此时写锁不能获得
- 如果 c!=0 且 w!=0 且 current != getExclusiveOwnerThread(),说明当前线程获得了读锁和写锁,此时处理重入的情况
-
读锁
读锁是共享式锁且与写锁是互斥的。加锁的调用栈为 ReadLock::lock()->AQS::acquireShared()->Sync::tryAcquireShared(int arg)。由于代码过长,这里不进行源码分析,直接给出分析结果
如果写状态为 0 时,读锁总会被成功地获取,线程安全的增加读状态
- 如果当前线程已经获取了读锁,则线程安全的增加读状态
-
锁降级
锁降级指的是写锁降级为读锁,为了保证数据的可见性。下面用一个小例子说明
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必须先释放读锁 rwl.readLock().unlock(); // 必须持有写锁 rwl.writeLock().lock(); try { if (!cacheValid) { // 准备数据 data = … cacheValid = true; } // 如果之前没有持有写锁, 那么其他线程可能获得写锁并修改准备好的数据 // 获得读锁 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } // 完成锁降级, 阻止其他线程改变准备好的数据, 保证准备数据的可见性在本线程内 } try { use(data); } finally { rwl.readLock().unlock(); } } }Condition
Condition 定义了等待 / 通知两种类型的方法,依赖于 Lock 对象。ConditionObject 是 AQS 的内部类,因为 Condition 的操作需要获取相关锁。
等待队列
等待队列是一个 FIFO 的队列,在队列中的每个节点都包含了一个 线程引用,该线程就是在 Condition 对象上等待的线程。由于 Condition 的调用方法必定受到了锁的保护,所以等待队列新增结点没有 CAS 保证。
如图所示,对于一个 AQS 对象来说,拥有一个同步队列和多个属于不同 Condition 对象的等待队列。
await()
将当前线程阻塞在此 Condition 上直到收到了 signal() 唤醒或者中断。代码如下所示
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 当前线程加入等待队列 Node node = addConditionWaiter(); // 释放锁, 即移出同步队列 long savedState = fullyRelease(node); int interruptMode = 0; // 是否是等待队列的头结点, 循环+park while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 获取同步状态, 竞争锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
signal()
唤醒一个阻塞在此 Condition 上的线程,必须获得与此 Condition 有关的锁。代码如下所示
public final void signal() { // 当前线程必须是获取锁的线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 唤醒等待队列的第一个结点 Node first = firstWaiter; if (first != null) doSignal(first); }
其他并发工具类
CountdownLatch: 允许一个或多个线程等待其他线程完成操作
- CyclicBarrier: 让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续运行
- Semaphore: 控制同时访问特定资源的线程数量。它通过协调各个线程,以保证合理的使用公共资源
- Exchanger: 进行线程间的数据交换。两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange 方法,当两个线程都到达同步点时,才进行数据交换。
CountdownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以使用 reset 方法重置。所以 CyclicBarrier 能处理更为复杂的业务场景。