ReentrantReadWriteLock类的使用

读写锁介绍

简介

Java中的读写锁就是ReentrantReadWriteLock对象,它维护一对锁,即写锁和读锁。其中读锁可以通过ReentrantReadWriteLock的 readLock 方法获取,写锁可以通过 writeLock 方法获取,ReadLock是共享锁,而WriteLock是独占锁。

由来

那么为什么要出现读写锁?因为在读多写少的时候ReentrantLock的性能比较低,读操作是不存在并发安全的问题,理论上应该并发执行来提高效率,但是如果使用了ReentrantLock,即使是进行读操作,也只能一个个线程去排队获取锁,这里大幅度的降低了并行度,效率比较低。因此,这个时候就出现了读锁,当线程需要进行读操作时,需要先获取读锁,并且读锁时共享锁,可以被多个线程同时获取,也就是在只要读操作的情况下,多个线程可以同时安全的并发执行。但是,如果有线程正在进行写操作,也就是获得了写锁,这样为了确保安全,其它的读线程和写线程都会被阻塞,无法在获取读锁和写锁。因此,读写锁是适合读多写少的情况,它的作用为下面两点:

  • 在同一时间,可以允许多个读线程同时获取读锁进行读操作
  • 但是,,在写线程访问时,所有读线程和写线程都会无法获取读锁和写锁

    主要特征

  • 可重入:ReentrantReadWriteLock的读锁和写锁都是可重入的

  • 支持公平锁和非公平锁:ReentrantReadWriteLock是可以切换公平锁和非公平锁的
  • 降级性:ReentrantReadWriteLock允许写锁降低为读锁,但是不允许读写升级为写锁

    使用示例

    创建锁对象

    1. //创建读写锁对象
    2. ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    3. //通过读写锁对象获取读锁
    4. ReentrantReadWriteLock.ReadLock r = rw.readLock();
    5. //通过读写锁对象获取写锁
    6. ReentrantReadWriteLock.WriteLock w = rw.writeLock();

    特征验证

    1. @Slf4j(topic = "c.DataContainer")
    2. public class DataContainer {
    3. private String data = "666";
    4. private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    5. private final ReentrantReadWriteLock.ReadLock r = rw.readLock();
    6. private final ReentrantReadWriteLock.WriteLock w = rw.writeLock();
    7. public void read() {
    8. r.lock();
    9. log.debug("获取读锁...");
    10. try {
    11. log.debug("读取数据:" + data);
    12. Thread.sleep(1000);
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. } finally {
    16. log.debug("释放读锁...");
    17. r.unlock();
    18. }
    19. }
    20. public void write() {
    21. w.lock();
    22. log.debug("获取写锁...");
    23. try {
    24. log.debug("写入数据:" + data);
    25. Thread.sleep(1000);
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. } finally {
    29. log.debug("释放写锁...");
    30. w.unlock();
    31. }
    32. }
    33. }

    “读-读”并发

    1. public static void main(String[] args) throws InterruptedException {
    2. DataContainer dataContainer = new DataContainer();
    3. new Thread(() -> {
    4. dataContainer.read();
    5. }, "t1").start();
    6. new Thread(() -> {
    7. dataContainer.read();
    8. }, "t2").start();
    9. }

    image.png
    可以看到,线程t1和t2都是同时获取并释放读锁的,也就是不具有排他性,验证“读-读”是并发执行的。

    “读-写”互斥

    1. public static void main(String[] args) throws InterruptedException {
    2. DataContainer dataContainer = new DataContainer();
    3. new Thread(() -> {
    4. dataContainer.read();
    5. }, "t1").start();
    6. new Thread(() -> {
    7. dataContainer.write();
    8. }, "t2").start();
    9. }

    image.png
    可以看到用时3s(读1s+写2s),是串行执行的,因为t1先获取到读锁,然后t2获取到写锁,写入操作在等待读取操作结束后(释放读锁)才进行。我们把读操作和写操作的先后顺序调换一下:

    1. public static void main(String[] args) throws InterruptedException {
    2. DataContainer dataContainer = new DataContainer();
    3. new Thread(() -> {
    4. dataContainer.write();
    5. }, "t2").start();
    6. new Thread(() -> {
    7. dataContainer.read();
    8. }, "t1").start();
    9. }

    image.png
    同样也是在等待写锁释放才进行读操作,没有并发执行,是互斥的。

    “写-写”互斥

    1. public static void main(String[] args) throws InterruptedException {
    2. DataContainer dataContainer = new DataContainer();
    3. new Thread(() -> {
    4. dataContainer.write();
    5. }, "t2").start();
    6. new Thread(() -> {
    7. dataContainer.write();
    8. }, "t1").start();
    9. }

    image.png
    可以看到具体很明显排他性。

    可重入性

    读锁不能升级为写锁验证:

    1. public class CachedData {
    2. String data;
    3. // 是否有效,如果失效,需要重新计算 data
    4. volatile boolean cacheValid;
    5. final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    6. final ReentrantReadWriteLock.ReadLock r = rw.readLock();
    7. final ReentrantReadWriteLock.WriteLock w = rw.writeLock();
    8. void test() {
    9. r.lock();
    10. try {
    11. System.out.println("111");
    12. w.lock();
    13. try {
    14. System.out.println("222");
    15. } finally {
    16. w.unlock();
    17. }
    18. } finally {
    19. r.unlock();
    20. }
    21. }
    22. public static void main(String[] args) {
    23. CachedData cachedData = new CachedData();
    24. cachedData.test();
    25. }
    26. }

    image.png
    可以看到程序一直无法正常运行下去,因为在持有读锁时还尝试获取写锁,但是由于“读-写”互斥的特征,导致尝试获取写锁时一直在等待读锁的释放,但是读锁不会释放,造成死锁。

