读写锁介绍

场景:读多写少,
读读共享
读写,写写,写读 互斥
为了提高并发量,吞吐量

ReentrantReadWriteLock:维护了一对相关的锁,一个只读操作,一个写入操作

线程进入读锁的前提条件:

  • 没有其他线程的写锁
  • 没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。

    线程进入写锁的前提条件:

  • 没有其他线程的读锁

  • 没有其他线程的写锁

    读写锁有以下三个重要的特性

  • 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。

  • 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
  • 锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁。

ReentrantReadWriteLock的使用

ReadWriteLock:

  1. public interface ReadWriteLock {
  2. /** Returns the lock used for reading.
  3. * @return the lock used for reading
  4. */
  5. Lock readLock();
  6. /** Returns the lock used for writing.
  7. * @return the lock used for writing
  8. */
  9. Lock writeLock();
  10. }

ReentrantReadWriteLock: 可重入的读写锁实现类
写锁:独占
读锁:共享

  1. public class ReentrantReadWriteLock
  2. implements ReadWriteLock, java.io.Serializable {
  3. private static final long serialVersionUID = -6992448646407690164L;
  4. /** Inner class providing readlock */
  5. private final ReentrantReadWriteLock.ReadLock readerLock;
  6. /** Inner class providing writelock */
  7. private final ReentrantReadWriteLock.WriteLock writerLock;
  8. /** Performs all synchronization mechanics */
  9. final Sync sync;

如何使用读写锁

  1. private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  2. private Lock r = readWriteLock.readLock();
  3. private Lock w = readWriteLock.writeLock();
  4. // 读操作上读锁
  5. public Data get(String key) {
  6. r.lock();
  7. try {
  8. // TODO 业务逻辑
  9. }finally {
  10. r.unlock();
  11. }
  12. }
  13. // 写操作上写锁
  14. public Data put(String key, Data value) {
  15. w.lock();
  16. try {
  17. // TODO 业务逻辑
  18. }finally {
  19. w.unlock();
  20. }
  21. }

注意事项

  • 读锁不支持条件变量
  • 重入时升级不支持:持有读锁的情况下去获取写锁,会导致获取永久等待
  • 重入时支持降级: 持有写锁的情况下可以去获取读锁

示例Demo

  1. public class Cache {
  2. static Map<String, Object> map = new HashMap<String, Object>();
  3. static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  4. static Lock r = rwl.readLock();
  5. static Lock w = rwl.writeLock();
  6. // 获取一个key对应的value
  7. public static final Object get(String key) {
  8. r.lock();
  9. try {
  10. return map.get(key);
  11. } finally {
  12. r.unlock();
  13. }
  14. }
  15. // 设置key对应的value,并返回旧的value
  16. public static final Object put(String key, Object value) {
  17. w.lock();
  18. try {
  19. return map.put(key, value);
  20. } finally {
  21. w.unlock();
  22. }
  23. }
  24. // 清空所有的内容
  25. public static final void clear() {
  26. w.lock();
  27. try {
  28. map.clear();
  29. } finally {
  30. w.unlock();
  31. }
  32. }

锁降级
写锁 ——> 读锁
先把持写锁——>再获取读锁——-> 再释放写锁-
目的:保证线程修改的数据能够被其他线程可见,主要是为了保证数据的可见性
如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新
不支持锁升级
目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。

  1. private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  2. private final Lock r = rwl.readLock();
  3. private final Lock w = rwl.writeLock();
  4. private volatile boolean update = false;
  5. public void processData() {
  6. readLock.lock();
  7. if (!update) {
  8. // 必须先释放读锁
  9. readLock.unlock();
  10. // 锁降级从写锁获取到开始
  11. writeLock.lock();
  12. try {
  13. if (!update) {
  14. // TODO 准备数据的流程(略)
  15. update = true;
  16. }
  17. readLock.lock();
  18. } finally {
  19. writeLock.unlock();
  20. }
  21. // 锁降级完成,写锁降级为读锁
  22. }
  23. try {
  24. //TODO 使用数据的流程(略)
  25. } finally {
  26. readLock.unlock();
  27. }
  28. }

ReentrantReadWriteLock源码分析

类结构
1648645284(1).png
1648645329(1).png
读写锁的设计
使用Sync 的int 类型 state 表示同步状态
把一个变量,分为两步分: 高16位表示读,低16位表示写
假如当前同步状态为S,那么:

  • 写状态:等于 S & 0x0000FFFF 把高位16位 全部抹去,当写状态加1 ,等于 S + 1
  • 读状态:等于 S >>> 16 , 0 右移16位,当读状态加1,等于 s + (1<<16)也就是 S + 0x00010000

推论:S 不等于0时,当写状态(S&0x0000FFFF) 等于0 ,则读状态 (S >>>16) 大于0 ,读锁已经获取
1648646309(1).png

  1. static final int SHARED_SHIFT = 16;
  2. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  3. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  4. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  5. /** Returns the number of shared holds represented in count
  6. * 获取持有读状态锁的线程数量,读锁可以被多个线程持有,
  7. 读锁支持重入特性,每个线程持有的读锁的数量是单独计算的,HoldCounter 计算器
  8. */
  9. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  10. /** Returns the number of exclusive holds represented in count
  11. * 获得持有写状态的锁的次数
  12. */
  13. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

HoldCounter 计数器

读锁的内在机制其实就是一个共享锁。一次共享锁的操作就相当于对HoldCounter 计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。

  1. /**
  2. * A counter for per-thread read hold counts.
  3. * Maintained as a ThreadLocal; cached in cachedHoldCounter
  4. */
  5. static final class HoldCounter {
  6. int count = 0;
  7. // Use id, not reference, to avoid garbage retention
  8. final long tid = getThreadId(Thread.currentThread());
  9. }
  10. /**
  11. *HoldCounter与线程绑定,记录当前线程的重入次数的
  12. */
  13. static final class ThreadLocalHoldCounter
  14. extends ThreadLocal<HoldCounter> {
  15. public HoldCounter initialValue() {
  16. return new HoldCounter();
  17. }
  18. }
  • HoldCounter是用来记录读锁重入数的对象
  • ThreadLocalHoldCounter是ThreadLocal变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象

写锁的获取

写锁是一个支持重进入的排它锁
如果是当前线程获取的写锁,则增加写状态
如果当前线程获取血锁时,读锁已经被获取,或该线程不是获取写锁的线程,则等待

  1. protected final boolean tryAcquire(int acquires) {
  2. //当前线程
  3. Thread current = Thread.currentThread();
  4. //获取state状态 存在读锁或者写锁,状态就不为0
  5. int c = getState();
  6. //获取写锁的重入数
  7. int w = exclusiveCount(c);
  8. //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
  9. if (c != 0) {
  10. // c!=0 && w==0 表示存在读锁
  11. // 当前存在读锁或者写锁已经被其他写线程获取,则写锁获取失败
  12. if (w == 0 || current != getExclusiveOwnerThread())
  13. return false;
  14. // 超出最大范围 65535
  15. if (w + exclusiveCount(acquires) > MAX_COUNT)
  16. throw new Error("Maximum lock count exceeded");
  17. //同步state状态
  18. setState(c + acquires);
  19. return true;
  20. }
  21. // writerShouldBlock有公平与非公平的实现AQS实现的, 非公平返回false,会尝试通过cas加锁
  22. //c==0 写锁未被任何线程获取,当前线程是否阻塞或者cas尝试获取锁
  23. if (writerShouldBlock() ||
  24. !compareAndSetState(c, c + acquires))
  25. return false;
  26. //实现重入的功能
  27. //设置写锁为当前线程所有
  28. setExclusiveOwnerThread(current);
  29. return true;
  30. }

image.png

  • 读写互斥
  • 写写互斥
  • 写锁支持同一个线程重入
  • writerShouldBlock写锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)

    写锁的释放

    写锁释放通过重写AQS的tryRelease方法实现

    1. protected final boolean tryRelease(int releases) {
    2. //若锁的持有者不是当前线程,抛出异常
    3. if (!isHeldExclusively())
    4. throw new IllegalMonitorStateException();
    5. int nextc = getState() - releases;
    6. //当前写状态是否为0,为0则释放写锁
    7. boolean free = exclusiveCount(nextc) == 0;
    8. if (free)
    9. setExclusiveOwnerThread(null);//把独占线程设置为空
    10. setState(nextc);
    11. return free;
    12. }

    1648647373(1).png

    读锁的获取

    实现共享式同步组件的同步语义需要通过重写AQS的tryAcquireShared方法和tryReleaseShared方法。读锁的获取实现方法为:

    1. protected final int tryAcquireShared(int unused) {
    2. Thread current = Thread.currentThread();
    3. int c = getState();
    4. // 如果写锁已经被获取并且获取写锁的线程不是当前线程,当前线程获取读锁失败返回-1
    5. // 判断锁降级
    6. if (exclusiveCount(c) != 0 &&
    7. getExclusiveOwnerThread() != current)
    8. return -1;
    9. //计算出读锁的数量
    10. int r = sharedCount(c);
    11. /**
    12. * 读锁是否阻塞 readerShouldBlock()公平与非公平的实现
    13. * r < MAX_COUNT: 持有读锁的线程小于最大数(65535)
    14. * compareAndSetState(c, c + SHARED_UNIT) cas设置获取读锁线程的数量
    15. */
    16. if (!readerShouldBlock() &&
    17. r < MAX_COUNT &&
    18. compareAndSetState(c, c + SHARED_UNIT)) { //当前线程获取读锁
    19. if (r == 0) { //设置第一个获取读锁的线程
    20. firstReader = current;
    21. firstReaderHoldCount = 1; //设置第一个获取读锁线程的重入数
    22. } else if (firstReader == current) { // 表示第一个获取读锁的线程重入
    23. firstReaderHoldCount++;
    24. } else { // 非第一个获取读锁的线程
    25. HoldCounter rh = cachedHoldCounter;
    26. if (rh == null || rh.tid != getThreadId(current))
    27. cachedHoldCounter = rh = readHolds.get();
    28. else if (rh.count == 0)
    29. readHolds.set(rh);
    30. rh.count++; //记录其他获取读锁的线程的重入次数
    31. }
    32. return 1;
    33. }
    34. // 尝试通过自旋的方式获取读锁,实现了重入逻辑
    35. return fullTryAcquireShared(current);
    36. }
  • 读锁共享,读读不互斥

  • 读锁可重入,每个获取读锁的线程都会记录对应的重入数
  • 读写互斥,锁降级场景除外
  • 支持锁降级,持有写锁的线程,可以获取读锁,但是后续要记得把读锁和写锁读释放
  • readerShouldBlock读锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)

