synchronized vs AQS
synchronized 的objectMonitor、AQS都是基于管程的MESA模型去实现的
MESA模型中定义了入口等待队列(获取锁相关)、条件等待队列(等待唤醒机制)
有了 synchronized 那么为什么会出现AQS呢?
因为业务场景中需要更加灵活地使用锁
synchronized | AQS | |
---|---|---|
管程 | objectMonitor是JVM层面(C++)实现的 入口等待队列(cas _owner) 条件等待队列(wait/notify/notifyAll) |
java层面实现 入口等待队列(cas volatile int state) 条件等待队列(condition await/signal/signalAll) |
获取锁的灵活性 | 不灵活 | 支持响应中断、超时、尝试获取锁 |
释放锁 | 隐式自动释放 | 手动显示调用unlock()释放 |
公平性 | 非公平锁 | 支持公平、非公平 |
条件队列 | 一个 | 支持多个 |
重入 | 可重入 | 可重入 |
AQS
共享资源state
AQS定义了volatile int state,表示资源的可用状态。
state只有三个api: getState() setState() compareAndSetState()
基于state api的不同操作组合可以分为两种锁(在具体锁中去实现):
1)独占
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
2) 共享
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但 没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待 结点返回true,否则返回false。
AQS 的子类可以定义不同的资源实现不同性质的方法。
可重入锁 ReentrantLock , 定义 state 为 0 时可以获取资源并置为 l 。若已获 得资源 , state 不断加1 , 在释放资源时 state 减 1 直至为 0 ;
CountDownLatch 初始时 定义了资源总量 state=count, countDown() 不断将 state 减 l .当 state=0时才能获得锁 , 释放后 state 就一直为 0。 CountDownLatch 是 一次性的 , 用完后如果再想用就只能重新创建一个,如果希望循环使用 , 推荐使用基于 Reen tran tLock 实现的 CyclicBarrier。
Semaphore 与 CountDownLatch 略有不同 , 样也是定义了资源总量 state=permits , 当 state>O 时就能获得锁 , 并将 state 减 1 ;当 state=0 时只能等待其他线程释放锁 , 当释放锁时 state 加1, 其他等待线程又能获得 这个锁。当 Semphore 的 permits 定义为1时,就是互斥锁,当 permits> 1就是共享锁。
入口等待队列:
维护获取锁失败时入队的线程,
数据结构 :CLH队列:一种基于双向链表数据结构的队列,先进先出
synchronized的monitor结构中的cxq是栈结构,先进后出
线程进入同步等待队列代码:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
同步等待队列的线程park/ unpark后去获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
条件等待队列:
调用await()的时候会释放锁,然后线程会从入口等待队列加入到条件等待队列,
调用 signal()唤醒的时候会把条件队列中的线程节点移动到入口等待队列中,等待再次获得锁
数据结构:单向链表
通过上述的逻辑,AQS具备的特性:
阻塞等待队列
共享/独占
公平/非公平
可重入
允许中断
ReentrantLock
ReentrantLock是一种基于AQS框架的应用实现
ReentrantLock应用在竞争激烈的场景(秒杀),synchronized 应用在竞争相对不激烈的场景
加锁
三个线程竞争ReentrantLock的加锁代码执行过程
ReentrantLock非公平锁的实现代码
final void lock() {
//对应上图的thread1加锁成功
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//对应上图的thread2加锁失败,进入等待队列
acquire(1);
}
public final void acquire(int arg) {
//再重试去获取锁
if (!tryAcquire(arg) &&
//进入等待队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//exclusiveOwnerThread用于判断可重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//值得学习的代码,多线程情况下for循环+cas 确保链表队列初始化成功,并且node节点入队成功
//Inserts node into queue, initializing if necessary.
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// for循环使得入队线程堵塞
// for循环使得堵塞的线程唤醒之后又会去获取锁
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}