可重入的读写锁.
线程进入读锁的前提条件:
- 没有其他的线程在写,如果是同一个线程在写,可以获得读锁
线程进入写锁的前提条件:
- 没有线程在写
- 没有线程在读
| 先读再写(不释放读锁) | 先写再读(不释放写锁) | |
|---|---|---|
| 同一个线程 | 阻塞,获取不到写锁 | 可以 |
| 不同线程 | 阻塞,获取不到写锁 | 阻塞,获取不到读锁 |
源码解析
ReentrantReadWriteLock里有5个内部类,分别是Sync(同步类,继承了AQS),NonfairSync(非公平同步类,继承Sync),FairSync(公平同步类,继承Sync),ReadLock(读锁),WriteLock(写锁)。其中Sync中又包含两个内部类:HoldCounter(和读锁配套使用,记录某个读线程的重入次数和线程id);ThreadLocalHoldCounter(本地线程计数器,继承了ThreadLocal,将线程和HoldCounter对象关联
写锁(独占锁)
protected final boolean tryAcquire(int acquires) {//获取当前线程Thread current = Thread.currentThread();//获得state,持有锁的线程数量int c = getState();//独占线程的数量int w = exclusiveCount(c);if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)if (w == 0 || current != getExclusiveOwnerThread())return false;//原有独占线程的数量+本次占有的线程数>最大限制?if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquire//设置状态setState(c + acquires);return true;}//写锁是否阻塞 CAS是否成功if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;//设置独占线程setExclusiveOwnerThread(current);return true;}

读锁(共享锁)
protected final int tryAcquireShared(int unused) {// 获取当前线程Thread current = Thread.currentThread();// 获取状态int c = getState();//如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 读锁数量int r = sharedCount(c);/** readerShouldBlock():读锁是否需要等待(公平锁原则)* r < MAX_COUNT:持有线程小于最大数(65535)* compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态*/// 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {//r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中if (r == 0) { // 读锁数量为0// 设置第一个读线程firstReader = current;// 读线程占用的资源数为1firstReaderHoldCount = 1;} else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入// 占用资源数加1firstReaderHoldCount++;} else { // 读锁数量不为0并且不为当前线程// 获取计数器HoldCounter rh = cachedHoldCounter;// 计数器为空或者计数器的tid不为当前正在运行的线程的tidif (rh == null || rh.tid != getThreadId(current))// 获取当前线程对应的计数器cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0) // 计数为0//加入到readHolds中readHolds.set(rh);//计数+1rh.count++;}return 1;}return fullTryAcquireShared(current);}
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) { // 无限循环// 获取状态int c = getState();if (exclusiveCount(c) != 0) { // 写线程数量不为0if (getExclusiveOwnerThread() != current) // 不为当前线程return -1;} else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) { // 当前线程为第一个读线程// assert firstReaderHoldCount > 0;} else { // 当前线程不为第一个读线程if (rh == null) { // 计数器不为空//rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tidrh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功if (sharedCount(c) == 0) { // 读线程数量为0// 设置第一个读线程firstReader = current;//firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}


使用场景
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();int QUEUE_SIZE = 5;LinkedBlockingQueue queue = new LinkedBlockingQueue(QUEUE_SIZE);public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo();Thread readThread = new Thread(() -> {try {demo.readLock.lock();// TimeUnit.SECONDS.sleep(1);while (demo.queue.size() == 0) {}int currQueueSize = demo.queue.size();for (int i = 0; i < currQueueSize; i++) {System.out.println(Thread.currentThread().getName() + "读取到数据:" + demo.queue.poll());}} catch (Exception e) {} finally {demo.readLock.unlock();}}, "readThread");Thread writeThread = new Thread(() -> {try {demo.writeLock.lock();while (demo.queue.size() < demo.QUEUE_SIZE) {int i = new Random().nextInt(20);demo.queue.offer(i);System.out.println(Thread.currentThread().getName() + "写入数据:" + i);TimeUnit.SECONDS.sleep(1);}} catch (Exception e) {} finally {demo.writeLock.unlock();}}, "writeThread");writeThread.start();readThread.start();//同一个线程//先获取写锁,然后可以获取读锁//可以不用释放写锁,就能获取读锁new Thread(() -> {try{demo.writeLock.lock();System.out.println("写入数据");demo.queue.offer(new Random().nextInt(10));demo.readLock.lock();System.out.println("读取到数据:" + demo.queue.poll());}catch(Exception e){}finally {demo.readLock.unlock();demo.writeLock.unlock();}}).start();//同一个线程// 先获取读锁,再获取写锁//必须得先把读锁释放,才可获得写锁new Thread(() -> {try{demo.readLock.lock();System.out.println("读取到数据:" + demo.queue.poll());demo.writeLock.lock();demo.queue.offer(new Random().nextInt(10));System.out.println("写入数据");}catch(Exception e){}finally {demo.writeLock.unlock();demo.readLock.unlock();}}).start();new Thread(() -> {try{demo.readLock.lock();System.out.println("读取到数据:" + demo.queue.poll());}catch(Exception e){}finally {demo.readLock.unlock();}}).start();new Thread(() -> {try{demo.writeLock.lock();System.out.println("写入数据");demo.queue.offer(new Random().nextInt(10));demo.readLock.lock();}catch(Exception e){}finally {demo.writeLock.unlock();}}).start();new Thread(() -> {try{demo.writeLock.lock();System.out.println("写入数据");demo.queue.offer(new Random().nextInt(10));}catch(Exception e){}finally {demo.writeLock.unlock();}}).start();TimeUnit.SECONDS.sleep(10);new Thread(() -> {try{demo.readLock.lock();System.out.println("读取到数据:" + demo.queue.poll());}catch(Exception e){}finally {demo.readLock.unlock();}}).start();}
