synchronized vs AQS

synchronized 的objectMonitor、AQS都是基于管程的MESA模型去实现的
MESA模型中定义了入口等待队列(获取锁相关)、条件等待队列(等待唤醒机制)

有了 synchronized 那么为什么会出现AQS呢?

因为业务场景中需要更加灵活地使用锁

synchronized AQS
管程 objectMonitor是JVM层面(C++)实现的

入口等待队列(cas _owner)
条件等待队列(wait/notify/notifyAll)
java层面实现

入口等待队列(cas volatile int state)
条件等待队列(condition await/signal/signalAll)
获取锁的灵活性 不灵活 支持响应中断、超时、尝试获取锁
释放锁 隐式自动释放 手动显示调用unlock()释放
公平性 非公平锁 支持公平、非公平
条件队列 一个 支持多个
重入 可重入 可重入

AQS

共享资源state

AQS定义了volatile int state,表示资源的可用状态。
state只有三个api: getState() setState() compareAndSetState()
基于state api的不同操作组合可以分为两种锁(在具体锁中去实现):

1)独占

image.png
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

2) 共享

image.png
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但 没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待 结点返回true,否则返回false。

AQS 的子类可以定义不同的资源实现不同性质的方法。
可重入锁 ReentrantLock , 定义 state 为 0 时可以获取资源并置为 l 。若已获 得资源 , state 不断加1 , 在释放资源时 state 减 1 直至为 0 ;
CountDownLatch 初始时 定义了资源总量 state=count, countDown() 不断将 state 减 l .当 state=0时才能获得锁 , 释放后 state 就一直为 0。 CountDownLatch 是 一次性的 , 用完后如果再想用就只能重新创建一个,如果希望循环使用 , 推荐使用基于 Reen tran tLock 实现的 CyclicBarrier。
Semaphore 与 CountDownLatch 略有不同 , 样也是定义了资源总量 state=permits , 当 state>O 时就能获得锁 , 并将 state 减 1 ;当 state=0 时只能等待其他线程释放锁 , 当释放锁时 state 加1, 其他等待线程又能获得 这个锁。当 Semphore 的 permits 定义为1时,就是互斥锁,当 permits> 1就是共享锁。

入口等待队列:

维护获取锁失败时入队的线程,
数据结构 :CLH队列:一种基于双向链表数据结构的队列,先进先出
synchronized的monitor结构中的cxq是栈结构,先进后出

线程进入同步等待队列代码:

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

同步等待队列的线程park/ unpark后去获取锁

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

条件等待队列:

调用await()的时候会释放锁,然后线程会从入口等待队列加入到条件等待队列,
调用 signal()唤醒的时候会把条件队列中的线程节点移动到入口等待队列中,等待再次获得锁
数据结构:单向链表

通过上述的逻辑,AQS具备的特性:
阻塞等待队列
共享/独占
公平/非公平
可重入
允许中断

ReentrantLock

ReentrantLock是一种基于AQS框架的应用实现
ReentrantLock应用在竞争激烈的场景(秒杀),synchronized 应用在竞争相对不激烈的场景

加锁

三个线程竞争ReentrantLock的加锁代码执行过程
image.png
ReentrantLock非公平锁的实现代码

  1. final void lock() {
  2. //对应上图的thread1加锁成功
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else
  6. //对应上图的thread2加锁失败,进入等待队列
  7. acquire(1);
  8. }
  1. public final void acquire(int arg) {
  2. //再重试去获取锁
  3. if (!tryAcquire(arg) &&
  4. //进入等待队列
  5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. selfInterrupt();
  7. }
  1. protected final boolean tryAcquire(int acquires) {
  2. return nonfairTryAcquire(acquires);
  3. }
  4. final boolean nonfairTryAcquire(int acquires) {
  5. final Thread current = Thread.currentThread();
  6. int c = getState();
  7. if (c == 0) {
  8. if (compareAndSetState(0, acquires)) {
  9. setExclusiveOwnerThread(current);
  10. return true;
  11. }
  12. }
  13. else if (current == getExclusiveOwnerThread()) {
  14. //exclusiveOwnerThread用于判断可重入
  15. int nextc = c + acquires;
  16. if (nextc < 0) // overflow
  17. throw new Error("Maximum lock count exceeded");
  18. setState(nextc);
  19. return true;
  20. }
  21. return false;
  22. }
  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }
  1. //值得学习的代码,多线程情况下for循环+cas 确保链表队列初始化成功,并且node节点入队成功
  2. //Inserts node into queue, initializing if necessary.
  3. private Node enq(final Node node) {
  4. for (;;) {
  5. Node t = tail;
  6. if (t == null) { // Must initialize
  7. if (compareAndSetHead(new Node()))
  8. tail = head;
  9. } else {
  10. node.prev = t;
  11. if (compareAndSetTail(t, node)) {
  12. t.next = node;
  13. return t;
  14. }
  15. }
  16. }
  17. }
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // for循环使得入队线程堵塞
            // for循环使得堵塞的线程唤醒之后又会去获取锁
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

解锁

image.png