1.场景描述

对资源的访问一般包括两种类型的动作——读和写(更新、删除、增加等资源会发生变化的动作),如果多个线程在某个时刻都在进行资源的读操作,虽然有资源的竞争,但是这种竞争不足以引起数据不一致的情况发生,那么这个时候直接采用排他的方式加锁,就显得有些简单粗暴了。表1将两个线程对资源的访问动作进行了枚举,除了多线程在同一时间都进行读操作时不会引起冲突之外,其余的情况都会导致访问的冲突,需要对资源进行同步处理。
image.png

2.读写分离程序设计

2.1 接口定义

读写锁的类图
image.png

1.Lock接口定义

  1. public interface Lock {
  2. // 获取显示锁, 没有获得锁的线程将被堵塞
  3. void lock() throws InterruptedException;
  4. // 释放获取的锁
  5. void unlock();
  6. }

Lock接口定义了锁的基本操作, 加锁和解锁, 显式锁的操作强烈建议与try finally语句块一起使用,加锁和解锁说明如下。

  • lock() :当前线程尝试获得锁的拥有权, 在此期间有可能进入阻塞。
  • unlock() :释放锁, 其主要目的就是为了减少reader或者writer的数量。

2.ReadWriteLock接口定义

ReadWrite Lock虽然名字中有lock, 但是它并不是lock, 它主要是用于创建read lock和write lock的, 并且提供了查询功能用于查询当前有多少个reader和writer以及waiting中的writer, 根据我们在前文中的分析, 如果reader的个数大于0, 那就意味着writer的个数等于0, 反之writer的个数大于0(事实上writer最多只能为1) , 则reader的个数等于0,由于读和写,写和写之间都存在着冲突,因此这样的数字关系也就不奇怪了。

  1. readLock() :该方法主要用来获得一个Read Lock
  2. writeLock() :同read Lock类似, 该方法用来获得Write Lock
  3. getWriting Writers) :获取当前有多少个线程正在进行写的操作, 最多是1个。
  4. getWaiting Writers() :获取当前有多少个线程由于获得写锁而导致阻塞。
  5. getReading Readers() :获取当前有多少个线程正在进行读的操作。

2.2程序实现

1.ReadWriteLockImpl

相对于Lock, ReadWrite Lock Impl更像是一个工厂类, 可以通过它创建不同类型的锁,我们将ReadWrite Lock Impl设计为包可见的类, 其主要目的是不想对外暴露更多的细节,在ReadWrite Lock Impl中还定义了非常多的包可见方法, 代码所示

  1. public class ReadWriteLockImpl implements ReadWriteLock{
  2. // 定义对象锁
  3. private final Object MUTEX = new Object();
  4. // 当前有多少个线程正在写入
  5. private int writingWriters = 0;
  6. // 当前有多少个线程正在等待写入
  7. private int waitingWriters = 0;
  8. // 当前有多少个线程正在read
  9. private int readingReaders = 0;
  10. // read 和 write 的偏好设置
  11. private boolean preferWriter;
  12. // 默认情况下preferWrite为true
  13. public ReadWriteLockImpl() {
  14. this(true);
  15. }
  16. // 构造ReadWriteLockImpl并且传入preferWriter
  17. public ReadWriteLockImpl(boolean preferWriter) {
  18. this.preferWriter = preferWriter;
  19. }
  20. public Object getMUTEX() {
  21. return MUTEX;
  22. }
  23. public int getWaitingWriters() {
  24. return waitingWriters;
  25. }
  26. public boolean getPreferWriter() {
  27. return preferWriter;
  28. }
  29. // 创建读锁
  30. @Override
  31. public Lock readLock() {
  32. return new ReadLock(this);
  33. }
  34. // 创建写锁
  35. @Override
  36. public Lock writeLock() {
  37. return new WriteLock(this);
  38. }
  39. // 使写线程的数量增加
  40. void incrementWritingWriters() {
  41. this.writingWriters++;
  42. }
  43. // 使等待写入的线程数量增加
  44. void incrementWaitingWriters() {
  45. this.waitingWriters++;
  46. }
  47. // 使读线程的数量增加
  48. void incrementReadingReaders() {
  49. this.readingReaders++;
  50. }
  51. // 使写线程的数量减少
  52. void decrementWritingWriters() {
  53. this.writingWriters--;
  54. }
  55. // 使等待获取写入锁的数量减一
  56. void decrementWaitingWriters() {
  57. this.waitingWriters--;
  58. }
  59. // 使读取线程的数量减少
  60. void descementReadingReaders() {
  61. this.readingReaders--;
  62. }
  63. @Override
  64. public int getWritingWriters() {
  65. return this.writingWriters;
  66. }
  67. @Override
  68. public int getReadingReaders() {
  69. return this.readingReaders;
  70. }
  71. void changePrefer(boolean preferWriter) {
  72. this.preferWriter = preferWriter;
  73. }
  74. }

虽然我们在开发一个读写锁,但是在实现的内部也需要一个锁进行数据同步以及线程之间的通信, 其中MUTEX的作用就在于此, 而prefer Writer的作用在于控制倾向性, 一般来说读写锁非常适用于读多写少的场景, 如果prefer Writer为false, 很多读线程都在读数据,那么写线程将会很难得到写的机会。

2.ReadLock

读锁是Lock的实现, 同样将其设计成包可见以透明其实现细节, 让使用者只用专注于对接口的调用,代码如所示

  1. public class ReadLock implements Lock{
  2. private final ReadWriteLockImpl readWriteLock;
  3. ReadLock(ReadWriteLockImpl readWriteLock) {
  4. this.readWriteLock = readWriteLock;
  5. }
  6. @Override
  7. public void lock() throws InterruptedException {
  8. // 使用Mutex 作为 锁
  9. synchronized (readWriteLock.getMUTEX()) {
  10. // 若此时有线程在进行写操作,或者有写线程在等待并且偏向写锁的标识为
  11. // true时,就会无法获得读锁,只能被挂起
  12. while(readWriteLock.getWritingWriters() > 0
  13. || (readWriteLock.getPreferWriter() && readWriteLock.getWritingWriters() > 0 )) {
  14. readWriteLock.getMUTEX().wait();
  15. }
  16. readWriteLock.incrementReadingReaders();
  17. }
  18. }
  19. @Override
  20. public void unlock() {
  21. // 使用Mutex作为锁,并且进行同步
  22. synchronized (readWriteLock.getMUTEX()) {
  23. // 释放锁的过程就是使得当前reading的数量减一
  24. // 将perferWriter设置为true,可以使得writer线程获得更多的机会
  25. // 通知唤醒与Mutex关联monitor waitset中的线程
  26. readWriteLock.descementReadingReaders();
  27. readWriteLock.changePrefer(true);
  28. readWriteLock.getMUTEX().notifyAll();
  29. }
  30. }
  31. }
  • 当没有任何线程对数据进行写操作的时候,读线程才有可能获得锁的拥有权,当然除此之外,为了公平起见,如果当前有很多线程正在等待获得写锁的拥有权,同样读线程将会进入Mutex的wait set中, reading Reader的数量将增加。
  • 读线程释放锁, 这意味着reader的数量将减少一个, 同时唤醒wait中的线程, reader唤醒的基本上都是由于获取写锁而进入阻塞的线程,为了提高写锁获得锁的机会,需要将prefer Writer修改为true

3.WriteLock

