读写锁虽然看起来比 synchronized 的粒度似乎细了一些,但在实际应用中,其表现也并不尽如人意,因为读写锁虽然分离了读和写的功能,使得读与读之间可以完全并发。但在读和写之间依然是冲突的。读锁会完全阻塞写锁,它使用的依然是悲观的锁策略,如果有大量读线程,它也可能引起写线程的饥饿。
因此在 JDK 1.8 中提供了一种叫 StampedLock 的锁,可以认为它是读写锁的一个改进版本,在提供类似读写锁的同时,还提供了一种乐观的读策略,这种乐观的锁非常类似无锁操作,使得乐观锁完全不会阻塞写线程。
StampedLock 不是基于 AQS 实现的,但实现的原理和 AQS 是类似的,都是基于队列和锁状态实现的。与读写锁不一样的是,StampedLock 支持三种模式:写锁、悲观读锁 和 乐观读。
读、写锁模式
其中,写锁、悲观读锁的语义和 ReadWriteLock 中的写锁、读锁的语义是类似的,允许多个线程同时获取悲观读锁,但只允许一个线程获取写锁,并且写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁在加锁成功后会返回一个票据 stamp;然后解锁时需要传入这个 stamp。在乐观读模式下,stamp 还会作为读取共享资源后的二次校验。
1. 使用示例
我们先通过一个官方的例子来了解下 StampedLock 是如何使用的,示例代码如下:
public class Point {
private double x, y;
private final StampedLock s1 = new StampedLock();
void move(double deltaX, double deltaY) {
//获取写锁
long stamp = s1.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
//释放写锁
s1.unlockWrite(stamp);
}
}
/**
* 锁升级的过程
*/
void moveIfAtOrigin(double newX, double newY) {
// 获取读锁
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
// 尝试升级写锁
long ws = sl.tryConvertToWriteLock(stamp);
// 不为 0 升级写锁成功
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
// 升级失败,释放之前加的读锁并上写锁,通过循环再试
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
// 释放最后加的锁
sl.unlock(stamp);
}
}
}
一个写线程获取写锁的过程中,首先是通过 WriteLock 获取一个票据 stamp,WriteLock 是一个独占锁,同时只会有一个线程可以获取该锁,当一个线程获取该锁后,其它请求的线程必须等待,当没有线程持有读锁或者写锁时才可以获取到该锁。请求该锁成功后会返回一个 stamp 票据变量,用来表示该锁的版本,当释放该锁的时候,需要 unlockWrite 并传递参数 stamp。
2. 实现原理
StampedLock 提供的读写锁并不像其他锁一样通过定义内部类继承 AbstractQueuedSynchronizer 抽象类然后子类实现模板方法来实现同步逻辑。但实现思路还是类似的,依然使用了 CLH 队列来管理线程,通过同步状态值 state 来标识锁的状态。
private transient volatile long state;
StampedLock 的内部也定义了很多变量,这些变量的目的跟 ReentrantReadWriteLock 一样,用于将状态值按位切分,通过位运算操作 state 变量来区分读、写的同步状态。比如写锁使用的是第八位为 1 则表示写锁,读锁使用 0-7 位,所以一般情况下获取读锁的线程数量为 1-126,超过以后会使用 readerOverflow int 变量保存超出的线程数。
获取写锁:
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
获取写锁,如果获取失败则构建节点放入队列,同时阻塞线程,需要注意的是该方法不响应中断,如需中断需要调用 writeLockInterruptibly()。否则会造成高 CPU 占用的问题。
(s = state) & ABITS 标识读锁和写锁未被使用,那么直接执行 CAS 操作将 state 第八位设置 1,标识写锁占用成功。
CAS 失败的话则调用 acquireWrite 先进行若干次自旋,试图通过 CAS 操作获得锁。如果自旋失败则加入等待队列,同时将线程阻塞。
获取读锁:
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
如果 CLH 队列为空并且读锁线程数未超过限制,则通过 CAS 方式修改 state 标识获取读锁获取成功。否则调用 acquireRead 尝试使用自旋获取读锁,获取不到则进入等待队列。
乐观读模式
StampedLock 的性能之所以比 ReadWriteLock 还要好,关键是其支持乐观读的方式。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读时,所有的写操作会被阻塞;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。
这里我们用的是“乐观读”这个词,而不是“乐观读锁”,是因为乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。
1. 使用示例
下面演示乐观读的使用示例:
class Point {
private int x, y;
final StampedLock sl = new StampedLock();
int distanceFromOrigin() {
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入局部变量,由于是无锁的,所以在读的过程数据可能被其他线程修改了
int curX = x, curY = y;
// 判断执行读操作期间,是否存在写操作,如果存在则sl.validate返回false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(curX * curX + curY * curY);
}
}
由于 tryOptimisticRead() 是无锁的,所以在读数据的过程中数据可能被其他线程修改了。因此最后读完之后,还需要通过调用 validate(stamp) 再次验证一下是否存在写操作,即是否有其他线程持有了写锁。如果在执行乐观读操作期间存在写操作,则会把乐观读升级为悲观读锁,保证能够读到最新的数据。
相比读写锁,StampedLock 获取读锁只是使用与或操作进行检验,不涉及 CAS 操作,即使第一次乐观锁获取失败也会马上升级至悲观锁,这样就避免了一直进行 CAS 操作带来的 CPU 占用性能的问题,因此效率更高。
2. 实现原理
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
可以看到,乐观读只使用了位运算,当写锁已经被获取时直接返回 0。其中,WBIT 用来获取写锁状态位,如果写锁没有被获取则返回当前 state 的值。
如果在乐观读后,有线程申请了写锁,那么 state 的状态就会改变:
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
那么,在进行乐观锁校验时,如果自乐观读时取得的 stamp 和当前 stamp 不一致,则返回 true,表示乐观读失败。如果 stamp 为零,则始终返回 false。如果 stamp 代表当前持有的锁,则始终返回 true。
public boolean validate(long stamp) {
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
注意事项
StampedLock 在某些简单的应用场景上基本可以替代 ReadWriteLock,但是 StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用时有几个地方需要注意一下。
StampedLock 在命名上并没有增加 Reentrant,事实上 StampedLock 也确实不支持重入。StampedLock 的悲观读锁、写锁都不支持条件变量。
当线程阻塞在 StampedLock 的 readLock() 或 writeLock() 上时,此时如果调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。因为 StampedLock 在挂起线程时,使用的是 Unsafe 的 park 方法,而 park 方法在遇到线程中断时,会直接返回而不是抛出异常。StampedLock 在 CAS 死循环获取锁时没有处理有关中断的逻辑,导致阻塞在 park 方法上的线程被中断后,再次进入 CAS 死循环。而当退出条件得不到满足时,就会导致 CPU 飙升的情况。如果需要支持中断功能,一定要使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly() 方法。