1. 引入
前面所介绍的解决线程同步的方案中,不管是作为重量级锁的synchronized,或是优化后的synchronized,以及同样支持可重入且可以更好的操作加锁和释放锁的ReentranLock,它们都属于排它锁。当多线程中的某一个线程竞争到了锁,那么不管其他线程执行的是读操作还是写操作,在没有获得锁的前提下都无法进行,这显然有些不太合理。按照常理来说,当不同的线程同时对共享变量执行读操作时,它们不应该彼此之间是互斥的,但是如果有线程想要执行写操作是应该阻塞的。另外,如果某个线程对于共享变量执行写操作,那么其他的线程不管是读还是写都应该阻塞。
如果想要实现上述的功能,那么就需要一个读写锁,它维护了一个读锁和一个写锁,这通过分隔读锁和写锁,使得并发性能大幅提高。下面即将介绍的ReentrantWriteReadLock就是这样的一种机制。
2. ReentrantWriteReadLock
2.1 概念
ReentrantWriteReadLockt通过读写锁实现:写锁被获取时,后续的读写操作都被阻塞;读锁被获取时,写操作被阻塞,读操作不受影响。
ReentrantWriteReadLock相比于之前的ReentrantLock具有如下特性:
- 公平性选择:支持公平锁和非公平锁两种模式,当要求吞吐量时推荐使用非公平锁
- 重入:支持锁重入,当线程获得写锁后,可再次获取写锁或读锁;当线程获取读锁后,能够再次获取读锁
- 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁可以降级为读锁
ReentrantWriteReadLock的源码定义如下:
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
}
可以看到它内部维护了一个读写锁,并且同样使用了自定义的同步器。其中,ReadWriteLock接口中只定义了如下的两个方法用于获取写锁和读锁。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
构造函数同样有无参和带参两种形式,其中无参默认使用非公平锁,带参可以设置使用公平锁。
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
另外,除了和ReentrantLock中相同的方法外,ReentrantWriteReadLock还提供了一些展示内部工作状态的方法:
getReadLOckCount()
:获取当前读锁别所有线程获取的总次数getReadHoldCount()
:获取当前线程获取读锁的次数isWriteLocked()
:判断读锁是否被获取getWriteHoldCount()
:获取当前线程获取写锁的次数
2.2 核心
ReentrantLock中需要使用AQS的同步状态来维护线程之间的同步操作,根据上面的源码定义可知,ReentrantWriteReadLock同样需要AQS的同步状态来维护读进程和写进程之间的同步操作。
假设线程同步状态使用32bit的变量来表示,那么可以使用高16位表示读,低16位表示写,整体上就可以用来同时维护读写状态。如下所示:
如果想要获取写状态,即获取低16位的值,那么可以使用与操作,如下所示:
如果想要获取读状态,那么只需要获取高16位,直接左移16位即可。如下所示:
2.3 图解流程
在了解了如何使用一个同步状态的高低位来实现读写锁的控制,结合前面对于ReentrantLock的分析,下面通过例子看一下ReentrantWriteReadLock是如何控制写锁和读锁的,这里只以非公平锁为例说明。
当没有线程加读锁,也没有线程加写锁时,读写锁如下所示,state高位和低位计数都为0:
如果此时线程1想要加写锁,由于此时并没有线程加锁。因此加锁成功,修改state为0_1
public static class WriteLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquire(1);
}
}
acqurie()
为AQS中的方法,如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()
定义如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 获取写锁被所有线程获取的次数
int w = exclusiveCount(c);
// 如果c!=0,表示写锁已经被线程获取
if (c != 0) {
// 如果读锁也被获取,而且持锁线程不是当前线程
if ( w == 0 || current != getExclusiveOwnerThread()
) {
// 那么当前线程获取锁失败
return false;
}
// 写锁计数超过低 16 位, 报异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写锁重入, 获得写锁成功
setState(c + acquires);
return true;
}
// 判断写锁是否该阻塞,或尝试更改写锁计数值
if ( writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
// 获得锁失败
return false;
}
// 获得锁成功
setExclusiveOwnerThread(current);
return true;
}
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
如图所示:
接着线程2想要加读锁,根据加锁规则,此时线程2应该被阻塞。它首先进入读锁的acquireShared(1)
流程,接着进入tryAqcquire()
尝试获取锁。因为t1已经加了写锁,方法返回-1表示加锁失败。
方法的返回值有三种情况:
- -1:失败
- 0:成功,但是不会继续唤醒后继节点
- 整数:成功,数值表示后续需要唤醒的节点个数
public static class ReadLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquireShared(1);
}
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
其中尝试获取读锁的方法tryAcquireShared()
定义如下:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果此时已有其他的线程持有写锁,并且持锁线程不是当前线程,则读锁获取失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读状态位
int r = sharedCount(c);
if (
// 读锁不该阻塞
!readerShouldBlock() &&
// 读锁计数小于最大值
r < MAX_COUNT &&
// 使用CAS尝试增加state中读锁计数
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 不断尝试 for (;;) 获取读锁, 执行过程中无阻塞
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// ... 省略不重要的代码
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// ... 省略不重要的代码
return 1;
}
}
}
如图所示:
由于线程2加锁被阻塞,此时会进入doAcquireShared(1)
流程,也是调用addWaiter()
添加节点,此时的节点被设置为Node.SHARED模式,而且线程2此时仍处于活跃状态。
private void doAcquireShared(int arg) {
// 将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再一次尝试获取读锁
int r = tryAcquireShared(arg);
// 成功
if (r >= 0) {
// r 表示可用资源数, 在这里总是1,允许链式唤醒,继续唤醒下一个SHARED节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (
// 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
shouldParkAfterFailedAcquire(p, node) &&
// park 当前线程
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
此时,线程2会看他是不是哨兵节点后的第一个节点,如果是,继续调用tryAcquireShared(1)
尝试加锁。但由于线程1还没有释放写锁,加锁依然会失败。线程2会在doAcquireShared()
内的死循环中继续循环一次,将它的前驱节点的waitStatus改为-1。然后再次调用tryAcquireShared(1)
尝试加锁,如果还是失败,则在parkAndCheckInterrupt()
处被阻塞,不再尝试加锁。
线程2加锁失败后,假设又有线程3想要加读锁,线程4想要加写锁。但是由于线程1仍然持有写锁,它们加锁也会失败,进入等待队列。
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
for (;;) {
Node h = head;
// 队列还有节点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 下一个节点 unpark 如果成功获取读锁
// 并且下下个节点还是 shared, 继续 doReleaseShared
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
千辛万苦,终于等于线程1释放锁,修改exclusiveOwnerThread为null,如下所示:
释放锁成功后调用unparkSuccessor()
唤醒等待队列中可能的阻塞线程,此时,线程2在doAcquireShared()
内parkAndCheckInterrupt()
处恢复运行。接着再执行一次循环,调用tryAcquireShared()
让读状态计数加一。
static final class NonfairSync extends Sync {
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 尝试释放写锁成功
if (tryRelease(arg)) {
// 唤醒等待队列中的线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 因为可重入的原因, 写锁计数为 0, 才算释放成功
boolean free = exclusiveCount(nextc) == 0;
if (free) {
setExclusiveOwnerThread(null);
}
setState(nextc);
return free;
}
}
因此,此时state值为1_0
线程2加锁成功后,调用setHeadAndPropagate()
将本来所在的节点设置为头节点。然后在方法内检查下一个节点的状态是否是SHARED,如果是,则调用doReleaseShared()
将头节点的waitStatus修改为-1,并唤醒后继线程3,它会在parkAndCheckInterrupt()
处恢复运行。执行上面相同的操作,修改state为2_0
,并将原本的节点设置为头节点,waitStatus修改为-1
不久之后,线程2和线程3相继执行结束,调用releaseShared(1)
修改读状态计数值为0。
static final class NonfairSync extends Sync {
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
// ... 省略不重要的代码
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
// 计数为 0 才是真正释放
return nextc == 0;
}
}
}
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
// 防止 unparkSuccessor 被多次执行
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果已经是 0 了,改为 -3,用来解决传播性
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
然后调用doReleaseShared()
将头节点waitStatus修改为0,唤醒线程4
线程4同样在parkAndCheckInterrupt()
处恢复运行,并且发现自己是哨兵节点的后继,而且此时没有其他的线程竞争锁,则加锁成功,修改state为0_1
。
2.4 锁降级
最后一个点就是前面特性中提高的锁降级,它是指把持住写锁,再获取读锁,随后释放写锁的过程。经过锁降级之后,写锁就会被降级为读锁。之所以在释放写锁之前需要先获取读锁,是为了避免直接释放写锁后,其他线程对于数据的更新对当前线程不可见。如果当前线程先获取读锁,那么想要获取写锁的线程就都会被阻塞,只有当前线程成功释放了写锁,其他竞争写锁的线程才能成功获取到。
3. 使用
假设demo代码如下所示:
/**
* @Author dyliang
* @Date 2020/9/6 15:06
* @Version 1.0
*/
public class Test {
static ReentrantWriteReadLockDemo d = new ReentrantWriteReadLockDemo();
public static void main(String[] args) {
// ReadRead();
// ReadWrite();
// WriteWrite();
}
public static void ReadRead() {
new Thread(() -> {
d.read();
}, "t1").start();
new Thread(() -> {
d.read();
}, "t2").start();
}
public static void ReadWrite(){
new Thread(() -> {
d.read();
}, "t2").start();
new Thread(() -> {
d.write();
}, "t3").start();
}
public static void WriteWrite(){
new Thread(() -> {
d.write();
}, "t3").start();
new Thread(() -> {
d.write();
}, "t4").start();
}
}
class ReentrantWriteReadLockDemo{
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReadLock readLock = lock.readLock();
WriteLock writeLock = lock.writeLock();
public void read(){
try {
readLock.lock();
System.out.println("Thread-" + Thread.currentThread().getName() + " enter...");
Thread.sleep(3000);
System.out.println("Thread-" + Thread.currentThread().getName() + " leave...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
readLock.unlock();
}
}
public void write(){
try {
writeLock.lock();
System.out.println("Thread-" + Thread.currentThread().getName() + " enter...");
Thread.sleep(3000);
System.out.println("Thread-" + Thread.currentThread().getName() + " leave...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
writeLock.unlock();
}
}
}
如果运行ReadRead()
,可以看到线程1和线程2同时进入,证明一个线程持有读锁不会影响其他线程加读锁。
Thread-t1 enter...
Thread-t2 enter...
Thread-t2 leave...
Thread-t1 leave...
如果运行ReadWrite()
,控制台输出:
Thread-t2 enter...
Thread-t2 leave...
Thread-t3 enter...
Thread-t3 leave...
可以看到,当线程2先加读锁后,线程3只有等到读锁被释放才能加写锁。如果运行WriteWrite()
,控制台输出:
Thread-t3 enter...
Thread-t3 leave...
Thread-t4 enter...
Thread-t4 leave...
可以看到只有线程3释放了写锁后,线程4才能加写锁。