现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁(读多写少)。这种时候,如果对每次读操作,也使用ReentrantLock这样的互斥锁是不合适的,同样的,Semaphore这样的共享锁也不一定满足需求。
在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发);但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥)。
在读多于写的情况下, 读写锁能够提供比排它锁更好的并发性和吞吐量。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它内部,维护了 一对相关的锁:
一个用于只读操作,称为读锁;
一个用于写入操作,称为写锁。
线程进入读锁的前提条件:
没有其他线程的写锁
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
线程进入写锁的前提条件:
没有其他线程的读锁
没有其他线程的写锁
而读写锁有以下三个重要的特性:
1.公平选择性
支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
2.可重入
读锁和写锁都支持线程重入。
a.读线程获取读锁后,能够再次获取读锁。
b.写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
3.锁降级
遵循下面的次序,写锁能够降级成为读锁:
a.获取写锁
b.再获取读锁
c.最后释放写锁
类结构
ReentrantReadWriteLock是可重⼊的读写锁实现类。在它内部,维护了⼀对相关的锁,⼀个⽤于只读操作,另⼀个⽤于写⼊操作。只要没有Writer线程,读锁可以由多个Reader线程同时持有。也就是说,写锁是独占的,读锁是共享的。
红线:内部类
蓝线:继承
绿线:实现接⼝
读写锁接口ReadWriteLock
一对方法,分别获得读锁和写锁Lock对象。
读写锁的使用
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock r = readWriteLock.readLock();
private Lock w = readWriteLock.writeLock();
// 读操作上读锁
public Data get(String key) {
r.lock();
try {
// TODO 业务逻辑
}finally {
r.unlock();
}
}
// 写操作上写锁
public Data put(String key, Data value) {
w.lock();
try {
// TODO 业务逻辑
}finally {
w.unlock();
}
}
注意:
1.读锁不⽀持条件变量
不存在阻塞,条件变量没⽤
2. 锁升级不⽀持
持有读锁的情况下去获取写锁,会导致阻塞
3. 锁⽀持降级(锁降级)
持有写锁的情况下可以去获取读锁
应用场景-读多写少
public class ReentrantReadWriteLockTest {
static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
// 获取⼀个key对应的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 设置key对应的value,并返回旧的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的内容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
上述示例中,Cache组合⼀个⾮线程安全的HashMap作为缓存的实现,同时使⽤读写锁的读锁和写锁来保证Cache是线程安全的。在读操作get(String key)⽅法中,需要获取读锁,这使得并发访问该⽅法时不会被阻塞。写操作put(String key,Object value)⽅法和clear()⽅法,在更新 HashMap时必须提前获取 写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,⽽只有写锁被释放之后,其他读写操作才能继续。Cache使⽤读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可⻅性, 同时简化了编程⽅式
锁降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。锁降级可以帮助我们拿到当前线程修改后的结果⽽不被其他线程所破坏,防⽌更新丢失。
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
private volatile boolean update = false;
public void processData() {
readLock.lock();
if (!update) {
// 必须先释放读锁
readLock.unlock();
// 锁降级从写锁获取到开始
writeLock.lock();
try {
if (!update) {
// TODO 准备数据的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 锁降级完成,写锁降级为读锁
}
try {
//TODO 使⽤数据的流程(略)
} finally {
readLock.unlock();
}
}
锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可⻅性,如果当前线程不获取读锁⽽是直接释放写锁,假设此刻另⼀个线程(记作线程T)获取了写锁并修改了数据,那么当前线程⽆法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使⽤数据并释放读锁之后,线程T才能获取写锁进⾏数据更新。如果是分段获取锁,先释放写锁,再获取读锁,当前线程可能会被阻塞,但当前线程想要的结果是,我释放写锁后,当前线程继续执⾏。
RentrantReadWriteLock不⽀持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。⽬的也是保证数据可⻅性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可⻅的。
源码分析
关注点
1. 读写锁怎样实现分别记录读写状态的?
采⽤“按位切割使⽤”的⽅式来维护int state,将其切分为两部分:⾼16为表示读,低16为表示写。 分割之后,读写锁是如何迅速确定读锁和写锁的状态呢?通过位运算。假如当前同步状态为S,那么:
写状态,等于S&0x0000FFFF(将⾼16位全部抹去)。当写状态加1,等于S+1.
读状态,等于S >>> 16 (⽆符号补 0 右移 16 位)。当读状态加1,等于S+(1<<16),也就是 S+0x00010000
根据状态的划分能得出⼀个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)⼤于0,即读锁已被获取。
2. 写锁是怎样获取和释放的?
3. 读锁是怎样获取和释放的?
写锁
写锁是⼀个⽀持重进⼊的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程, 则当前线程进⼊等待状态。
获取
protected final boolean tryAcquire(int acquires) {
//当前线程
Thread current = Thread.currentThread();
//获取state状态 存在读锁或者写锁,状态就不为0
int c = getState();
//获取写锁的重⼊数
int w = exclusiveCount(c);
//当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
if (c != 0) {
// c!=0 && w==0 表示存在读锁
// 当前存在读锁或者写锁已经被其他写线程获取,则写锁获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 超出最⼤范围 65535
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//同步state状态
setState(c + acquires);
return true;
}
// writerShouldBlock有公平与⾮公平的实现, ⾮公平返回false,会尝试通过cas加锁
//c==0 写锁未被任何线程获取,当前线程是否阻塞或者cas尝试获取锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置写锁为当前线程所有
setExclusiveOwnerThread(current);
return true;
}
通过源码我们可以知道:
读写互斥
写写互斥
写锁⽀持同⼀个线程重⼊
writerShouldBlock写锁是否阻塞实现取决公平与⾮公平的策略(FairSync和NonfairSync)
释放
protected final boolean tryRelease(int releases) {
//若锁的持有者不是当前线程,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//当前写状态是否为0,为0则释放写锁
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
读锁
获取
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果写锁已经被获取并且获取写锁的线程不是当前线程,当前线程获取读锁失败返回-1
判断锁降级
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//计算出读锁的数量
int r = sharedCount(c);
/**
* 读锁是否阻塞 readerShouldBlock()公平与⾮公平的实现
* r < MAX_COUNT: 持有读锁的线程⼩于最⼤数(65535)
* compareAndSetState(c, c + SHARED_UNIT) cas设置获取读锁线程的数量
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { //当前线程获取读锁
if (r == 0) { //设置第⼀个获取读锁的线程
firstReader = current;
firstReaderHoldCount = 1; //设置第⼀个获取读锁线程的重⼊数
} else if (firstReader == current) { // 表示第⼀个获取读锁的线程重⼊
firstReaderHoldCount++;
} else { // ⾮第⼀个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++; //记录其他获取读锁的线程的重⼊次数
}
return 1;
}
// 尝试通过⾃旋的⽅式获取读锁,实现了重⼊逻辑
return fullTryAcquireShared(current);
}
读锁共享,读读不互斥
读锁可重⼊,每个获取读锁的线程都会记录对应的重⼊数
读写互斥,锁降级场景除外
⽀持锁降级,持有写锁的线程,可以获取读锁,但是后续要记得把读锁和写锁读释放
readerShouldBlock读锁是否阻塞实现取决公平与⾮公平的策略(FairSync和NonfairSync)
释放
获取到读锁,执⾏完临界区后,要记得释放读锁(如果重⼊多次要释放对应的次数),不然会阻塞其他线程的写操作。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果当前线程是第⼀个获取读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--; //重⼊次数减1
} else { //不是第⼀个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count; //重⼊次数减1
}
for (;;) { //cas更新同步状态
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
问题一:有线程读的过程中不允许写,这种设计有什么问题?
如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是⼀种 悲观的读锁。为了进⼀步提升并发执⾏效率,Java 8引⼊了新的读写锁:StampedLock。
StampedLock和ReentrantReadWriteLock相⽐,改进之处在于:读的过程中也允许获取写锁后写⼊!在原先读写锁的基础上新增了⼀种叫乐观读(Optimistic Reading)的模式。该模式并不会加锁,所以不会阻塞线程,会有更⾼的吞吐量和更⾼的性能。
但StampedLock使⽤起来很复杂,且不⽀持重⼊锁,因此没被⼤范围使⽤。