为什么有了Synchronzied还需要Lock

image.png

  1. // Lock
  2. class Test {
  3. public void test1() {
  4. ReentrantLock reentrantLock = new ReentrantLock();
  5. // 加锁
  6. reentrantLock.lock();
  7. // do something ....
  8. // 解锁
  9. reentrantLock.unlock();
  10. }
  11. }
public interface Lock {
    // 尝试获取锁,获取锁成功后返回
    void lock();

    // 可中断的获取锁。所谓可中断的意思就是,在锁的获取过程中可以中断当前线程
    void lockInterruptibly() throws InterruptedException;

    // 尝试非阻塞的获取锁。不同于 lock() 方法在锁获取成功后再返回,该方法被调用后就会立即返回。如果最终获取锁成功返回 true,否则返回 false
    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}

AQS

简介

AQS 是一个抽象类,是用来构建锁或者其他同步组件的基础框架,它使用了一个 volaitle 修饰的 int 成员变量 state 表示同步状态,通过内置的 FIFO 双向队列(源码注释上写的 CLH(Craig,Landin,and Hagersten) 队列(三个人名的简称),其实就是一个先进先出的双向队列)来完成线程们获取资源的时候的排队工作。
具体来说,如果某个线程请求锁(共享资源)失败,则该线程就会被加入到 CLH 队列的末端。当持有锁的线程释放锁之后,会唤醒其后继节点,这个后继节点就可以开始尝试获取锁。

image.png
image.png
第一个节点就是成功获取同步资源的节点(为了和头节点 head 区分开,这里我们将他称为 “首节点” 吧),首节点的线程 A 在释放同步资源时,将会唤醒器其后继节点 B,而后继节点 B 被唤醒后,就会重新尝试加锁,同样还是 CAS 操作给 state 变量加 1,如果成功,就将自己设置为首节点。如下图:
image.png

AQS 的两套模式

AQS 采用模版方法模式,拥有独占和共享两种实现
image.png

独占模式

独占锁的Acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 此方法会尝试去获取锁
        // 将当前线程加入 CLH 队列中
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        selfInterrupt();
}

方法很好理解,首先 tryAcquire 会去尝试获取锁,如果获取锁成功,则方法结束(selfInterrupt);如果获取锁失败,则会将通过 addWaiter 方法构造一个 EXCLUSIVE 节点,将该节点加入到 CLH 队列的尾端,然后调用 acquireQueued 方法从队列中排队获取锁。
这里多提一嘴,tryAcquire 这个方法是 AQS 开放给子类进行重写的,所以子类可以自定义这个方法来实现公平竞争或者非公平竞争(是否允许插队)。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 当前节点的前驱节点
            final Node p = node.predecessor();
            // 如果当前节点的前驱节点是头节点并且成功获取锁,则当前线程获取到独占锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 获取独占锁失败,则当前线程进入等待态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

1)if (p == head && tryAcquire(arg)) 如果当前节点的前驱节点是头节点并且成功获取锁,则当前线程获取到独占锁。所谓当前节点的前驱节点是头节点(不存储任何数据的节点),意思就是当前节点是第一个元素节点(在上篇文章中为了和头节点做区分,我们把它称作首节点)。也就是说,当某个节点是头节点的后继节点的时候,就表示之前持有独占锁的线程已经释放了,于是这个时候当前线程就有资格去获取独占锁了。

另外,这样设计也能够保证 CLH 队列的 FIFO 原则。

2)for(;;),这是啥?没错,死循环,自旋。也就是说我们新加入到 CLH 队列的这个节点会以 “死循环” 的方式获取锁。如果获取锁失败,则阻塞该节点中的线程。只有当这个节点的前驱节点出队了,或者该阻塞线程被中断了,这个被阻塞线程才会被唤醒。
image.png

独占锁的释放

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • 在获取同步状态(锁)时,AQS 维护一个 CLH 双向队列,获取锁失败的线程都会通过 CAS 操作被加入到队列尾端,并且在队列中无限自旋等待获取锁;
  • 停止自旋(或者说被移除 CLH 队列)的条件是其前驱节点为头节点并且成功获取了独占锁;
  • 当前节点(线程)成功释放掉独占锁后,AQS 就会紧接着唤醒该节点的后继节点,这样,这个后继节点又会开始去尝试获取锁。循此往复。

