锁的排他性使得多个线程无法以线程安全的方式在同一时刻对共享变量进行读取,这在读多写少的并发场景下不利于提高系统的并发性。为此,Java SDK 并发包提供了读写锁——ReadWriteLock,读写锁是一种改进型的排他锁,在同一时刻允许多个读线程访问,但在写线程访问时,所有读线程和其他写线程均被阻塞。
获得条件 | 排他性 | 作用 | |
---|---|---|---|
读锁 | 相应的写锁未被任何线程持有 | 对读线程是共享的,对写线程是排他的 | 允许多个读线程可以同时读取共享变量,并保障读线程读取共享变量期间没有其他任何线程能够更新共享资源。 |
写锁 | 该写锁未被其他任何线程持有,并且相应的读锁未被其他任何线程持有 | 对写线程和读线程都是排他的 | 使得写线程能够以独占的方式访问共享变量,且写线程对共享变量的更新对读线程是可见的。 |
此外有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。
代码示例
public class ReadWriteLockDemo {
private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
// 模拟读任务
public void handRead(Lock lock) {
readLock.lock();
try {
Thread.sleep(1000);
System.out.println("reading...");
} catch (InterruptedException e) {
//
} finally {
readLock.unlock();
}
}
// 模拟写任务
public void handWrite(Lock lock, int index) {
writeLock.lock();
try {
Thread.sleep(1000);
System.out.println("writing...");
} catch (InterruptedException e) {
//
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable read = () -> demo.handRead(readLock);
Runnable write = () -> demo.handWrite(writeLock, new Random().nextInt());
// 模拟18个读线程,2个写线程
for (int i = 0; i < 18; i++) {
new Thread(read).start();
}
for (int i = 18; i < 20; i++) {
new Thread(write).start();
}
}
}
实现原理
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// 读锁由内部类ReadLock实现
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁由内部类WriteLock实现
private final ReentrantReadWriteLock.WriteLock writerLock;
......
}
在 ReentrantReadWriteLock 里面,ReadLock 和 WriteLock 都继承了 Lock 接口,内部执行具体操作的主体都是 Sync 内部类,Sync 继承了 AQS 并重写了获取、释放锁的逻辑。只是在获取锁时,ReadLock 采用共享模式来获取锁,而 WriteLock 采用独占模式来获取锁。
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 共享模式
public void lock() {
sync.acquireShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 独占模式
public void lock() {
sync.acquire(1);
}
}
1. state
读写锁也是基于 AQS 实现的,它的自定义同步器 Sync 需要在同步状态 state 字段上分别维护多个读线程和一个写线程的状态,该状态的设计成为实现读写锁的关键。在读写锁的实现中,很好地使用了高低位,来实现一个整型控制两种状态的功能。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
sharedCount:获取读锁数量,就是将同步状态无符号右移 16 位,即取同步状态的高 16 位。
exclusiveCount:获取写锁数量,先看下 EXCLUSIVE_MASK 这个静态变量,它是对 1 进行左移 16 位然后减 1 最后与同步状态取与,所以也就是获取同步状态的低 16 位来表示写锁的获取次数。
通过对这个 state 变量按位切割切分成了两部分,高 16 位表示读(读锁个数),低 16 位表示写(写锁个数)。也因此读锁、写锁的获取数量也有个最大值,即 216-1。
当获取读锁时,每次加锁需要修改 state 变量值的高 16 位值加一;同样,当获取写锁时,每次加锁需要修改 state 变量值的低 16 位值加一,虽然写锁是独占锁,但支持可重入,所以也会有数量限制。
2. WriteLock
WriteLock 是一个支持可重入的排它锁。当一个线程尝试获取写锁时,会先判断同步状态 state 是否为 0。如果 state 等于 0,说明暂时没有其它线程获取锁;如果 state 不等于 0,则说明有其它线程获取了锁。
此时再判断同步状态 state 的低 16 位(w)是否为 0,如果 w 为 0,则说明其它线程获取了读锁,此时进入同步队列进行阻塞等待;如果 w 不为 0,则说明其它线程获取了写锁,此时要判断获取了写锁的线程是不是当前线程,若不是就进入同步队列进行阻塞等待;若是,就应该判断当前线程获取写锁是否超过了最大次数,若超过则抛异常,反之更新同步状态。
独占模式加锁:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 取到当前锁的个数
int c = getState();
// 取写锁的个数
int w = exclusiveCount(c);
// 如果已经有线程持有了锁
if (c != 0) {
// 如果写线程数为0(即此时存在读锁)或者当前线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果写锁的数量大于最大数(65535),就抛出一个Error
if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
// 公平锁和非公平锁逻辑由writerShouldBlock方法来判断
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false;
// 获取同步状态成功,设置当前线程为锁的拥有者
setExclusiveOwnerThread(current);
return true;
}
在第 10 行,tryAcquire() 除了重入条件(当前线程为获取了写锁的线程)外,还额外增加了一个读锁是否存在的判断逻辑。如果存在读锁,则写锁不能被获取,因为要确保写锁的操作对读锁可见。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
独占模式释放锁:
protected final boolean tryRelease(int releases) {
// 如果不是持有写锁的线程,则直接抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 释放写状态
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 判断写锁数量是否为0,为0则修改持有写锁的线程为null,即释放写锁成功
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
3. ReadLock
ReadLock 是一个支持可重入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(写状态为 0)时读锁总是会被成功地获取。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取了,则进入等待状态。
共享模式加锁:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果其他线程持有了写锁,则直接返回-1,获取同步状态失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取state变量的高16位值(即读锁数量)
int r = sharedCount(c);
// 调用readerShouldBlock来判断是否公平与非公平,且不要超过可获取读锁数量的最大值
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 读锁数量为0,第一次加读锁,设置firstReader为当前线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 读锁重入,如果当前线程是第一个持有该读锁的线程,计数器+1
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 获取同步状态失败,则无限循环重试
return fullTryAcquireShared(current);
}
公平锁 FairSync 的 readerShouldBlock 调用了 AQS 的 hasQueuedPredecessors 方法来判断,跟可重入锁一样,先判断 AQS 同步队列中是否存在排在该线程之前的的节点,保证先入队的先执行。而非公平锁 NonfairSync 的 readerShouldBlock 实现是:如果 AQS 同步队列中第一个是等待写锁的线程则返回 true,当前获取读锁的线程先阻塞,优先让等待队列的线程去获取写锁。
共享模式释放锁:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
......
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
由于当前是共享模式,所以该方法会存在多线程并发执行的情况,因此采用 for 循环加 CAS 的方式修改 state 变量的高 16 位值,当 state 的高 16 位为 0 的时候,释放成功返回 true。
锁的降级
锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。代码示例如下所示(取自 JDK 文档):
class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
//写锁
final Lock w = rwl.writeLock();
void processCachedData() {
// 获取读锁
r.lock();
// 如果数据过期了,则重新获取数据
if (!cacheValid) {
// 先释放读锁再获取写锁,因为不允许读锁的升级
r.unlock();
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁(同一个线程内可以获取到)
r.lock();
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁,去使用数据
try {
use(data);
} finally {
r.unlock();
}
}
}
为什么需要锁降级?
主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设在释放写锁的瞬间,另一个线程成功获取了写锁并修改了数据,那么当前线程就无法感知后续的数据更新了。如果当前线程遵循锁降级的步骤,那么其他线程无法获取写锁,直到当前线程释放读锁后,其他线程才能获取写锁进行数据更新。
注意:ReadWriteLock 不支持锁的升级。如果在持有读锁时去获取写锁,会导致获取写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。