1648649053(1).png

读锁的释放

获取到读锁,执行完临界区后,要记得释放读锁(如果重入多次要释放对应的次数),不然会阻塞其他线程的写操作。
读锁释放的实现主要通过方法tryReleaseShared:

  1. protected final boolean tryReleaseShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. //如果当前线程是第一个获取读锁的线程
  4. if (firstReader == current) {
  5. // assert firstReaderHoldCount > 0;
  6. if (firstReaderHoldCount == 1)
  7. firstReader = null;
  8. else
  9. firstReaderHoldCount--; //重入次数减1
  10. } else { //不是第一个获取读锁的线程
  11. HoldCounter rh = cachedHoldCounter;
  12. if (rh == null || rh.tid != getThreadId(current))
  13. rh = readHolds.get();
  14. int count = rh.count;
  15. if (count <= 1) {
  16. readHolds.remove();
  17. if (count <= 0)
  18. throw unmatchedUnlockException();
  19. }
  20. --rh.count; //重入次数减1
  21. }
  22. for (;;) { //cas更新同步状态
  23. int c = getState();
  24. int nextc = c - SHARED_UNIT;
  25. if (compareAndSetState(c, nextc))
  26. // Releasing the read lock has no effect on readers,
  27. // but it may allow waiting writers to proceed if
  28. // both read and write locks are now free.
  29. return nextc == 0;
  30. }
  31. }

1648649092(1).png