小结:独占锁的释放和获取流程

  • 在获取同步状态(锁)时,AQS 维护一个 CLH 双向队列,获取锁失败的线程都会通过 CAS 操作被加入到队列尾端,并且在队列中无限自旋等待获取锁;
  • 停止自旋(或者说被移除 CLH 队列)的条件是其前驱节点为头节点并且成功获取了独占锁;
  • 当前节点(线程)成功释放掉独占锁后,AQS 就会紧接着唤醒该节点的后继节点,这样,这个后继节点又会开始去尝试获取锁。循此往复。


    共享模式

    共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态(锁)。

  • 在获取同步状态(锁)时,AQS 维护一个 CLH 双向队列,获取锁失败的线程都会通过 CAS 操作被加入到队列尾端,并且在队列中无限自旋等待获取锁;

  • 停止自旋(或者说被移除 CLH 队列)的条件是其前驱节点为头节点并且成功获取了共享锁;
  • 通过循环 + CAS 操作确保当前节点(线程)成功释放掉共享锁后,AQS 就会紧接着唤醒该节点的后继节点,这样,这个后继节点又会开始去尝试获取锁。循此往复。

ReentrantLock

公平锁与非公平锁

image.png
可以看到,Snyc 是 ReentrantLock 的内部类并且继承自 AQS,同时,Sync 还拥有两个子类:NonfairSync、FairSync。从名字也就可以看出来,这两个子类分别用来实现非公平锁和公平锁的相关操作。
无参构造默认就是非公平锁:
image.png
image.png

image.png

Reentrant lock 的加锁流程
ReentrantLock.lock -> Sync.lock -> NonfairSync/FairSync.lock -> AQS.acquire -> NonfairSync/FairSync.tryAcquire()

非公平锁的 tryAcquire 调用的就是 nonfairTryAcquire 方法,这个在上一节我们已经解释过了,把 nonfairTryAcquire 方法提到 Sync 中去的原因,就是因为 ReentrantLock 想要它的 tryLock 这个方法无论什么情况都以非公平的方式去获取锁:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

公平锁的TryAcquire 方法:
image.png
判断是否有前驱结点:
image.png
如果等待队列中当前节点(尝试获取锁的当前线程)存在前驱节点,则表示有线程比当前线程更早地请求获取锁,所以,公平来讲,当前线程需要等待前驱线程获取并成功释放锁之后才能去尝试获取锁。

可以发现,ReentrantLock 无论是无参构造还是 tryLock 方法,使用的都是非公平锁的方式。这是为啥呢?因为非公平锁的性能更高。

公平锁为了保证公平,保证按照时间顺序来获取锁,就必定要进行频繁的线程上下文切换,而非公平锁不需要,谁 CAS 成功了谁就能拿到锁,极少的线程切换保证了其更大的吞吐量。

当然,公平锁的存在自然有它的意义,虽然说非公平锁的性能更好,但是存在线程 “饥饿” 现象,也就是说由于非公平锁采用了争抢的方式,有可能导致某些线程永远也获取不到锁。
image.png

public class Test {    
    static Map<String, Object> map = new HashMap<String, Object>();  
    // 读写锁
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();    
    // 读锁
    static Lock readLock = rwl.readLock();    
    // 写锁
    static Lock writeLock = rwl.writeLock();    
    // 获取一个 key 对应的 value    
    public static final Object get(String key) { 
        // 加读锁
        readLock.lock();            
        try {                    
            return map.get(key);            
        } finally {
            // 释放读锁
            readLock.unlock();            
        }    
    }    
    // 设置 key 对应的 value,并返回旧的 value    
    public static final Object put(String key, Object value) {    
        // 加写锁
        writeLock.lock();            
        try {                    
            return map.put(key, value);            
        } finally {
            // 释放写锁
            writeLock.unlock();            
        }    
    }    
}

ReentrantWriteLock 的具体实现

  • 读写状态的设计
  • 写锁的获取与释放
  • 读锁的获取与释放

各位先回顾下 ReentrantLock 中是怎么设置锁的状态的,没错,就是那个 state 变量,通过对 state 变量的加减来表示该锁被同一个线程获取的次数,从而实现可重入操作。

ReentrantLock 是排他锁,同一个时间要么是读线程在占有它,要么是写线程在占有它,所以这个 state 变量只需要维护一个特征就行了。但是,ReentrantReadWriteLock 同时需要维护读锁和写锁,写锁是排他锁,读锁是共享锁,也就是说它需要用这个 state 变量去维护多个读线程和当前写线程的状态。

举个例子,如果当前线程在获取写锁时,读锁已经被获取(读状态不为 0),或者,写锁已经被获取并且该线程不是已经获取写锁的线程,那么,当前线程就会进入等待状态。

那如何在一个整型变量上维护多种状态呢?