写锁可以降级为读锁的验证:

  1. public class CachedData {
  2. String data;
  3. // 是否有效,如果失效,需要重新计算 data
  4. volatile boolean cacheValid;
  5. final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
  6. final ReentrantReadWriteLock.ReadLock r = rw.readLock();
  7. final ReentrantReadWriteLock.WriteLock w = rw.writeLock();
  8. void processCachedData() {
  9. r.lock();
  10. if (!cacheValid) {
  11. // 获取写锁前必须释放读锁
  12. r.unlock();
  13. w.lock();
  14. try {
  15. // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
  16. if (!cacheValid) {
  17. data = "666";
  18. cacheValid = true;
  19. }
  20. // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
  21. w.lock();
  22. } finally {
  23. w.unlock();
  24. }
  25. }
  26. // 自己用完数据, 释放读锁
  27. try {
  28. System.out.println("用完数据");
  29. } finally {
  30. w.unlock();
  31. }
  32. }
  33. public static void main(String[] args) {
  34. CachedData cachedData = new CachedData();
  35. cachedData.CachedData();
  36. }
  37. }

image.png
可以看到在持有写锁并且释放后可以再获取读锁,但是特别注意如果写锁没有释放那么还是会造成死锁。

读写锁的应用

缓存更新策略

如果使用缓存来缓存数据,那么当数据需要更新的时候,我们应该选择“先清理缓存再更新数据库”还是“先更新数据库再清理缓存”,其实,这两种选择如果不加锁进行保护,都会存在一定程度的并发问题,如下所示:

先更新数据库再清理缓存

image.png
可以看到,一旦出现图中情况,A线程将会一直使用旧值,这是一个很严重的问题。

先更新数据库再清理缓存

image.png
上图的情况也是可能发生的,数据在一小段时间内会不一致,过一段时间会自动得到新的数据,但是并没有“先清理缓存再更新数据库”的问题那么严重。

补充: 其实还有可能一种情况,就是缓存刚好过期失效或者第一次查询缓存的时候,如下图所示:

image.png

读写锁实现缓存一致性

其实上面的几种情况的出现,就是一个并发安全问题罢了,因此我们只需要对共享资源加锁即可,但是如果我们直接使用ReentrantLock或者synchronized关键字在这种“读多写少”的情况下一定会大幅度降低效率,因此这时候就展现了读写锁的强大之处,如下代码所示:

  1. public class GenericCachedDao<T> {
  2. // HashMap 作为缓存非线程安全, 需要保护
  3. HashMap<SqlPair, T> cache = new HashMap<>();
  4. ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  5. GenericDao genericDao = new GenericDao();
  6. public int update(String sql, Object... params) {
  7. SqlPair key = new SqlPair(sql, params);
  8. // 加写锁, 防止其它线程对缓存读取和更改
  9. lock.writeLock().lock();
  10. try {
  11. int rows = genericDao.update(sql, params);
  12. cache.clear();
  13. return rows;
  14. } finally {
  15. lock.writeLock().unlock();
  16. }
  17. }
  18. public T queryOne(Class<T> beanClass, String sql, Object... params) {
  19. SqlPair key = new SqlPair(sql, params);
  20. // 加读锁, 防止其它线程对缓存更改
  21. lock.readLock().lock();
  22. try {
  23. T value = cache.get(key);
  24. if (value != null) {
  25. return value;
  26. }
  27. } finally {
  28. lock.readLock().unlock();
  29. }
  30. // 加写锁, 防止其它线程对缓存读取和更改
  31. lock.writeLock().lock();
  32. try {
  33. // get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据,为防止重复查询数据库, 再次验证
  34. T value = cache.get(key);
  35. if (value == null) {
  36. // 如果没有, 查询数据库
  37. value = genericDao.queryOne(beanClass, sql, params);
  38. cache.put(key, value);
  39. }
  40. return value;
  41. } finally {
  42. lock.writeLock().unlock();
  43. }
  44. }
  45. // 作为 key 保证其是不可变的
  46. static class SqlPair {
  47. private final String sql;
  48. private final Object[] params;
  49. }
  50. }

上述代码中,在queryOne方法中,先加读锁判断缓存是否有想要的数据,有的话直接返回,没有就加写锁从数据库查询并更新缓存保证原子性,可以防止其它线程对缓存读取和更改。而在update方法里面则直接加写锁来保证删除缓存和更新数据库的原子性。

ReentrantReadWriteLock类的原理

原理流程

image.png

步骤一:t1 w.lock,t2 r.lock

  • t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位:

image.png

  • t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先t2线程先会进入tryAcquireShared 流程,如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败

    tryAcquireShared 返回值表示 :

    • -1 表示失败
    • 0 表示成功,但后继节点不会继续唤醒
    • 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1

image.png

  • 由于读锁被占用,方法返回-1,这时会进入 sync.doAcquireShared(1) 流程,首先是调用 addWaiter 添加节点,在于节点被设置为Node.SHARED 模式,注意此时 t2 仍处于活跃状态:

image.png

  • t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁;如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,会把前驱节点的 waitStatus 改为 -1,再循环一次尝试 tryAcquireShared(1) ,如果还不成功,就会被park

image.png

步骤二:t3 r.lock,t4 w.lock

  • 这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子:

image.png

注意: 其中 Ex 是独占模式

步骤三:t1 w.unlock

  • 这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子:

image.png

  • 接下来执行唤醒流程 sync.unparkSuccessor,这时 t2 在 doAcquireShared 方法内的 parkAndCheckInterrupt() 处恢复运行这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一:

image.png

  • 这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点:

image.png

  • 在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行:

image.png

  • 这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一:

image.png

  • 这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点,下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点:

image.png

步骤四:t2 r.unlock,t3 r.unlock

  • t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零:

image.png

  • t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即:

image.png

  • 之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束:

image.png

源码分析

参考:https://juejin.cn/post/6844903952274685959