写锁是Lock的实现, 同样将其设计成包可见以透明其实现细节, 让使用者只用专注于对接口的调用,由于写-写冲突的存在,同一时间只能由一个线程获得锁的拥有权,代码所示。

  1. public class WriteLock implements Lock{
  2. private final ReadWriteLockImpl readWriteLock;
  3. public WriteLock(ReadWriteLockImpl readWriteLock) {
  4. this.readWriteLock = readWriteLock;
  5. }
  6. @Override
  7. public void lock() throws InterruptedException {
  8. synchronized (readWriteLock.getMUTEX()) {
  9. try {
  10. // 首先使等待获取写入锁的数字加一
  11. readWriteLock.incrementWritingWriters();
  12. // 如果此时有其他线程正在进行读操作,或者写操作,那么当前线程将被挂起
  13. while(readWriteLock.getReadingReaders() > 0
  14. || readWriteLock.getWritingWriters() > 0) {
  15. readWriteLock.getMUTEX().wait();
  16. }
  17. } finally {
  18. // 成功获取到了写入锁,使得等待获取写入锁的计数减一
  19. this.readWriteLock.decrementWaitingWriters();
  20. }
  21. // 将正在写入的线程数量加一
  22. readWriteLock.incrementWritingWriters();
  23. }
  24. }
  25. @Override
  26. public void unlock() {
  27. synchronized (readWriteLock.getMUTEX() ) {
  28. // 减少正在写入锁的线程计数器
  29. readWriteLock.decrementWritingWriters();
  30. // 将偏好状态修改为false, 可以使得读锁被最快速的获得
  31. readWriteLock.changePrefer(false);
  32. // 通知唤醒其他在Mutex monitor waitset中的线程
  33. readWriteLock.getMUTEX().notifyAll();
  34. }
  35. }
  36. }
  • 当有线程在进行读操作或者写操作的时候,若当前线程试图获得锁,则其将会进入MUTEX的wait set中而阻塞, 同时增加waiting Writer和writing Writer的数量, 但是当线程从wait set中被激活的时候waiting Writer将很快被减少。
  • 写释放锁, 意味着writer的数量减少, 事实上变成了0, 同时唤醒wait中的线程,并将prefer Writer修改为false, 以提高读线程获得锁的机会。

3.读写锁的使用

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.TimeUnit;
  4. public class ShareData {
  5. // 定义共享数据(资源)
  6. private final List<Character> container = new ArrayList<>();
  7. //构造ReadWriteLock
  8. private final ReadWriteLock readWriteLock = ReadWriteLock.readWriteLock();
  9. // 创建读取锁
  10. private final Lock readLock = readWriteLock.readLock();
  11. // 创建写入锁
  12. private final Lock writeLock = readWriteLock.writeLock();
  13. private final int length;
  14. public ShareData(int length) {
  15. this.length = length;
  16. for(int i = 0; i < length; i++) {
  17. container.add(i, 'c');
  18. }
  19. }
  20. public char[] read() throws InterruptedException {
  21. try {
  22. // 首先使用读锁进行lock
  23. readLock.lock();
  24. char[] newBuffer = new char[length];
  25. for(int i = 0; i < length; i++) {
  26. newBuffer[i] = container.get(i);
  27. }
  28. slowly();
  29. return newBuffer;
  30. } finally {
  31. // 当操作结束之后,将锁释放
  32. readLock.unlock();
  33. }
  34. }
  35. public void write(char c) throws InterruptedException {
  36. try {
  37. //使用写锁进行lock
  38. writeLock.lock();
  39. for(int i = 0; i < length; i++ ) {
  40. this.container.add(i, c);
  41. }
  42. slowly();
  43. }finally {
  44. // 当所有的操作都完成之后,对写锁进行释放
  45. writeLock.unlock();
  46. }
  47. }
  48. // 简单模拟操作的耗时
  49. private void slowly() {
  50. try {
  51. TimeUnit.SECONDS.sleep(1);
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }

ShareData中涉及了对数据的读写操作, 因此它是需要进行线程同步控制的。首先, 创建一个ReadWrite Lock工厂类, 然后用该工厂分别创建ReadLock和WriteLock的实例, 在read方法中使用Read Lock对其进行加锁, 而在write方法中则使用WriteLock, 的程序则是关于对ShareData的使用。

  1. public class ReadWriteLockTest {
  2. //// this is the example for read write lock
  3. private final static String text = "this";
  4. public static void main(String[] args) {
  5. // 定义共享数据
  6. final ShareData shareData = new ShareData(50);
  7. // 创建两个线程进行数据写操作
  8. for(int i = 0; i < 2; i++) {
  9. new Thread(
  10. () -> {
  11. for(int index = 0; index < text.length(); index++ ) {
  12. try {
  13. char c= text.charAt(index);
  14. shareData.write(c);
  15. System.out.println(Thread.currentThread() + " write " + c);
  16. }catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. ).start();
  22. // 创建10个线程进行数据读操作
  23. for(int i1 = 0; i1 < 10; i1++ ) {
  24. new Thread( ()-> {
  25. while(true) {
  26. try {
  27. System.out.println(Thread.currentThread() + " read " + new String(shareData.read()));
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }).start();
  33. }
  34. }
  35. }
  36. }