这里就需要 “按位切割使用” 的方法,即将 32 位的整型变量 state 切分成两个部分,高 16 位表示读,低 16 位表示写:
image.png
如上图,当前 state 变量表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。
ReentrantReadWriteLock 是如何从这个变量中快速确定读和写各自的状态呢?
位运算!
假设当前同步状态值为 S,计算写状态很简单,将高 16 位全部抹去就好了,即 S & 0x0000FFFF,当写状态增加1时,等于 S+1;
读状态等于 S>>>16,即无符号补 0 右移 16 位。当读状态增加1时,等于 S+(1<<16),也就是 S+0x00010000。

写锁的获取和释放

image.png

和 ReentrantLock 差不多,除了可重入的处理,这里还增加了一个对读锁的排他处理。各位可以代码中的那一行注释:
具体来说,代码中 c 这个变量获取到的就是 state 的值,w 这个变量表示的就是 state 中低 16 位的写状态,这里其实有一个推论,那就是如果 state != 0,而 state 的低 16 位 = 0,那是不是就表明 state 的高 16 为一定 != 0,对吧,很好理解。也就是说,如果整体的 state 状态不为 0,而写锁状态为 0,那么说明读锁一定大于 0,即表示读锁已经被获取。如果存在读锁,那么写锁不能被获取。

锁的降级

注意,读写锁中的锁降级不要和 synchronized 的锁升级(锁膨胀)搞混了,事实上这两者就不是一个维度的概念,synchronized 中的锁升级是 JVM 底层帮助我们自动实现的,而读写锁中的锁升级是我们在代码中自己手动制造出来的。
解释一下:读写锁中的锁降级指的是,写锁降级成为读锁。即把持住当前线程所拥有的写锁,然后获取到读锁,随后释放先前拥有的写锁。可以看下 ReentrantReadWriteLock 源码中关于锁降级的示例:

void processCachedData() {
    // 获取读锁
    rwl.readLock().lock();
    if (!cacheValid) {
        // 先释放读锁
        rwl.readLock().unlock();
        // 获取写锁
        rwl.writeLock().lock();
        try {
            // Recheck state because another thread might have
            // acquired write lock and changed state before we did.
            if (!cacheValid) {
                data = ...
                cacheValid = true;
            }
              // 把持住当前线程所拥有的写锁,然后获取读锁
              rwl.readLock().lock();
        } finally {
              // 随后释放先前拥有的写锁
              rwl.writeLock().unlock(); 
        }
        // 锁降级完成,写锁降级为读锁
    }

    try {
        use(data);
    } finally {
        rwl.readLock().unlock();
      }
}

那,锁降级的应用场景是什么呢?

举个例子,线程 A 修改了共享变量(加写锁)并且马上就想使用它(加读锁),那如果线程 A 直接释放写锁然后再去加读锁的话(分段获取),假设此刻另一个线程 B 比线程 A 先一步获取了写锁并修改了数据,那么线程 A 是无法感知到线程 B 所做的数据更新的。
如果线程 A 遵循锁降级的步骤,则线程 B 试图进行获取写锁进行更新的时候将会被阻塞住,直到线程 A 使用完数据并释放读锁之后,线程 B 才能获取写锁进行数据更新。
锁降级的主要作用是保证数据的可见行。

Condition 接口

回顾下 synchronized 的知识,我们可以通过 Object 类中的 wait 和 notify/notifyAll 方法来随时随地的去挂起和唤醒线程,那对于 Lock 来说,它是通过 Condition 这个接口来实现的:
image.png
image.png
Condition 的基本使用:

// 创建 Lock 对象
Lock lock = new ReentrantLock();
// 通过 Lock 对象创建 Condition 对象
Condition condition = lock.newCondition(); 

public void conditionWait() throws InterruptedException {    
    // 加锁  
    lock.lock();  
    try {     
        // 挂起线程
        condition.await(); 
    } finally {       
        // 解锁
        lock.unlock();    
    }
}

public void conditionSignal() throws InterruptedException {    
    lock.lock();    
    try {       
        // 唤醒线程
        condition.signal(); 
    } finally {            
        lock.unlock();    
    }
}

image.png
image.png
我们把 AQS 中的 CLH 队列称为同步队列。

可以看到,Condition 中等待队列的节点实现和同步队列是一样的,用的都是 AQS 中的 Node 节点。

不同之处在于,CLH 同步队列是一个 FIFO 的双向队列,而 Condition 等待队列是一个 FIFO 的单向队列!

image.png

await

image.png

Signal

image.png