读写锁介绍
场景:读多写少,
读读共享
读写,写写,写读 互斥
为了提高并发量,吞吐量
ReentrantReadWriteLock:维护了一对相关的锁,一个只读操作,一个写入操作
线程进入读锁的前提条件:
- 没有其他线程的写锁
-
线程进入写锁的前提条件:
没有其他线程的读锁
-
读写锁有以下三个重要的特性
公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
- 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
- 锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁。
ReentrantReadWriteLock的使用
ReadWriteLock:
public interface ReadWriteLock {
/** Returns the lock used for reading.
* @return the lock used for reading
*/
Lock readLock();
/** Returns the lock used for writing.
* @return the lock used for writing
*/
Lock writeLock();
}
ReentrantReadWriteLock: 可重入的读写锁实现类
写锁:独占
读锁:共享
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
如何使用读写锁
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock r = readWriteLock.readLock();
private Lock w = readWriteLock.writeLock();
// 读操作上读锁
public Data get(String key) {
r.lock();
try {
// TODO 业务逻辑
}finally {
r.unlock();
}
}
// 写操作上写锁
public Data put(String key, Data value) {
w.lock();
try {
// TODO 业务逻辑
}finally {
w.unlock();
}
}
注意事项
- 读锁不支持条件变量
- 重入时升级不支持:持有读锁的情况下去获取写锁,会导致获取永久等待
- 重入时支持降级: 持有写锁的情况下可以去获取读锁
示例Demo
public class Cache {
static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
// 获取一个key对应的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 设置key对应的value,并返回旧的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的内容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
锁降级
写锁 ——> 读锁
先把持写锁——>再获取读锁——-> 再释放写锁-
目的:保证线程修改的数据能够被其他线程可见,主要是为了保证数据的可见性
如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新
不支持锁升级
目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
private volatile boolean update = false;
public void processData() {
readLock.lock();
if (!update) {
// 必须先释放读锁
readLock.unlock();
// 锁降级从写锁获取到开始
writeLock.lock();
try {
if (!update) {
// TODO 准备数据的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 锁降级完成,写锁降级为读锁
}
try {
//TODO 使用数据的流程(略)
} finally {
readLock.unlock();
}
}
ReentrantReadWriteLock源码分析
类结构
读写锁的设计
使用Sync 的int 类型 state 表示同步状态
把一个变量,分为两步分: 高16位表示读,低16位表示写
假如当前同步状态为S,那么:
- 写状态:等于 S & 0x0000FFFF 把高位16位 全部抹去,当写状态加1 ,等于 S + 1
- 读状态:等于 S >>> 16 , 0 右移16位,当读状态加1,等于 s + (1<<16)也就是 S + 0x00010000
推论:S 不等于0时,当写状态(S&0x0000FFFF) 等于0 ,则读状态 (S >>>16) 大于0 ,读锁已经获取
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count
* 获取持有读状态锁的线程数量,读锁可以被多个线程持有,
读锁支持重入特性,每个线程持有的读锁的数量是单独计算的,HoldCounter 计算器
*/
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count
* 获得持有写状态的锁的次数
*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
HoldCounter 计数器
读锁的内在机制其实就是一个共享锁。一次共享锁的操作就相当于对HoldCounter 计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
/**
*HoldCounter与线程绑定,记录当前线程的重入次数的
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
- HoldCounter是用来记录读锁重入数的对象
- ThreadLocalHoldCounter是ThreadLocal变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象
写锁的获取
写锁是一个支持重进入的排它锁
如果是当前线程获取的写锁,则增加写状态
如果当前线程获取血锁时,读锁已经被获取,或该线程不是获取写锁的线程,则等待
protected final boolean tryAcquire(int acquires) {
//当前线程
Thread current = Thread.currentThread();
//获取state状态 存在读锁或者写锁,状态就不为0
int c = getState();
//获取写锁的重入数
int w = exclusiveCount(c);
//当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
if (c != 0) {
// c!=0 && w==0 表示存在读锁
// 当前存在读锁或者写锁已经被其他写线程获取,则写锁获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 超出最大范围 65535
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//同步state状态
setState(c + acquires);
return true;
}
// writerShouldBlock有公平与非公平的实现AQS实现的, 非公平返回false,会尝试通过cas加锁
//c==0 写锁未被任何线程获取,当前线程是否阻塞或者cas尝试获取锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//实现重入的功能
//设置写锁为当前线程所有
setExclusiveOwnerThread(current);
return true;
}
- 读写互斥
- 写写互斥
- 写锁支持同一个线程重入
writerShouldBlock写锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)
写锁的释放
写锁释放通过重写AQS的tryRelease方法实现
protected final boolean tryRelease(int releases) {
//若锁的持有者不是当前线程,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//当前写状态是否为0,为0则释放写锁
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);//把独占线程设置为空
setState(nextc);
return free;
}
读锁的获取
实现共享式同步组件的同步语义需要通过重写AQS的tryAcquireShared方法和tryReleaseShared方法。读锁的获取实现方法为:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果写锁已经被获取并且获取写锁的线程不是当前线程,当前线程获取读锁失败返回-1
// 判断锁降级
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//计算出读锁的数量
int r = sharedCount(c);
/**
* 读锁是否阻塞 readerShouldBlock()公平与非公平的实现
* r < MAX_COUNT: 持有读锁的线程小于最大数(65535)
* compareAndSetState(c, c + SHARED_UNIT) cas设置获取读锁线程的数量
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { //当前线程获取读锁
if (r == 0) { //设置第一个获取读锁的线程
firstReader = current;
firstReaderHoldCount = 1; //设置第一个获取读锁线程的重入数
} else if (firstReader == current) { // 表示第一个获取读锁的线程重入
firstReaderHoldCount++;
} else { // 非第一个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++; //记录其他获取读锁的线程的重入次数
}
return 1;
}
// 尝试通过自旋的方式获取读锁,实现了重入逻辑
return fullTryAcquireShared(current);
}
读锁共享,读读不互斥
- 读锁可重入,每个获取读锁的线程都会记录对应的重入数
- 读写互斥,锁降级场景除外
- 支持锁降级,持有写锁的线程,可以获取读锁,但是后续要记得把读锁和写锁读释放
- readerShouldBlock读锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)
读锁的释放
获取到读锁,执行完临界区后,要记得释放读锁(如果重入多次要释放对应的次数),不然会阻塞其他线程的写操作。
读锁释放的实现主要通过方法tryReleaseShared:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果当前线程是第一个获取读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--; //重入次数减1
} else { //不是第一个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count; //重入次数减1
}
for (;;) { //cas更新同步状态
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}