ReentrantReadWriteLock类的使用
读写锁介绍
简介
Java中的读写锁就是ReentrantReadWriteLock对象,它维护一对锁,即写锁和读锁。其中读锁可以通过ReentrantReadWriteLock的 readLock 方法获取,写锁可以通过 writeLock 方法获取,ReadLock是共享锁,而WriteLock是独占锁。
由来
那么为什么要出现读写锁?因为在读多写少的时候ReentrantLock的性能比较低,读操作是不存在并发安全的问题,理论上应该并发执行来提高效率,但是如果使用了ReentrantLock,即使是进行读操作,也只能一个个线程去排队获取锁,这里大幅度的降低了并行度,效率比较低。因此,这个时候就出现了读锁,当线程需要进行读操作时,需要先获取读锁,并且读锁时共享锁,可以被多个线程同时获取,也就是在只要读操作的情况下,多个线程可以同时安全的并发执行。但是,如果有线程正在进行写操作,也就是获得了写锁,这样为了确保安全,其它的读线程和写线程都会被阻塞,无法在获取读锁和写锁。因此,读写锁是适合读多写少的情况,它的作用为下面两点:
- 在同一时间,可以允许多个读线程同时获取读锁进行读操作
但是,,在写线程访问时,所有读线程和写线程都会无法获取读锁和写锁
主要特征
可重入:ReentrantReadWriteLock的读锁和写锁都是可重入的
- 支持公平锁和非公平锁:ReentrantReadWriteLock是可以切换公平锁和非公平锁的
降级性:ReentrantReadWriteLock允许写锁降低为读锁,但是不允许读写升级为写锁
使用示例
创建锁对象
//创建读写锁对象
ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
//通过读写锁对象获取读锁
ReentrantReadWriteLock.ReadLock r = rw.readLock();
//通过读写锁对象获取写锁
ReentrantReadWriteLock.WriteLock w = rw.writeLock();
特征验证
@Slf4j(topic = "c.DataContainer")
public class DataContainer {
private String data = "666";
private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock r = rw.readLock();
private final ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public void read() {
r.lock();
log.debug("获取读锁...");
try {
log.debug("读取数据:" + data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放读锁...");
r.unlock();
}
}
public void write() {
w.lock();
log.debug("获取写锁...");
try {
log.debug("写入数据:" + data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}
“读-读”并发
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}
可以看到,线程t1和t2都是同时获取并释放读锁的,也就是不具有排他性,验证“读-读”是并发执行的。“读-写”互斥
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
可以看到用时3s(读1s+写2s),是串行执行的,因为t1先获取到读锁,然后t2获取到写锁,写入操作在等待读取操作结束后(释放读锁)才进行。我们把读操作和写操作的先后顺序调换一下:public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
}
同样也是在等待写锁释放才进行读操作,没有并发执行,是互斥的。“写-写”互斥
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
new Thread(() -> {
dataContainer.write();
}, "t1").start();
}
可重入性
读锁不能升级为写锁验证:
public class CachedData {
String data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
final ReentrantReadWriteLock.ReadLock r = rw.readLock();
final ReentrantReadWriteLock.WriteLock w = rw.writeLock();
void test() {
r.lock();
try {
System.out.println("111");
w.lock();
try {
System.out.println("222");
} finally {
w.unlock();
}
} finally {
r.unlock();
}
}
public static void main(String[] args) {
CachedData cachedData = new CachedData();
cachedData.test();
}
}
可以看到程序一直无法正常运行下去,因为在持有读锁时还尝试获取写锁,但是由于“读-写”互斥的特征,导致尝试获取写锁时一直在等待读锁的释放,但是读锁不会释放,造成死锁。
写锁可以降级为读锁的验证:
public class CachedData {
String data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
final ReentrantReadWriteLock.ReadLock r = rw.readLock();
final ReentrantReadWriteLock.WriteLock w = rw.writeLock();
void processCachedData() {
r.lock();
if (!cacheValid) {
// 获取写锁前必须释放读锁
r.unlock();
w.lock();
try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
if (!cacheValid) {
data = "666";
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
w.lock();
} finally {
w.unlock();
}
}
// 自己用完数据, 释放读锁
try {
System.out.println("用完数据");
} finally {
w.unlock();
}
}
public static void main(String[] args) {
CachedData cachedData = new CachedData();
cachedData.CachedData();
}
}
可以看到在持有写锁并且释放后可以再获取读锁,但是特别注意如果写锁没有释放那么还是会造成死锁。
读写锁的应用
缓存更新策略
如果使用缓存来缓存数据,那么当数据需要更新的时候,我们应该选择“先清理缓存再更新数据库”还是“先更新数据库再清理缓存”,其实,这两种选择如果不加锁进行保护,都会存在一定程度的并发问题,如下所示:
先更新数据库再清理缓存
可以看到,一旦出现图中情况,A线程将会一直使用旧值,这是一个很严重的问题。
先更新数据库再清理缓存
上图的情况也是可能发生的,数据在一小段时间内会不一致,过一段时间会自动得到新的数据,但是并没有“先清理缓存再更新数据库”的问题那么严重。
补充: 其实还有可能一种情况,就是缓存刚好过期失效或者第一次查询缓存的时候,如下图所示:
读写锁实现缓存一致性
其实上面的几种情况的出现,就是一个并发安全问题罢了,因此我们只需要对共享资源加锁即可,但是如果我们直接使用ReentrantLock或者synchronized关键字在这种“读多写少”的情况下一定会大幅度降低效率,因此这时候就展现了读写锁的强大之处,如下代码所示:
public class GenericCachedDao<T> {
// HashMap 作为缓存非线程安全, 需要保护
HashMap<SqlPair, T> cache = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
GenericDao genericDao = new GenericDao();
public int update(String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
int rows = genericDao.update(sql, params);
cache.clear();
return rows;
} finally {
lock.writeLock().unlock();
}
}
public T queryOne(Class<T> beanClass, String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加读锁, 防止其它线程对缓存更改
lock.readLock().lock();
try {
T value = cache.get(key);
if (value != null) {
return value;
}
} finally {
lock.readLock().unlock();
}
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
// get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据,为防止重复查询数据库, 再次验证
T value = cache.get(key);
if (value == null) {
// 如果没有, 查询数据库
value = genericDao.queryOne(beanClass, sql, params);
cache.put(key, value);
}
return value;
} finally {
lock.writeLock().unlock();
}
}
// 作为 key 保证其是不可变的
static class SqlPair {
private final String sql;
private final Object[] params;
}
}
上述代码中,在queryOne方法中,先加读锁判断缓存是否有想要的数据,有的话直接返回,没有就加写锁从数据库查询并更新缓存保证原子性,可以防止其它线程对缓存读取和更改。而在update方法里面则直接加写锁来保证删除缓存和更新数据库的原子性。
ReentrantReadWriteLock类的原理
原理流程
步骤一:t1 w.lock,t2 r.lock
- t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位:
- t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先t2线程先会进入tryAcquireShared 流程,如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
tryAcquireShared 返回值表示 :
- -1 表示失败
- 0 表示成功,但后继节点不会继续唤醒
- 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
- 由于读锁被占用,方法返回-1,这时会进入 sync.doAcquireShared(1) 流程,首先是调用 addWaiter 添加节点,在于节点被设置为Node.SHARED 模式,注意此时 t2 仍处于活跃状态:
- t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁;如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,会把前驱节点的 waitStatus 改为 -1,再循环一次尝试 tryAcquireShared(1) ,如果还不成功,就会被park
步骤二:t3 r.lock,t4 w.lock
- 这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子:
注意: 其中 Ex 是独占模式
步骤三:t1 w.unlock
- 这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子:
- 接下来执行唤醒流程 sync.unparkSuccessor,这时 t2 在 doAcquireShared 方法内的 parkAndCheckInterrupt() 处恢复运行这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一:
- 这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点:
- 在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行:
- 这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一:
- 这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点,下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点:
步骤四:t2 r.unlock,t3 r.unlock
- t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零:
- t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即:
- 之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束: