StampedLock如何解决写饥饿问题?

StampedLock是JDK1.8 新引入的,对读写锁的增强。

方案就是:读采用乐观锁,如果不一致,就

https://www.liaoxuefeng.com/wiki/1252599548343744/1309138673991714

前面介绍的ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。
如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock。
StampedLock和ReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
我们来看例子:

Plain Text复制代码

1

public class Point {
2

  1. private final StampedLock stampedLock = new StampedLock();<br />3
  2. private double x;<br />4
  3. private double y;<br />5
  4. public void move(double deltaX, double deltaY) {<br />6
  5. long stamp = stampedLock.writeLock(); // 获取写锁<br />7
  6. try {<br />8
  7. x += deltaX;<br />9
  8. y += deltaY;<br />10
  9. } finally {<br />11
  10. stampedLock.unlockWrite(stamp); // 释放写锁<br />12
  11. }<br />13
  12. }<br />14
  13. public double distanceFromOrigin() {<br />15
  14. long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁<br />16
  15. // 注意下面两行代码不是原子操作<br />17
  16. // 假设x,y = (100,200)<br />18
  17. double currentX = x;<br />19
  18. // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)<br />20
  19. double currentY = y;<br />21
  20. // 此处已读取到y,如果没有写入,读取是正确的(100,200)<br />22
  21. // 如果有写入,读取是错误的(100,400)<br />23
  22. if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生<br />24
  23. stamp = stampedLock.readLock(); // 获取一个悲观读锁<br />25
  24. try {<br />26
  25. currentX = x;<br />27
  26. currentY = y;<br />28
  27. } finally {<br />29
  28. stampedLock.unlockRead(stamp); // 释放悲观读锁<br />30
  29. }<br />31
  30. }<br />32
  31. return Math.sqrt(currentX * currentX + currentY * currentY);<br />33
  32. }<br />34

}

和ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。
可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。
StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。
小结
StampedLock提供了乐观读锁,可取代ReadWriteLock以进一步提升并发性能;
StampedLock是不可重入锁。

——
一篇文章搞定——JDK8中新增的StampedLock
https://cloud.tencent.com/developer/article/1470988

一、StampedLock类简介
StampedLock类,在JDK1.8时引入,是对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细粒度控制并发。
首先明确下,该类的设计初衷是作为一个内部工具类,用于辅助开发其它线程安全组件,用得好,该类可以提升系统性能,用不好,容易产生死锁和其它莫名其妙的问题。

1.1 StampedLock的引入
上一篇文章,讲解了读写锁——ReentrantReadWriteLock原理详解,那么为什么有了ReentrantReadWriteLock,还要引入StampedLock?
ReentrantReadWriteLock使得多个读线程同时持有读锁(只要写锁未被占用),而写锁是独占的。
但是,读写锁如果使用不当,很容易产生“饥饿”问题:
比如在读线程非常多,写线程很少的情况下,很容易导致写线程“饥饿”,虽然使用“公平”策略可以一定程度上缓解这个问题,但是“公平”策略是以牺牲系统吞吐量为代价的。

1.2 StampedLock的特点
try系列获取锁的函数,当获取锁失败后会返回为0的stamp值。当调用释放锁和转换锁的方法时候需要传入获取锁时候返回的stamp值。
StampedLockd的内部实现是基于CLH锁的,CLH锁原理:锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程,
保存着一个标记位locked,用以判断当前线程是否已经释放锁。当一个线程试图获取锁时,从队列尾节点作为前序节点,循环判断所有的前序节点是否已经成功释放锁。

二、StampedLock使用示例
先来看一个Oracle官方的例子:
可以看到,上述示例最特殊的其实是distanceFromOrigin方法,这个方法中使用了“Optimistic reading”乐观读锁,使得读写可以并发执行,但是“Optimistic reading”的使用必须遵循以下模式:

三、StampedLock原理

3.1 StampedLock的内部常量
StampedLock虽然不像其它锁一样定义了内部类来实现AQS框架,但是StampedLock的基本实现思路还是利用CLH队列进行线程的管理,通过同步状态值来表示锁的状态和类型。
StampedLock内部定义了很多常量,定义这些常量的根本目的还是和ReentrantReadWriteLock一样,对同步状态值按位切分,以通过位运算对State进行操作:
对于StampedLock来说,写锁被占用的标志是第8位为1,读锁使用0-7位,正常情况下读锁数目为1-126,超过126时,使用一个名为readerOverflow的int整型保存超出数。

部分常量的比特位表示如下:

另外,StampedLock相比ReentrantReadWriteLock,对多核CPU进行了优化,可以看到,当CPU核数超过1时,会有一些自旋操作:

3.2 示例分析
假设现在有多个线程:ThreadA、ThreadB、ThreadC、ThreadD、ThreadE。操作如下:
ThreadA调用writeLock————获取写锁
ThreadB调用readLock————获取读锁
ThreadC调用readLock————获取读锁
ThreadD调用writeLock————获取写锁
ThreadE调用readLock————获取读锁

1. StampedLock对象的创建
StampedLock的构造器很简单,构造时设置下同步状态值:
/
Creates a new lock, initially in unlocked state.
/
public StampedLock() {
state = ORIGIN;
}
另外,StamedLock提供了三类视图:
// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
这些视图其实是对StamedLock方法的封装,便于习惯了ReentrantReadWriteLock的用户使用:
例如,
ReadLockView**其实相当于ReentrantReadWriteLock.readLock()返回的读锁;
final class ReadLockView implements Lock {
public void lock() { readLock(); }
public void lockInterruptibly() throws InterruptedException {
readLockInterruptibly();
}
public boolean tryLock() { return tryReadLock() != 0L; }
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryReadLock(time, unit) != 0L;
}
public void unlock() { unstampedUnlockRead(); }
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}

2. ThreadA调用writeLock获取写锁
来看下writeLock方法:
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&//(s=state)&ABITS==0L表示读锁和写锁都未被使用
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?//CAS操作:将第8位置为1,表示写锁被占用
next : acquireWrite(false, 0L));//获取失败则调用acquireWrite,加入到等待队列
}
说明:上述代码获取写锁,如果获取失败,则进入阻塞,注意该方法不响应中断,返回非0表示获取成功。
StampedLock中大量运用了位运算,这里(s = state) & ABITS == 0L 表示读锁和写锁都未被使用,这里写锁可以立即获取成功,然后CAS操作更新同步状态值State。
操作完成后,等待队列的结构如下:

注意:StampedLock中,等待队列的结点要比AQS中简单些,仅仅三种状态。
0:初始状态
-1:等待中
1:取消
另外,结点的定义中有个cowait字段,该字段指向一个栈,用于保存读线程,这个后续会讲到。

3. ThreadB调用readLock获取读锁
来看下readLock方法:
由于ThreadA此时持有写锁,所以ThreadB获取读锁失败,将调用acquireRead方法,加入等待队列:
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&//表示写锁未被占用,且读锁数量没用超限
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
说明:上述代码获取读锁,如果写锁被占用,线程会阻塞,注意该方法不响应中断,返回非0表示获取成功。
acquireRead方法非常复杂,用到了大量自旋操作:
我们来分析下这个方法。
该方法会首先自旋的尝试获取读锁,获取成功后,就直接返回;否则,会将当前线程包装成一个读结点,插入到等待队列。
由于,目前等待队列还是空,所以ThreadB会初始化队列,然后将自身包装成一个读结点,插入队尾,然后在下面这个地方跳出自旋:
if (p == null) { // initialize queue,表示等待队列为空,且当前线程未获得读锁,则初始化队列(构造头结点)
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)//将当前线程保证成共享节点
node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {//如果等待队列只有一个头结点或当前入队的是写线程,则直接将节点链接到队尾,链接完成后退出自旋
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;//这里退出循环
}
}
此时,等待队列的结构如下:

跳出自旋后,ThreadB会继续向下执行,进入下一个自旋,在下一个自旋中,依然会再次尝试获取读锁,如果这次再获取不到,就会将前驱的等待状态置为WAITING, 表示我(当前线程)要去睡了(阻塞),到时记得叫醒我:
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)//将前驱结点的等待状态置为WAITING,表示之后将唤醒当前结点
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}

最终, ThreadB进入阻塞状态:
else {//阻塞当前线程
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)//限时等待超时,取消等待
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);//如果前驱的等待状态为WAITINF,其写锁被占用,则阻塞当前调用线程
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
最终,等待队列的结构如下:

4. ThreadC调用readLock获取读锁
这个过程和ThreadB获取读锁一样,区别在于ThreadC被包装成结点加入等待队列后,是链接到ThreadB结点的栈指针中的。调用完下面这段代码后,ThreadC会链接到以Thread B为栈顶指针的栈中:
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))//CAS操作队尾结点,p的cowait字段,实际上就是头插法插入节点
node.cowait = null;
说明:上述代码队列不为空,且队尾是读结点,则将添加当前结点链接到队尾结点的cowait链中(实际上构成一个栈,p是栈顶指针)

注意:读结点的cowait字段其实构成了一个栈,入栈的过程其实是个“头插法”插入单链表的过程。比如,再来个ThreadX读结点,则cowait链表结构为:ThreadB - > ThreadX -> ThreadC。最终唤醒读结点时,将从栈顶开始。
然后会在下一次自旋中,阻塞当前读线程:
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);//写锁被占用,且当前节点不是队首节点,则阻塞当前线程
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
最终,等待队列的结构如下:

可以看到,此时ThreadC结点并没有把它的前驱的等待状态置为-1,因为ThreadC是链接到栈中的,当写锁释放的时候,会从栈底元素开始,唤醒栈中所有读结点。

5. ThreadD调用writeLock获取写锁
ThreadD调用writeLock方法获取写锁失败后(ThreadA依然占用着写锁),会调用acquireWrite方法,该方法整体逻辑和acquireRead差不多,首先自旋的尝试获取写锁,获取成功后,就直接返回;否则,会将当前线程包装成一个写结点,插入到等待队列。
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&//表示读锁和写锁都未被使用
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?//CAS操作:将第8位置位1,表示写锁被占用
next : acquireWrite(false, 0L));//获取失败则调用acquireWrite,加入等待队列
}
说明:上述代码获取写锁,如果失败,则进入阻塞,注意该方法不响应中断,返回非0,表示获取成功
acquireWrite源码:
acquireWrite中的下面这个自旋操作,用于将线程包装成写结点,插入队尾:
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
if ((m = (s = state) & ABITS) == 0L) {//没用任何锁被占用
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))//尝试立即获取写锁
return ns;//获取成功直接返回
}
else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
—spins;
}
else if ((p = wtail) == null) { // initialize queue,队列为空,则初始化队列,构造队列的头结点
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(WMODE, p);//将当前线程包装成写节点
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//链接节点至队尾
p.next = node;
break;
}
}
说明:上述代码自旋入队操作,如果没用任何锁被占用,则立即尝试获取写锁,获取成功则返回,如果存在锁被使用,则将当前线程包装成独占节点,并插入等待队列尾部
插入完成后,队列结构如下:

然后,进入下一个自旋,并在下一个自旋中阻塞ThreadD,最终队列结构如下:

6. ThreadE调用readLock获取读锁
同样,由于写锁被ThreadA占用着,所以最终会调用acquireRead方法,在该方法的第一个自旋中,会将ThreadE加入等待队列:

注意,由于队尾结点是写结点,所以当前读结点会直接链接到队尾;如果队尾是读结点,则会链接到队尾读结点的cowait链中。
然后进入第二个自旋,阻塞ThreadE,最终队列结构如下:

7. ThreadA调用unlockWrite释放写锁
通过CAS操作,修改State成功后,会调用release方法唤醒等待队列的队首结点:
//释放写锁
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)//stamp不匹配,或者写锁未被占用,抛出异常
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;//正常情况下,stamp+=WBIT后,第8位位0,表示写锁被释放;但是溢出,则置为ORIGIN
if ((h = whead) != null && h.status != 0)
release(h);//唤醒等待队列中的队首节点
}
release方法非常简单,先将头结点的等待状态置为0,表示即将唤醒后继结点,然后立即唤醒队首结点:
//唤醒等待队列的队首节点(即头结点whead的后继节点)
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//将头结点的等待状态从-1置为0,表示将要唤醒后继节点
if ((q = h.next) == null || q.status == CANCELLED) {//从队尾开始查找距离头结点最近的WAITING节点
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w);//唤醒队首节点
}
}
此时,等待队列的结构如下:

8. ThreadB被唤醒后继续向下执行
ThreadB被唤醒后,会从原阻塞处继续向下执行,然后开始下一次自旋:
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)前驱结点的等待状态设置为WAITING,表示之后唤醒当前结点
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);//将
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {//阻塞当前读线程
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)//限时等待超时,取消等待
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park,如果前驱的等待状态为WAITING,且写锁被占用,则阻塞当前调用线程,注意,ThreadB从此处被唤醒,并继续向下执行
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
第二次自旋时,ThreadB发现写锁未被占用,则成功获取到读锁,然后从栈顶(ThreadB的cowait指针指向的结点)开始唤醒栈中所有线程,
最后返回:
for (int k = spins;;) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?//判断写锁是否被占用
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) ://写锁未被占用,且读锁数量未超限制,则更新同步状态
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {//写锁未被占用,但读锁数量限制,超出部分放到readerOverflow字段中
WNode c; Thread w;//获取读锁成功,释放cowrite链中的所有读结点
whead = node;
node.prev = null;//释放头节点,当前队首节点成为新的头结点
//从栈顶开始(node.cowait指向的节点),依次唤醒所有读结点,最终node.cowait==null,node成为新的头结点
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && —k <= 0)
break;
}
最终,等待队列的结构如下:

9. ThreadC被唤醒后继续向下执行
ThreadC被唤醒后,继续执行,并进入下一次自旋,下一次自旋时,会成功获取到读锁。
for (;;) {
WNode pp, c; Thread w;
//尝试唤醒头节点whead的cowait中的第一个元素,假如是读锁会通过循环释放cowait链
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);//写锁被释放,且当前节点不是队首节点,则阻塞当前线程
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
注意,此时ThreadB和ThreadC已经拿到了读锁,ThreadD(写线程)和ThreadE(读线程)依然阻塞中,原来ThreadC对应的结点是个孤立结点,会被GC回收。
最终,等待队列的结构如下:

10. ThreadB和ThreadC释放读锁
ThreadB和ThreadC调用unlockRead方法释放读锁,CAS操作State将读锁数量减1:
//释放读锁
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS) ||//stamp不匹配,或没用任何锁被占用,都会抛出异常
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {//读锁数量未超限
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {//读锁数量-1
if (m == RUNIT && (h = whead) != null && h.status != 0)//如果当前读锁数量为1,唤醒等待队列中的队首节点
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)//读锁数量超限,则溢出字段要-1
break;
}
}
注意,当读锁的数量变为0时才会调用release方法,唤醒队首结点:
//唤醒等待队列中的队首节点(即头结点whead的后继节点)
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//将头结点的等待状态从-1置为0,表示将要唤醒后继节点
if ((q = h.next) == null || q.status == CANCELLED) {//从队尾开始查找距离头结点最近的WAITING节点
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w);//唤醒队首节点
}
}
队首结点(ThreadD写结点被唤醒),最终等待队列的结构如下:

11. ThreadD被唤醒后继续向下执行
ThreadD会从原阻塞处继续向下执行,并在下一次自旋中获取到写锁,然后返回:
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node;
node.prev = null;
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && —k <= 0)
break;
}
}
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
最终,等待队列的结构如下:

12. ThreadD调用unlockWrite释放写锁
ThreadD释放写锁的过程和步骤7完全相同,会调用unlockWrite唤醒队首结点(ThreadE)。

ThreadE被唤醒后会从原阻塞处继续向下执行,但由于ThreadE是个读结点,所以同时会唤醒cowait栈中的所有读结点,过程和步骤8完全一样。最终,等待队列的结构如下:

至此,全部执行完成。

四、StampedLock总结
StampedLock的等待队列与RRW的CLH队列相比,有以下特点:
1当入队一个线程时,如果队尾是读结点,不会直接链接到队尾,而是链接到该读结点的cowait链中,cowait链本质是一个栈;
2当入队一个线程时,如果队尾是写结点,则直接链接到队尾;
3QS类似唤醒线程的规则和A,都是首先唤醒队首结点。区别是StampedLock中,当唤醒的结点是读结点时,会唤醒该读结点的cowait链中的所有读结点(顺序和入栈顺序相反,也就是后进先出)。
另外,StampedLock使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。

无标题 - 图1

Plain Text复制代码

1

  1. class Point {
    2

  2. private double x, y;
    3

  3. private final StampedLock sl = new StampedLock();
    4


  4. 5

  5. void move(double deltaX, double deltaY) {
    6

  6. long stamp = sl.writeLock(); //涉及对共享资源的修改,使用写锁-独占操作
    7

  7. try {
    8

  8. x += deltaX;
    9

  9. y += deltaY;
    10

  10. } finally {
    11

  11. sl.unlockWrite(stamp);
    12

  12. }
    13

  13. }
    14


  14. 15

  15. /**
    16

    • 使用乐观读锁访问共享资源
      17
    • 注意:乐观读锁在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,
      18
    • 而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
      19
  16. *
    20

    • @return
      21
  17. */
    22

  18. double distanceFromOrigin() {
    23

  19. long stamp = sl.tryOptimisticRead(); // 使用乐观读锁
    24

  20. double currentX = x, currentY = y; // 拷贝共享资源到本地方法栈中
    25

  21. if (!sl.validate(stamp)) { // 如果有写锁被占用,可能造成数据不一致,所以要切换到普通读锁模式
    26

  22. stamp = sl.readLock();
    27

  23. try {
    28

  24. currentX = x;
    29

  25. currentY = y;
    30

  26. } finally {
    31

  27. sl.unlockRead(stamp);
    32

  28. }
    33

  29. }
    34

  30. return Math.sqrt(currentX currentX + currentY currentY);
    35

  31. }
    36


  32. 37

  33. void moveIfAtOrigin(double newX, double newY) { // upgrade
    38

  34. // Could instead start with optimistic, not read mode
    39

  35. long stamp = sl.readLock();
    40

  36. try {
    41

  37. while (x == 0.0 && y == 0.0) {
    42

  38. long ws = sl.tryConvertToWriteLock(stamp); //读锁转换为写锁
    43

  39. if (ws != 0L) {
    44

  40. stamp = ws;
    45

  41. x = newX;
    46

  42. y = newY;
    47

  43. break;
    48

  44. } else {
    49

  45. sl.unlockRead(stamp);
    50

  46. stamp = sl.writeLock();
    51

  47. }
    52

  48. }
    53

  49. } finally {
    54

  50. sl.unlock(stamp);
    55

  51. }
    56

  52. }
    57

  53. }

Plain Text复制代码

