可重入的读写锁.
线程进入读锁的前提条件:
- 没有其他的线程在写,如果是同一个线程在写,可以获得读锁
线程进入写锁的前提条件:
- 没有线程在写
- 没有线程在读
先读再写(不释放读锁) | 先写再读(不释放写锁) | |
---|---|---|
同一个线程 | 阻塞,获取不到写锁 | 可以 |
不同线程 | 阻塞,获取不到写锁 | 阻塞,获取不到读锁 |
源码解析
ReentrantReadWriteLock里有5个内部类,分别是Sync(同步类,继承了AQS),NonfairSync(非公平同步类,继承Sync),FairSync(公平同步类,继承Sync),ReadLock(读锁),WriteLock(写锁)。其中Sync中又包含两个内部类:HoldCounter(和读锁配套使用,记录某个读线程的重入次数和线程id);ThreadLocalHoldCounter(本地线程计数器,继承了ThreadLocal,将线程和HoldCounter对象关联
写锁(独占锁)
protected final boolean tryAcquire(int acquires) {
//获取当前线程
Thread current = Thread.currentThread();
//获得state,持有锁的线程数量
int c = getState();
//独占线程的数量
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//原有独占线程的数量+本次占有的线程数>最大限制?
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//设置状态
setState(c + acquires);
return true;
}
//写锁是否阻塞 CAS是否成功
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置独占线程
setExclusiveOwnerThread(current);
return true;
}
读锁(共享锁)
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取状态
int c = getState();
//如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 读锁数量
int r = sharedCount(c);
/*
* readerShouldBlock():读锁是否需要等待(公平锁原则)
* r < MAX_COUNT:持有线程小于最大数(65535)
* compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
*/
// 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
if (r == 0) { // 读锁数量为0
// 设置第一个读线程
firstReader = current;
// 读线程占用的资源数为1
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入
// 占用资源数加1
firstReaderHoldCount++;
} else { // 读锁数量不为0并且不为当前线程
// 获取计数器
HoldCounter rh = cachedHoldCounter;
// 计数器为空或者计数器的tid不为当前正在运行的线程的tid
if (rh == null || rh.tid != getThreadId(current))
// 获取当前线程对应的计数器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) // 计数为0
//加入到readHolds中
readHolds.set(rh);
//计数+1
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { // 无限循环
// 获取状态
int c = getState();
if (exclusiveCount(c) != 0) { // 写线程数量不为0
if (getExclusiveOwnerThread() != current) // 不为当前线程
return -1;
} else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // 当前线程为第一个读线程
// assert firstReaderHoldCount > 0;
} else { // 当前线程不为第一个读线程
if (rh == null) { // 计数器不为空
//
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功
if (sharedCount(c) == 0) { // 读线程数量为0
// 设置第一个读线程
firstReader = current;
//
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
使用场景
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
int QUEUE_SIZE = 5;
LinkedBlockingQueue queue = new LinkedBlockingQueue(QUEUE_SIZE);
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo();
Thread readThread = new Thread(() -> {
try {
demo.readLock.lock();
// TimeUnit.SECONDS.sleep(1);
while (demo.queue.size() == 0) {
}
int currQueueSize = demo.queue.size();
for (int i = 0; i < currQueueSize; i++) {
System.out.println(Thread.currentThread().getName() + "读取到数据:" + demo.queue.poll());
}
} catch (Exception e) {
} finally {
demo.readLock.unlock();
}
}, "readThread");
Thread writeThread = new Thread(() -> {
try {
demo.writeLock.lock();
while (demo.queue.size() < demo.QUEUE_SIZE) {
int i = new Random().nextInt(20);
demo.queue.offer(i);
System.out.println(Thread.currentThread().getName() + "写入数据:" + i);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
} finally {
demo.writeLock.unlock();
}
}, "writeThread");
writeThread.start();readThread.start();
//同一个线程
//先获取写锁,然后可以获取读锁
//可以不用释放写锁,就能获取读锁
new Thread(() -> {
try{
demo.writeLock.lock();
System.out.println("写入数据");
demo.queue.offer(new Random().nextInt(10));
demo.readLock.lock();
System.out.println("读取到数据:" + demo.queue.poll());
}catch(Exception e){
}finally {
demo.readLock.unlock();
demo.writeLock.unlock();
}
}).start();
//同一个线程
// 先获取读锁,再获取写锁
//必须得先把读锁释放,才可获得写锁
new Thread(() -> {
try{
demo.readLock.lock();
System.out.println("读取到数据:" + demo.queue.poll());
demo.writeLock.lock();
demo.queue.offer(new Random().nextInt(10));
System.out.println("写入数据");
}catch(Exception e){
}finally {
demo.writeLock.unlock();
demo.readLock.unlock();
}
}).start();
new Thread(() -> {
try{
demo.readLock.lock();
System.out.println("读取到数据:" + demo.queue.poll());
}catch(Exception e){
}finally {
demo.readLock.unlock();
}
}).start();
new Thread(() -> {
try{
demo.writeLock.lock();
System.out.println("写入数据");
demo.queue.offer(new Random().nextInt(10));
demo.readLock.lock();
}catch(Exception e){
}finally {
demo.writeLock.unlock();
}
}).start();
new Thread(() -> {
try{
demo.writeLock.lock();
System.out.println("写入数据");
demo.queue.offer(new Random().nextInt(10));
}catch(Exception e){
}finally {
demo.writeLock.unlock();
}
}).start();
TimeUnit.SECONDS.sleep(10);
new Thread(() -> {
try{
demo.readLock.lock();
System.out.println("读取到数据:" + demo.queue.poll());
}catch(Exception e){
}finally {
demo.readLock.unlock();
}
}).start();
}