synchronized vs AQS
synchronized 的objectMonitor、AQS都是基于管程的MESA模型去实现的
MESA模型中定义了入口等待队列(获取锁相关)、条件等待队列(等待唤醒机制)
有了 synchronized 那么为什么会出现AQS呢?
因为业务场景中需要更加灵活地使用锁
| synchronized | AQS | |
|---|---|---|
| 管程 | objectMonitor是JVM层面(C++)实现的 入口等待队列(cas _owner) 条件等待队列(wait/notify/notifyAll) |
java层面实现 入口等待队列(cas volatile int state) 条件等待队列(condition await/signal/signalAll) |
| 获取锁的灵活性 | 不灵活 | 支持响应中断、超时、尝试获取锁 |
| 释放锁 | 隐式自动释放 | 手动显示调用unlock()释放 |
| 公平性 | 非公平锁 | 支持公平、非公平 |
| 条件队列 | 一个 | 支持多个 |
| 重入 | 可重入 | 可重入 |
AQS
共享资源state
AQS定义了volatile int state,表示资源的可用状态。
state只有三个api: getState() setState() compareAndSetState()
基于state api的不同操作组合可以分为两种锁(在具体锁中去实现):
1)独占

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
2) 共享

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但 没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待 结点返回true,否则返回false。
AQS 的子类可以定义不同的资源实现不同性质的方法。
可重入锁 ReentrantLock , 定义 state 为 0 时可以获取资源并置为 l 。若已获 得资源 , state 不断加1 , 在释放资源时 state 减 1 直至为 0 ;
CountDownLatch 初始时 定义了资源总量 state=count, countDown() 不断将 state 减 l .当 state=0时才能获得锁 , 释放后 state 就一直为 0。 CountDownLatch 是 一次性的 , 用完后如果再想用就只能重新创建一个,如果希望循环使用 , 推荐使用基于 Reen tran tLock 实现的 CyclicBarrier。
Semaphore 与 CountDownLatch 略有不同 , 样也是定义了资源总量 state=permits , 当 state>O 时就能获得锁 , 并将 state 减 1 ;当 state=0 时只能等待其他线程释放锁 , 当释放锁时 state 加1, 其他等待线程又能获得 这个锁。当 Semphore 的 permits 定义为1时,就是互斥锁,当 permits> 1就是共享锁。
入口等待队列:
维护获取锁失败时入队的线程,
数据结构 :CLH队列:一种基于双向链表数据结构的队列,先进先出
synchronized的monitor结构中的cxq是栈结构,先进后出
线程进入同步等待队列代码:
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
同步等待队列的线程park/ unpark后去获取锁
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
条件等待队列:
调用await()的时候会释放锁,然后线程会从入口等待队列加入到条件等待队列,
调用 signal()唤醒的时候会把条件队列中的线程节点移动到入口等待队列中,等待再次获得锁
数据结构:单向链表
通过上述的逻辑,AQS具备的特性:
阻塞等待队列
共享/独占
公平/非公平
可重入
允许中断
ReentrantLock
ReentrantLock是一种基于AQS框架的应用实现
ReentrantLock应用在竞争激烈的场景(秒杀),synchronized 应用在竞争相对不激烈的场景
加锁
三个线程竞争ReentrantLock的加锁代码执行过程
ReentrantLock非公平锁的实现代码
final void lock() {//对应上图的thread1加锁成功if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else//对应上图的thread2加锁失败,进入等待队列acquire(1);}
public final void acquire(int arg) {//再重试去获取锁if (!tryAcquire(arg) &&//进入等待队列acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {//exclusiveOwnerThread用于判断可重入int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
//值得学习的代码,多线程情况下for循环+cas 确保链表队列初始化成功,并且node节点入队成功//Inserts node into queue, initializing if necessary.private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// for循环使得入队线程堵塞
// for循环使得堵塞的线程唤醒之后又会去获取锁
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
解锁

