ReentrantReadWriteLock 的使用
概述
Java常见的多是排他锁,这些锁在同一时刻只允许一个线程进行访问 。当多个线程同时对同一个数据进行读取操作时,由于读取操作并不涉及数据的修改,若果这个给时候仍然给读操作设计锁的话无疑会降低读取数据的性能;
而且在实际运用中读操作频率远远高于写操作时,这时候就可以使用 读写锁 , 读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,让多个只涉及读取数据的线程可以并行执行,使得并发性相比一般的排他锁有了很大提升,读写锁能有效提高读比写多的场景下的程序性能,比排它锁好。
当然了,多个线程同时涉及到读跟写或同时写的话,还是要加锁的。即读读并发,带写互斥。
类似于数据库中的 select … from … lock in share mode 表示查询可以并行执行,读写互斥;
ReadWriteLock接口管理一组锁,一个是只读的锁,一个是写锁。Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。
用法
写锁的获取与释放
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。
因此只有等待其他读线程都释放了读锁,写锁才能被当前线程所获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
写锁的释放比较简单。 每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
读锁的获取与释放
读锁不是独占式锁,即同一时刻该锁可以被多个读线程获取也就是一种共享式锁。在没有其他写线程访问(或者写状态为0)时,读锁总会成功的被获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
ReentrantReadWriteLock特性
(1)公平性选择
非公平模式(默认)
当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。
公平模式
当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。
(2)重入性
可重入锁,就是说一个线程在获取某个锁后,还可以再次获取该锁,即允许一个线程多次获取同一个锁。比如synchronized内置锁就是可重入的,如果A类有2个synchornized方法method1和method2,那么method1调用method2是允许的。显然重入锁给编程带来了极大的方便。假如内置锁不是可重入的,那么导致的问题是:1个类的synchornized方法不能调用本类其他synchornized方法,也不能调用父类中的synchornized方法。与内置锁对应,JDK提供的显示锁ReentrantLock也是可以重入的,ReentrantReadWriteLock也是可以重入的。
(3)锁降级
读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级。
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待。因为同一个线程中,在没有释放读锁的情况下,就去申请写锁,这属于锁升级,ReentrantReadWriteLock是不支持的。
ReentrantLock获取到了锁之后想再次获取锁是可以的,但是ReentrantReadWriteLock 在获取到了锁的情况下再去获取锁是不可以的,会被阻塞住;比如持有读锁的情况下去获取写锁,会导致获取写锁永久等待 。
- 重入时降级支持:即持有写锁的情况下去获取读锁 。但是从写锁降级成读锁,并不会自动释放当前线程获取的写锁,仍然需要显示的释放,否则别的线程永远也获取不到写锁。
注意:
- 读锁不支持条件变量
使用演示
这里我们提供一个 数据容器类,其内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方 法
class DataContainer {
//需要保护的数据
private Object data;
//创建读写锁对象
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
//获取读锁
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
//获取写锁
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() throws InterruptedException {
System.out.println("获取读锁...");
r.lock();
try {
System.out.println("读取");
Thread.sleep(1000);
return data;
} finally {
System.out.println("释放读锁...");
r.unlock();
}
}
public void write() {
System.out.println("获取写锁...");
w.lock();
try {
System.out.println("写入");
// sleep(1);
} finally {
System.out.println("释放写锁...");
w.unlock();
}
}
}
写完容器类之后我们来测试一下,多个线程同时进行读取时的情况,以及同时读写时的情况;
测试 读锁-读锁 是否可以并发
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
try {
dataContainer.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
dataContainer.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
可以看出,两个读取操作并没有被阻塞
测试 读锁-写锁 是否可以并发
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
try {
dataContainer.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
Thread.sleep(100);
new Thread(() -> {
try {
dataContainer.write();
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();
}
可以看出,阻塞了
测试 写锁-写锁 是否可以并发
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
try {
dataContainer.write();
} catch (Exception e) {
e.printStackTrace();
}
}, "t1").start();
Thread.sleep(100);
new Thread(() -> {
try {
dataContainer.write();
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();
}
阻塞了
//降级
class CachedData {
Object data;
// 表示缓存数据是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获取写锁前必须去释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新;因为外层的if (!cacheValid) 不能保证其受写锁保护
if (!cacheValid) {
//更新数据
data = ...
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
rwl.readLock().lock();
} finally {rwl.writeLock().unlock();
}
}
// 自己用完数据, 释放读锁
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
读写锁原理
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。 读写锁虽然有两个锁但用的是却是同一个 Sycn同步器,因此等待队列、state等也是同一个。
回想ReentrantLock中自定义同步器的实现,同步状态state表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该state状态的设计成为读写锁实现的关键。
如果在一个整型变量上维护多种状态,就需要用“按位切割使用”的方式来使用这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写,划分方式如下图所示
上图所示,当前同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。读写锁是如何迅速确定读和写各自的状态呢?
答案是通过位运算。假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。
t1 w.lock t2 r.lock
这里我们就以t1线程加写锁,t2线程加读锁进行流程的讲解;
- 在没有其他线程的情况下t1线程添加写锁,当然 t1成功上锁,流程与ReentrantLock 加锁相比没有特殊之处,都是将state修改为1,不同的是写锁状态占了state的低16位,而读锁使用的是state的高16位
//写锁
public void lock() {
//调用同步器的方法
sync.acquire(1);
}
//就是AQS中的acquire方法
public final void acquire(int arg) {
//调用调用tryAcquire方法尝试加锁,加锁成功后面的代码就不执行了。
if (!tryAcquire(arg) &&
//加锁失败后尝试将其加入队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//毕竟子类中重写了该方法,肯定有些不一样的东西
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//意味着其他线程可能加了读锁或写锁,说不定的
if (c != 0) {
//w==0表示别人加的是读锁;w=0不成立来到第二条件,表示判断写锁是不是当前线程加的
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
//因为读锁跟现在要加的写锁互斥,所以返回false;或者说写锁不是当前线程加的,也互斥
return false;
//写锁加上1后超过写锁的最大值,抛异常(几乎不存在)
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires); //在写锁的基础上多次重入,发生可重入
return true;
}
//走到这里说明state是等于0的,表示其他线程还没有进行加锁
//writerShouldBlock表示写锁是否该阻塞(公平与否的体现),非公平锁总会返回false;公平锁检查是否是老二,这里看非公平的实现
if (writerShouldBlock() ||
//将state从0变为1
!compareAndSetState(c, c + acquires))
return false; //加写锁失败
setExclusiveOwnerThread(current);
return true;
}
2) t2执行r.lock添加读锁,这时进入读锁的sync.acquireShared(1)流程,首先会进入tryAcquireShared流程; 如果有写锁占据,那么tryAcquireShared返回-1表示失败
3)这时会进入sync.doAcquireShared(1)流程,首先也是调用addWaiter添加节点,不同之处在于节点被设置为Node.SHARED模式而非Node.EXCLUSIVE模式,注意此时t2仍处于活跃状态,还没有执行park阻塞
目前为止涉及过程的源码如下:
//读锁
public void lock() {
sync.acquireShared(1); //调用同步器方法
}
/*
tryAcquireShared返回值表示:
-1表示获取读锁失败
0表示成功,但没有后继节点,不会继续唤醒
正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回1(后面的信号量该方法会返回大于1的情况)
*/
public final void acquireShared(int arg) {
//尝试获取读锁,在本例中返回值是-1条件成立
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//查看写锁部分是否不为0,即别人是否有加写锁;在我们本例子中,因为t1线程已经加了写锁了,所以这里c不为0,条件成立,继续往后判断
if (exclusiveCount(c) != 0 &&
//查看是否是自己添加的写锁。因为倘若是之前自己线程添加的写锁,那么此时自己线程想要添加读锁是被允许的,因为支持锁降级(不支持锁升级)。
//但是在本例中,由于添加写锁的不是自己线程所以这个条件有成立了最后返回了-1
getExclusiveOwnerThread() != current)
return -1; //
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
//这个方法里面很多流程跟独占锁里面的相似
private void doAcquireShared(int arg) {
//区别是添加的节点是共享类型的节点,而不是独占型节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//找老二节点,才有资格去争抢锁
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//若果是老二节点才会调用该方法去尝试获取锁
int r = tryAcquireShared(arg);
//假如获取锁失败tryAcquireShared方法会返回-1,不满足该条件
if (r >= 0) {
//节点的替换
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//获取锁失败了是否应该将其阻塞?第一次不会park,第三次尝试还是失败就会阻塞住
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
t3 r.lock t4 w.lock
在上面分析的这种状态下,假设又有t3加读锁和t4加写锁,这期间t1仍然持有锁,就变成了下面的样子
t1 w.unlock
下面我们再来看看释放的流程,比如在上面情况的基础下,t1线程终于干完了它的活,正要调用写锁的unlock方法释放锁;这时就会走到写锁的sync.release(1)流程,里面调用tryRelease(1)方法,判断state状态的写锁部分是否减到0了,没有减到0表示有锁重入,返回false;减到0表示完全释放锁,返回成功,我们这里的例子还没有涉及锁重入,则释放锁成功,变成下面的样子
t1释放锁涉及代码
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//本例中不涉及锁重入,释放锁成功,返回真
if (tryRelease(arg)) {
Node h = head;
//队列中头节点不为空,且状态不为0
if (h != null && h.waitStatus != 0)
//唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//在原来的基础上自减
int nextc = getState() - releases;
//查看state的写锁部分是否减到0了,没有减到0表示有锁重入,返回false
boolean free = exclusiveCount(nextc) == 0;
if (free)
//减到0了就将持有者设置为null
setExclusiveOwnerThread(null);
setState(nextc); //重新设置状态
return free;
}
接下来执行唤醒流程sync.unparkSuccessor,即让老二恢复运行,这时t2在doAcquireShared内parkAndCheckInterrupt()方法处恢复运行
这回t2线程再来一次for (;;)执行tryAcquireShared 尝试获取共享读锁,成功则让读锁计数加一;我们来看看该方法内是怎么做的
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//读锁是否阻塞
if (!readerShouldBlock() &&
//重入次数
r < MAX_COUNT &&
//用CAS操作尝试将state的高16位+1
compareAndSetState(c, c + SHARED_UNIT)) {
//下面的分支是去给读锁计数
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
//表示读锁加锁成功
return 1;
}
return fullTryAcquireShared(current);
}
当读锁获取锁成功后恢复运行。但是事情还没完,在setHeadAndPropagate方法内还会检查下一个节点是否是shared状态,
如果是则调用doReleaseShared()将head 的状态从-改为О并唤醒老二,这时t3在doAcquireShared内parkAndCheckInterrupt(处恢复运行
这回t3再来一次for (;;)执行tryAcquireShared成功则让读锁计数加一,也就是state中关于读锁的部分+1,注意多个线程都可以对同一个读锁进行计数,这就是为什么这里的state读锁部分变成了2。
这时t3已经恢复运行,接下来t3调用setHeadAndPropagate(node,1),它原本所在节点被置为头节点
下一个节点不是shared 了,因此不会继续唤醒t4所在节点
t2 r.unlock,t3 r.unlock
最后再来看看读锁的释放流程,也就是刚才的t2、t3都在运行过程中,运行完了要释放读锁。
t2进入sync.releaseShared(1)中,调用tryReleaseShared(1)让计数减一,但由于计数还不为零
t3进入sync.releaseShared(1)中,调用tryReleaseShared(1)让计数减一,这回计数为零了,进入doReleaseShared()将头节点从-1改为О并唤醒老二,即t4线程
之后t4在acquireQueued 中parkAndCheckInterrupt处恢复运行,再次for (:)这次自己是老二,并且没有其他竞争,tryAcquire(l)成功,修改头结点,流程结束
//读锁
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
完整源码注释
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
private final ReentrantReadWriteLock.ReadLock readerLock; // 读锁
private final ReentrantReadWriteLock.WriteLock writerLock;// 写锁
final Sync sync; // 同步器
/**
* 创建默认的非公平锁
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* 创建公平锁/非公平锁、读锁、写锁
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync(); // true则是公平锁,false则是非公平锁
readerLock = new ReadLock(this); // 读锁
writerLock = new WriteLock(this);// 写锁
}
public ReentrantReadWriteLock.WriteLock writeLock() {
return writerLock;
} // 返回写锁
public ReentrantReadWriteLock.ReadLock readLock() {
return readerLock;
} // 返回读锁
/**
* 读写锁的同步器
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;//序列化版本号
static final int SHARED_SHIFT = 16; // 常量16,目的是将state按位右移16位得到的值就是读锁的个数
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 2的16次方,实际上表示读锁加的锁次数是1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 2的16次方再减1,前面16位全0后面16位就是全1,目的就是通过&运算得到写锁的个数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 2的16次方再减1,表示加锁(读/写)最大的计数超过了则抛异常
private transient Thread firstReader; // 第一个获取到读锁的线程
private transient int firstReaderHoldCount; // 第一个线程重入锁的次数计数
private transient HoldCounter cachedHoldCounter; // 读锁计数器对象
private transient ThreadLocalHoldCounter readHolds; // 在构造Sync的时候就会被赋值,重入读锁的计数器保持对象(对象中存了获取读锁的次数)
/**
* 构造方法
*/
Sync() {
readHolds = new ThreadLocalHoldCounter(); //
setState(getState()); // 确保readHolds的可见性
}
/**
* 读锁计数,实际存入Thread中的ThreadLocal变量中
*/
static final class HoldCounter {
int count; //获取读锁的次数,相当于线程存了线程id为key,value为获得锁的计数
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
/**
* 将锁计数保持对象存入当前线程的ThreadLocal变量
*/
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
* 由于读写锁的设计将state前16位用于读锁的个数,后16位标识写锁的个数因此厦门两个方法目的就是得到读锁和写锁的个数
*/
/**
* 返回读锁的个数
*/
static int sharedCount(int c) {
return c >>> SHARED_SHIFT;
}
/**
* 返回写锁的个数
*/
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK;
}
/**
* 获取读锁时阻塞当前线程,由子类公平锁/非公平锁实现
*/
abstract boolean readerShouldBlock();
/**
* 获取写锁时阻塞当前线程,由子类公平锁/非公平锁实现
*/
abstract boolean writerShouldBlock();
/**
* 尝试释放锁
*/
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//判断当前现是否是持有锁线程如果是则不执行,如果不是则需要抛异常,因为当前线程没有持有锁
throw new IllegalMonitorStateException();
int nextc = getState() - releases;// 计算释放锁释放合法,不允许释放超过加锁次数
boolean free = exclusiveCount(nextc) == 0;//
if (free) // 判断释放锁后释放锁计数是否为0,为0则说明当前线程不再是持有锁线程将其从排他线程状态清除
setExclusiveOwnerThread(null);
setState(nextc);// 更新锁次数计数
return free;//如果释放锁后计数为0则返回true,否则返回false
}
/**
* 这个方法是写锁调用才会执行的
* 不是第一次加写锁
* 因为是写锁才调用的方法,因此只需要排除前面加的都是读锁这种情况即可,也就是 c!=0 但 w==0的情况
* 先判断是否加了锁c!=0,如果加了锁也就是c!=0内部的这个分支,
* 是否没有加过写锁
* 是否是重入写锁
* 是第一次加写锁
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();// 获取当前线程对象
int c = getState(); // 获取state(前16位是读锁个数、后16位是写锁个数)
int w = exclusiveCount(c);// 获得写锁的个数,w有write的含义。这个值就是写锁的个数,通过按位与 15 得到写锁个数
if (c != 0) {// c!=0则说明加过锁
// 如果写锁个数为0 (说明加的都是读锁,不需要阻塞因此抢占锁失败) 或者 当前线程不是持有写锁线程(w!=0说明加过写锁需要判断当前线程是否是持有写锁的那个线程,不是则说明抢占锁失败)
if (w == 0 || current != getExclusiveOwnerThread())
return false; // 表示抢占锁失败,这里导致了两种情况,一种是加的都是读锁,一种是加了写锁,但当前线程不是持有锁线程
// 执行下面的判断都表示加过了写锁,相当于写锁的重入,因此需要将写锁计数相加也就是判断里的操作
if (w + exclusiveCount(acquires) > MAX_COUNT) // 说明是重入锁,判断本次加了acquires次锁后锁计数是否超过最大值 2的16次方-1
throw new Error("Maximum lock count exceeded");// 超过能加写锁的最大值则抛异常
// 写锁重入,因此保留读锁加上写锁重入的acquires次,将state更新
setState(c + acquires);
return true;//返回true说明加锁成功
}
// 前面没有加过锁,需要加写锁,尝试利用CAS操作更新state进行加锁,实际上逻辑上不需要这里的if,但是应该是由于并发问题怕中途state值被改了,因此CAS操作可能失败(所以失败则return false)
// c==0 说明没有加过锁,尝试将state从0更新为acquires,更新成功则说明加锁成功,因此不会返回false,而是执行后面的return true
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);// 将当前线程设置为独占线程,表示加写锁成功!
return true;// 加锁成功
}
@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();// 获取当前线程对象
if (firstReader == current) { // 当前线程是否是第一个持有锁线程
if (firstReaderHoldCount == 1) // 是否是第一次上锁后就解锁了
firstReader = null; // 清除第一个读锁线程
else
firstReaderHoldCount--;// 将读锁计数减一
} else {
HoldCounter rh = cachedHoldCounter; // 得到缓存的计数器对象
if (rh == null || rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get(); // 如果缓存的计数器对象不是当前线程的,则获取当前线程的计数器对象,重新赋值
int count = rh.count; // 得到当前线程的读锁计数
if (count <= 1) { // 释放锁后为0,或者过度释放,则移除计数器
readHolds.remove();// 移除计数器
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count; // 锁计数减1
}
for (; ; ) {
int c = getState(); // 获得锁计数
int nextc = c - SHARED_UNIT; // 读锁计数减一
if (compareAndSetState(c, nextc)) // cas操作更新state值
// 释放读取锁定对读取器没有影响.但是,如果现在读和写锁都已释放,则可能允许等待的编写器继续进行.
return nextc == 0;
}
}
private static IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread");
}
/**
* 读锁才调用的方法,当前线程尝试获取读锁
*/
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread(); // 获取当前线程
int c = getState();// 获取存有读和写锁次数的state值
/**
* 是写锁则进入
*/
// 通过exclusiveCount(c)得到写锁次数,如果不为0则说明加了写锁。加了写锁需要判断当前线程是否是持有写锁的线程,是则不返回-1,不是则说明是写读状态需要进行阻塞当前线程
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1; // 说明是写读状态、返回-1,抢占读锁失败
// 执行到这里说明前面没有加过写锁,可能加过读锁
int r = sharedCount(c); // 获取加的读锁次数,r就是read,实际就是将state右移16位得到
// 到这里说明没有加过锁,到这里c是0,因此进行加锁操作将state更新为读锁的1 实际二进制是:0000 0000 0000 0001 0000 0000 0000 0000
/**
* 是读锁,
* 一、读是共享的情况直接执行if内
*/
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) { // 第一次进入,因为能到达这里就说明没有写锁,有判断r==0则说明读锁也为0,则说明是第一次调用
firstReader = current; // 将第一个线程存起来
firstReaderHoldCount = 1;// 计数为1
} else if (firstReader == current) {
firstReaderHoldCount++; // 读重入,读锁计数进行累加
} else {
// 说明不是获得读锁的线程进来了
// tid 为key ,value为读锁次数
HoldCounter rh = cachedHoldCounter;// 将当前线程初始值是null
// 第一次null直接创建一个
if (rh == null || rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();// 通过ThreadLocal得到HoldCounter(计数保持器,内部存了加锁计数)
else if (rh.count == 0) // 如果锁计数为0
readHolds.set(rh); // 更新锁计数保持器对象
rh.count++; // 计数累加
}
return 1;// 表示抢占读锁成功
}
/**
* 二、读是排他的情况,调用下面这个方法
*/
return fullTryAcquireShared(current);
}
/**
* 读是排他的情况采用自旋方式
* 完整版本的获取读,可处理CAS错误和tryAcquireShared中未处理的可重入读。
*/
final int fullTryAcquireShared(Thread current) {
/**
* 该代码与tryAcquireShared中的代码部分冗余,但由于不使tryAcquireShared与重试和延迟读取保持计数之间的交互复杂化,因此整体代码更简单。
*/
HoldCounter rh = null;
for (; ; ) {// 自旋
int c = getState(); // 获取读写锁计数
/**
* 如果存在写锁
*/
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)// 判断当前线程是否是持有同一把写锁的线程
return -1;// 加锁失败,当前线程不是持有写锁线程
}
/**
* 不存在写的情况
*/
// 1.判断读是否是排他的,如果是则进入
else if (readerShouldBlock()) {
// 当前线程是不是第一个读锁线程,是则说明当前线程是重入的读锁线程
if (firstReader == current) {
// 什么也没有
} else {
// 如果当前线程不是第一个抢占到读锁的线程,如果锁计数存在
if (rh == null) {
rh = cachedHoldCounter; // 得到锁计数保持器
if (rh == null || rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get(); // 得到锁计数保持器
if (rh.count == 0) // 如果计数为0
readHolds.remove(); // 清除保持器
}
}
// 读锁计数保持器存在,如果等于0则抢占读锁失败,因为这个计数器在tryAcquireShared方法已经被赋值了,所以不会为0,为0说明cas操作失败了
if (rh.count == 0)
return -1; // 加锁失败,当前线程
}
}
// 2.到这里说明是共享的读
/**
* 注意:
* 如果是tryAcquireShared方法过来的其实下面不会执行到的,
* 因为在tryAcquireShared方法中已经走过一遍这个逻辑了,
* 这里加上这个逻辑只是处于对当前方法的封装,这样当前方法可以不用依赖tryAcquireShared方法
*/
if (sharedCount(c) == MAX_COUNT) // 判断读锁是否超过最大值
throw new Error("Maximum lock count exceeded");
// 读共享,因此只需要通过cas将读锁计数累加1即可,因为CAS操作多以是单线程所以是加1
if (compareAndSetState(c, c + SHARED_UNIT)) {// 更新state值
// c 一开始是0,因为上面更新的不是c而是state值,如果c是0说明是第一个线程调用了这个方法,执行到了这里
if (sharedCount(c) == 0) {
firstReader = current; // 保存当前的第一个线程
firstReaderHoldCount = 1;// 保存计数(因为是第一次进入所以是1)
} else if (firstReader == current) {
firstReaderHoldCount++; // 持锁的同一个线程重入读锁
} else {
if (rh == null)
rh = cachedHoldCounter; // 其它线程尝试获取读锁,获取第一个线程产生的HoldCounter对象
if (rh == null || rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get(); // 从ThreadLocal中获取HoldCounter对象
else if (rh.count == 0)
readHolds.set(rh); // 如果锁计数为0更新锁计数保持其对象
rh.count++; // 读锁计数累加
cachedHoldCounter = rh; // 保存读锁计数器对象
}
return 1; // 读锁加锁成功
}
}
}
/**
* 执行tryLock进行写入,从而在两种模式下都可以进行插入。 这与tryAcquire的作用相同,只是缺少对writerShouldBlock的调用。
*/
@ReservedStackAccess
final boolean tryWriteLock() {
Thread current = Thread.currentThread(); // 得到当前线程
int c = getState(); // 得到锁计数
if (c != 0) { // 不为0说明加过锁
int w = exclusiveCount(c); // 得到写锁次数
if (w == 0 || current != getExclusiveOwnerThread())
return false;// 写锁被其它线程占用,当前线程抢占写锁失败
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 第一次就加写锁,cas更新state值
if (!compareAndSetState(c, c + 1))
return false;
// 将当前线程设置为独占
setExclusiveOwnerThread(current);
return true;// 写锁加锁成功!
}
/**
* 执行tryLock进行读取,从而在两种模式下都可以进行插入。 除了没有调用readerReaderShouldBlock以外,这与tryAcquireShared的作用相同。
*/
@ReservedStackAccess
final boolean tryReadLock() {
Thread current = Thread.currentThread(); // 获取当前线程
for (; ; ) {
int c = getState(); // 获取锁计数
// 存在写的情况
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return false;
// 不存在写的情况
int r = sharedCount(c); // 计算读锁的次数
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded"); // 值越界
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current; // 第一个线程进来读计数为0,保存第一个线程
firstReaderHoldCount = 1; // 设置计数为1
} else if (firstReader == current) { // 重入读
firstReaderHoldCount++; // 读计数累加1
} else {
// 其它线程进入
HoldCounter rh = cachedHoldCounter;// 其它线程尝试获取读锁,获取第一个线程产生的HoldCounter对象
if (rh == null || rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();// 从ThreadLocal中获取HoldCounter对象
else if (rh.count == 0)
readHolds.set(rh); // 如果锁计数为0更新锁计数保持其对象
rh.count++; // 读计数累加
}
return true; // 加读锁成功
}
}
}
protected final boolean isHeldExclusively() {
// 虽然我们必须在拥有者之前先阅读一下状态
// 我们不需要检查当前线程是否为所有者
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 与外部有关的方法
// 得到Condition对象
final ConditionObject newCondition() {
return new ConditionObject();
}
// 得到持有锁线程
final Thread getOwner() {
return ((exclusiveCount(getState()) == 0) ? null : getExclusiveOwnerThread());
}
// 得到读锁的上锁次数
final int getReadLockCount() {
return sharedCount(getState());
}
//是否上了写锁
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
// 得到写锁上锁次数
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
// 得到读锁的上锁次数
final int getReadHoldCount() {
if (getReadLockCount() == 0) // 读锁上锁次数是否为0
return 0;
Thread current = Thread.currentThread(); // 获取当前线程
if (firstReader == current) // 当前线程是否是第一个上读的锁线程
return firstReaderHoldCount;// 返回锁计数
HoldCounter rh = cachedHoldCounter; // 读计数器对象
if (rh != null && rh.tid == LockSupport.getThreadId(current))
return rh.count; // 返回读计数器存储的锁计数
int count = readHolds.get().count; // 得到计数
if (count == 0) readHolds.remove();// 清除读计数器对象
return count; // 返回计数
}
/**
* 从流中重构实例(即反序列化它)。
*/
private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // 重置为解锁状态
}
// 得到state值,读锁写锁都在
final int getCount() {
return getState();// 返回state
}
}
/**
* 非公平锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
// 默认返回false
final boolean writerShouldBlock() {
return false;
}
// 阻塞当前读锁线程,如果是第一次调用就会返回false因为没有阻塞线程
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive(); // aqs第一个节点是排他节点则返回true,否则返回false
}
}
/**
* 公平锁
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
/**
* ReentrantReadWriteLock.readLock()返回读锁的实例
*/
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
/**
* 构造方法
*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/**
* 尝试获取读锁
*/
public void lock() {
sync.acquireShared(1);
}
/**
* 可以被中断的方式获取锁
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 尝试获取读锁
*/
public boolean tryLock() {
return sync.tryReadLock();
}
/**
* 超过规定的时间内抢占锁,则中断获取锁操作
*/
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 释放锁
*/
public void unlock() {
sync.releaseShared(1);
}
/**
* 不允许操作,在这里是由于实现了Lock接口不得不写这个方法,如果调用则直接抛异常
*/
public Condition newCondition() {
throw new UnsupportedOperationException();
}
/**
* toString方法
*/
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
/**
* 通过ReentrantReadWriteLock.writeLock()返回该对象的实例
*/
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
/**
* 构造方法
*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/**
* 尝试获取锁
* 调用的是带一个参数的acquire方法,因此里面会进行一次尝试抢占锁,失败则进入aqs队列
*/
public void lock() {
sync.acquire(1);
}
/**
* 以可以被中断的方式抢占锁
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 尝试获取锁
*/
public boolean tryLock() {
return sync.tryWriteLock();
}
/**
* 超过规定时间则中断获取锁操作
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
/**
* 释放锁
*/
public void unlock() {
sync.release(1);
}
/**
* 由于实现Lock接口不得不实现的方法,如果直接调用则直接抛异常
*/
public Condition newCondition() {
return sync.newCondition();
}
/**
* toString方法
*/
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
/**
* 判断当前线程是否持有锁
*/
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
/**
* 如果当前线程没有持有锁则返回0,否则返回加锁计数
*/
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
/**
* 是否是公平锁
*/
public final boolean isFair() {
return sync instanceof FairSync;
}
/**
* 返回当前持有锁的线程
*/
protected Thread getOwner() {
return sync.getOwner();
}
/**
* 获取读锁的计数
*/
public int getReadLockCount() {
return sync.getReadLockCount();
}
/**
* 是否是写锁上的锁
*/
public boolean isWriteLocked() {
return sync.isWriteLocked();
}
/**
* 当前线程是否是获取到写锁的线程
*/
public boolean isWriteLockedByCurrentThread() {
return sync.isHeldExclusively();
}
/**
* 获取当前线程写锁上锁的次数
*/
public int getWriteHoldCount() {
return sync.getWriteHoldCount();
}
/**
* 获取当前线程读锁上锁的次数
*/
public int getReadHoldCount() {
return sync.getReadHoldCount();
}
/**
* 获取正在获取写锁的线程的集合
*/
protected Collection<Thread> getQueuedWriterThreads() {
return sync.getExclusiveQueuedThreads();
}
/**
* 获取正在获取读锁的线程的集合
*/
protected Collection<Thread> getQueuedReaderThreads() {
return sync.getSharedQueuedThreads();
}
/**
* 是否有线程正在等待获取 读锁或写锁
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* 判断线程是否在等待队列中
*/
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
/**
* 返回等待读或者等待写的线程的估计值
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* 返回等待读或写的线程的集合
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
/**
* 是否有等待线程,有则返回true
*/
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject) condition);
}
/**
* 返回等待线程的估计数量
*/
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject) condition);
}
/**
* 返回符合Condition条件的等待线程的集合
*/
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject) condition);
}
/**
* toString方法
*/
public String toString() {
int c = sync.getCount();
int w = Sync.exclusiveCount(c);
int r = Sync.sharedCount(c);
return super.toString() +
"[Write locks = " + w + ", Read locks = " + r + "]";
}
}