1

  1. long stamp = lock.tryOptimisticRead(); // 非阻塞获取版本信息
    2

  2. copyVaraibale2ThreadMemory(); // 拷贝变量到线程本地堆栈
    3

  3. if(!lock.validate(stamp)){ // 校验
    4

  4. long stamp = lock.readLock(); // 获取读锁
    5

  5. try {
    6

  6. copyVaraibale2ThreadMemory(); // 拷贝变量到线程本地堆栈
    7

  7. } finally {
    8

  8. lock.unlock(stamp); // 释放悲观锁
    9

  9. }
    10


  10. 11

  11. }
    12

  12. useThreadMemoryVarables(); // 使用线程本地堆栈里面的数据进行操作

无标题 - 图2
无标题 - 图3
无标题 - 图4

无标题 - 图5
无标题 - 图6

Plain Text复制代码

1

  1. /**
    2

    • 尝试自旋的获取读锁, 获取不到则加入等待队列, 并阻塞线程
      3
  2. *
    4

    • @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
      5
    • @param deadline 如果非0, 则表示限时获取
      6
    • @return 非0表示获取成功, INTERRUPTED表示中途被中断过
      7
  3. */
    8

  4. private long acquireRead(boolean interruptible, long deadline) {
    9

  5. WNode node = null, p; // node指向入队结点, p指向入队前的队尾结点
    10


  6. 11

  7. /**
    12

    • 自旋入队操作
      13
    • 如果写锁未被占用, 则立即尝试获取读锁, 获取成功则返回.
      14
    • 如果写锁被占用, 则将当前读线程包装成结点, 并插入等待队列(如果队尾是写结点,直接链接到队尾;否则,链接到队尾读结点的栈中)
      15
  8. */
    16

  9. for (int spins = -1; ; ) {
    17

  10. WNode h;
    18

  11. if ((h = whead) == (p = wtail)) { // 如果队列为空或只有头结点, 则会立即尝试获取读锁
    19

  12. for (long m, s, ns; ; ) {
    20

  13. if ((m = (s = state) & ABITS) < RFULL ? // 判断写锁是否被占用
    21

  14. U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //写锁未占用,且读锁数量未超限, 则更新同步状态
    22

  15. (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
    23

  16. return ns; // 获取成功后, 直接返回
    24

  17. else if (m >= WBIT) { // 写锁被占用,以随机方式探测是否要退出自旋
    25

  18. if (spins > 0) {
    26

  19. if (LockSupport.nextSecondarySeed() >= 0)
    27

  20. —spins;
    28

  21. } else {
    29

  22. if (spins == 0) {
    30

  23. WNode nh = whead, np = wtail;
    31

  24. if ((nh == h && np == p) || (h = nh) != (p = np))
    32

  25. break;
    33

  26. }
    34

  27. spins = SPINS;
    35

  28. }
    36

  29. }
    37

  30. }
    38

  31. }
    39

  32. if (p == null) { // p == null表示队列为空, 则初始化队列(构造头结点)
    40

  33. WNode hd = new WNode(WMODE, null);
    41

  34. if (U.compareAndSwapObject(this, WHEAD, null, hd))
    42

  35. wtail = hd;
    43

  36. } else if (node == null) { // 将当前线程包装成读结点
    44

  37. node = new WNode(RMODE, p);
    45

  38. } else if (h == p || p.mode != RMODE) { // 如果队列只有一个头结点, 或队尾结点不是读结点, 则直接将结点链接到队尾, 链接完成后退出自旋
    46

  39. if (node.prev != p)
    47

  40. node.prev = p;
    48

  41. else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
    49

  42. p.next = node;
    50

  43. break;
    51

  44. }
    52

  45. }
    53

  46. // 队列不为空, 且队尾是读结点, 则将添加当前结点链接到队尾结点的cowait链中(实际上构成一个栈, p是栈顶指针 )
    54

  47. else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) { // CAS操作队尾结点p的cowait字段,实际上就是头插法插入结点
    55

  48. node.cowait = null;
    56

  49. } else {
    57

  50. for (; ; ) {
    58

  51. WNode pp, c;
    59

  52. Thread w;
    60

  53. // 尝试唤醒头结点的cowait中的第一个元素, 假如是读锁会通过循环释放cowait链
    61

  54. if ((h = whead) != null && (c = h.cowait) != null &&
    62

  55. U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
    63

  56. (w = c.thread) != null) // help release
    64

  57. U.unpark(w);
    65

  58. if (h == (pp = p.prev) || h == p || pp == null) {
    66

  59. long m, s, ns;
    67

  60. do {
    68

  61. if ((m = (s = state) & ABITS) < RFULL ?
    69

  62. U.compareAndSwapLong(this, STATE, s,
    70

  63. ns = s + RUNIT) :
    71

  64. (m < WBIT &&
    72

  65. (ns = tryIncReaderOverflow(s)) != 0L))
    73

  66. return ns;
    74

  67. } while (m < WBIT);
    75

  68. }
    76

  69. if (whead == h && p.prev == pp) {
    77

  70. long time;
    78

  71. if (pp == null || h == p || p.status > 0) {
    79

  72. node = null; // throw away
    80

  73. break;
    81

  74. }
    82

  75. if (deadline == 0L)
    83

  76. time = 0L;
    84

  77. else if ((time = deadline - System.nanoTime()) <= 0L)
    85

  78. return cancelWaiter(node, p, false);
    86

  79. Thread wt = Thread.currentThread();
    87

  80. U.putObject(wt, PARKBLOCKER, this);
    88

  81. node.thread = wt;
    89

  82. if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {
    90

  83. // 写锁被占用, 且当前结点不是队首结点, 则阻塞当前线程
    91

  84. U.park(false, time);
    92

  85. }
    93

  86. node.thread = null;
    94

  87. U.putObject(wt, PARKBLOCKER, null);
    95

  88. if (interruptible && Thread.interrupted())
    96

  89. return cancelWaiter(node, p, true);
    97

  90. }
    98

  91. }
    99

  92. }
    100

  93. }
    101


  94. 102

  95. for (int spins = -1; ; ) {
    103

  96. WNode h, np, pp;
    104

  97. int ps;
    105

  98. if ((h = whead) == p) { // 如果当前线程是队首结点, 则尝试获取读锁
    106

  99. if (spins < 0)
    107

  100. spins = HEAD_SPINS;
    108

  101. else if (spins < MAX_HEAD_SPINS)
    109

  102. spins <<= 1;
    110

  103. for (int k = spins; ; ) { // spin at head
    111

  104. long m, s, ns;
    112

  105. if ((m = (s = state) & ABITS) < RFULL ? // 判断写锁是否被占用
    113

  106. U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //写锁未占用,且读锁数量未超限, 则更新同步状态
    114

  107. (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
    115

  108. // 获取读锁成功, 释放cowait链中的所有读结点

无标题 - 图7

无标题 - 图8

无标题 - 图9

无标题 - 图10

无标题 - 图11

Plain Text复制代码

1

  1. /**
    2

    • 尝试自旋的获取写锁, 获取不到则阻塞线程
      3
  2. *
    4

    • @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
      5
    • @param deadline 如果非0, 则表示限时获取
      6
    • @return 非0表示获取成功, INTERRUPTED表示中途被中断过
      7
  3. */
    8

  4. private long acquireWrite(boolean interruptible, long deadline) {
    9

  5. WNode node = null, p;
    10


  6. 11

  7. /**
    12

    • 自旋入队操作
      13
    • 如果没有任何锁被占用, 则立即尝试获取写锁, 获取成功则返回.
      14
    • 如果存在锁被使用, 则将当前线程包装成独占结点, 并插入等待队列尾部
      15
  8. */
    16

  9. for (int spins = -1; ; ) {
    17

  10. long m, s, ns;
    18

  11. if ((m = (s = state) & ABITS) == 0L) { // 没有任何锁被占用
    19

  12. if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) // 尝试立即获取写锁
    20

  13. return ns; // 获取成功直接返回
    21

  14. } else if (spins < 0)
    22

  15. spins = (m == WBIT && wtail == whead) ? SPINS : 0;
    23

  16. else if (spins > 0) {
    24

  17. if (LockSupport.nextSecondarySeed() >= 0)
    25

  18. —spins;
    26

  19. } else if ((p = wtail) == null) { // 队列为空, 则初始化队列, 构造队列的头结点
    27

  20. WNode hd = new WNode(WMODE, null);
    28

  21. if (U.compareAndSwapObject(this, WHEAD, null, hd))
    29

  22. wtail = hd;
    30

  23. } else if (node == null) // 将当前线程包装成写结点
    31

  24. node = new WNode(WMODE, p);
    32

  25. else if (node.prev != p)
    33

  26. node.prev = p;
    34

  27. else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // 链接结点至队尾
    35

  28. p.next = node;
    36

  29. break;
    37

  30. }
    38

  31. }
    39


  32. 40

  33. for (int spins = -1; ; ) {
    41

  34. WNode h, np, pp;
    42

  35. int ps;
    43

  36. if ((h = whead) == p) { // 如果当前结点是队首结点, 则立即尝试获取写锁
    44

  37. if (spins < 0)
    45

  38. spins = HEAD_SPINS;
    46

  39. else if (spins < MAX_HEAD_SPINS)
    47

  40. spins <<= 1;
    48

  41. for (int k = spins; ; ) { // spin at head
    49

  42. long s, ns;
    50

  43. if (((s = state) & ABITS) == 0L) { // 写锁未被占用
    51

  44. if (U.compareAndSwapLong(this, STATE, s,
    52

  45. ns = s + WBIT)) { // CAS修改State: 占用写锁
    53

  46. // 将队首结点从队列移除
    54

  47. whead = node;
    55

  48. node.prev = null;
    56

  49. return ns;
    57

  50. }
    58

  51. } else if (LockSupport.nextSecondarySeed() >= 0 &&
    59

  52. —k <= 0)
    60

  53. break;
    61

  54. }
    62

  55. } else if (h != null) { // 唤醒头结点的栈中的所有读线程
    63

  56. WNode c;
    64

  57. Thread w;
    65

  58. while ((c = h.cowait) != null) {
    66

  59. if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
    67

  60. U.unpark(w);
    68

  61. }
    69

  62. }
    70

  63. if (whead == h) {
    71

  64. if ((np = node.prev) != p) {
    72

  65. if (np != null)
    73

  66. (p = np).next = node; // stale
    74

  67. } else if ((ps = p.status) == 0) // 将当前结点的前驱置为WAITING, 表示当前结点会进入阻塞, 前驱将来需要唤醒我
    75

  68. U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
    76

  69. else if (ps == CANCELLED) {
    77

  70. if ((pp = p.prev) != null) {
    78

  71. node.prev = pp;
    79

  72. pp.next = node;
    80

  73. }
    81

  74. } else { // 阻塞当前调用线程
    82

  75. long time; // 0 argument to park means no timeout
    83

  76. if (deadline == 0L)
    84

  77. time = 0L;
    85

  78. else if ((time = deadline - System.nanoTime()) <= 0L)
    86

  79. return cancelWaiter(node, node, false);
    87

  80. Thread wt = Thread.currentThread();
    88

  81. U.putObject(wt, PARKBLOCKER, this);
    89

  82. node.thread = wt;
    90

  83. if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
    91

  84. U.park(false, time); // emulate LockSupport.park
    92

  85. node.thread = null;
    93

  86. U.putObject(wt, PARKBLOCKER, null);
    94

  87. if (interruptible && Thread.interrupted())
    95

  88. return cancelWaiter(node, node, true);
    96

  89. }
    97

  90. }
    98

  91. }
    99

  92. }

无标题 - 图12

无标题 - 图13

无标题 - 图14

无标题 - 图15

无标题 - 图16

无标题 - 图17

无标题 - 图18

无标题 - 图19

无标题 - 图20

无标题 - 图21

无标题 - 图22