一、为什么引入StampedLock

读与读 读与写 写与写
ReentrantLock 互斥 互斥 互斥
ReentrantReadWriteLock 不互斥 互斥 互斥
StampedLock 不互斥 不互斥 互斥

从ReentrantLock到StampedLock,并发度依次提高。MySQL 高并发的核心机制MVCC,也就是一份数据多个版本,此处的StampedLock有异曲同工之妙。

另一方面,因为ReentrantLock采用的是“悲观读”的策略,当第一个读线程拿到锁之后,第二个、第三个读线程还可以拿到锁,使得写线程一直拿不到锁,可能导致写线程“饿死”。虽然在其公平或非公平的实现中,都尽量避免这种情形,但还有可能发生。StampedLock引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。

二、StampedLock的使用

  1. public class Point {
  2. private double x,y;
  3. private final StampedLock s1=new StampedLock();
  4. void move(double x,double y){ //多个线程调用该函数,修改x和y的值
  5. long stamp = s1.writeLock();
  6. try{
  7. this.x=x;
  8. this.y=y;
  9. }catch (Exception e){
  10. }finally {
  11. s1.unlockWrite(stamp);
  12. }
  13. }
  14. double distanceFromOrigin(){ //多个线程调用该函数求距离
  15. long stamp = s1.tryOptimisticRead(); //使用乐观读
  16. double currentX=x; double currentY=y; //将共享变量拷贝到线程栈
  17. if(!s1.validate(stamp)){ //拷贝期间有其他线程修改该数据,读的是脏数据
  18. stamp =s1.readLock(); //升级为悲观读
  19. try{
  20. currentX=x;
  21. currentY=y;
  22. }catch (Exception e){
  23. }finally {
  24. s1.unlockRead(stamp);
  25. }
  26. }
  27. return Math.sqrt(currentX*currentX+currentY*currentY);
  28. }
  29. }

如上面代码所示,有一个Point类,多个线程调用move()函数,修改坐标;还有多个线程调用distanceFromOrigin()函数,求距离。
首先,执行move操作的时候,要加写锁。这个用法和ReadWriteLock的用法没有区别,写操作和写操作也是互斥的。关键在于读的时候,用了一个“乐观读”sl.tryOptimisticRead(),相当于在读之前给数据的状态做了一个“快照”。然后,把数据拷贝到内存里面,在用之前,再比对一次版本号。如果版本号变了,则说明在读的期间有其他线程修改了数据。读出来的数据废弃,重新获取读锁。
这三行关键代码对顺序非常敏感,不能有重排序。因为state 变量已经是volatile,所以可以禁止重排序,但stamp并不是volatile的。为此,在validate(stamp)函数里面插入内存屏障。

三、“乐观读”的实现原理

3.1、state的重新定义,增加了version版本号

首先,StampedLock是一个读写锁,因此也会像读写锁那样,把一个state变量分成两半,分别表示读锁和写锁的状态。同时,它还需要一个数据的version。一次CAS没有办法操作两个变量,所以这个state变量本身同时也表示了数据的version。

public class StampedLock implements java.io.Serializable {
        /** The number of bits to use for reader count before overflowing */
    private static final int LG_READERS = 7;

    // Values for lock state and stamp operations
    private static final long RUNIT = 1L;
    private static final long WBIT  = 1L << LG_READERS;//第八位表示写锁
    private static final long RBITS = WBIT - 1L;//最低的7位表示读锁的状态。因为写锁只有一个bit位,所以写锁是不可重入的。
    private static final long RFULL = RBITS - 1L;
    private static final long ABITS = RBITS | WBIT;
    private static final long SBITS = ~RBITS; // note overlap with ABITS

    // Initial value for lock state; avoid failure value zero
    private static final long ORIGIN = WBIT << 1;
    /** Lock sequence/state */
    private transient volatile long state;
    /** extra reader count when state read count saturated */

}

用最低的8位表示读和写的状态,其中第8位表示写锁的状态,最低的7位表示读锁的状态。因为写锁只有一个bit位,所以写锁是不可重入的。

3.2、state初始化

    //初始化
    public StampedLock() {
        state = ORIGIN;
    }

    public long tryOptimisticRead() {
        long s;
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }

    public boolean validate(long stamp) {
        U.loadFence();
        return (stamp & SBITS) == (state & SBITS);
    }

初始值不为0,而是把WBIT 向左移动了一位,也就是上面的ORIGIN 常量
什么state的初始值不设为0呢?这就要从乐观锁的实现说起。
**
上面两个函数必须结合起来看:当state&WBIT!=0的时候,说明有线程持有写锁,上面的tryOptimisticRead会永远返回0。这样,再调用validate(stamp),也就是validate(0)也会永远返回false。
这正是我们想要的逻辑:当有线程持有写锁的时候,validate永远返回false,无论写线程是否释放了写锁。因为无论是否释放了(state回到初始值)写锁,state值都不为0,所以validate(0)永远为false。
为什么上面的validate(..)函数不直接比较stamp=state,而要比较state&SBITS=state&SBITS 呢?因为读锁和读锁是不互斥的!所以,即使在“乐观读”的时候,state 值被修改了,但如果它改的是第7位,validate(..)还是会返回true。
另外要说明的一点是,上面使用了内存屏障U.loadFence(),是因为在这行代码的下一行里面的stamp、SBITS变量不是volatile的,由此可以禁止其和前面的currentX=X,currentY=Y进行重排序。
通过上面的分析,可以发现state的设计非常巧妙。只通过一个变量,既实现了读锁、写锁的状态记录,还实现了数据的版本号的记录。

四、悲观读/写:“阻塞”与“自旋”策略实现差异

4.1、阻塞队列

    static final class WNode {
        volatile WNode prev;
        volatile WNode next;
        volatile WNode cowait;    // list of linked readers
        volatile Thread thread;   // non-null while possibly parked
        volatile int status;      // 0, WAITING, or CANCELLED
        final int mode;           // RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }

    /** Head of CLH queue */
    private transient volatile WNode whead;
    /** Tail (last) of CLH queue */
    private transient volatile WNode wtail;

这个阻塞队列和AQS 里面的很像。刚开始的时候,whead=wtail=NULL,然后初始化,建一个空节点,whead和wtail都指向 这个空节点,之后往里面加入一个个读线程或写线程节点。但基于这个阻塞队列实现的锁的调度策略和AQS很不一样,也就是“自旋”。在AQS里面,当一个线程CAS state失败之后,会立即加入阻塞队列,并且进入阻塞状态。但在StampedLock中,CAS state失败之后,会不断自旋,自旋足够多的次数之后,如果还拿不到锁,才进入阻塞状态。为此,根据CPU的核数,定义了自旋次数的常量值。如果是单核的CPU,肯定不能自旋,在多核情况下,才采用自旋策略。

    /** Number of processors, for spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

4.2、加锁

下面以写锁的加锁,也就是StampedLock的writeLock()函数为例,来看一下自旋的实现。

    public long writeLock() {
        long s, next;  // bypass acquireWrite in fully unlocked case only
        return ((((s = state) & ABITS) == 0L &&
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                next : acquireWrite(false, 0L));
    }

如上面代码所示,当state&ABITS==0的时候,说明既没有线程持有读锁,也没有线程持有写锁,此时当前线程才有资格通过CAS操作state。若操作不成功,则调用acquireWrite()函数进入阻塞队列,并进行自旋,这个函数是整个加锁操作的核心,代码如下。

    private long acquireWrite(boolean interruptible, long deadline) {
        WNode node = null, p;
        for (int spins = -1;;) { // spin while enqueuing
            long m, s, ns;
            if ((m = (s = state) & ABITS) == 0L) {
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                    return ns;
            }
            else if (spins < 0)
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if ((p = wtail) == null) { // initialize queue
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                node = new WNode(WMODE, p);
            else if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }

        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long s, ns;
                    if (((s = state) & ABITS) == 0L) {
                        if (U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + WBIT)) {
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    }
                    else if (LockSupport.nextSecondarySeed() >= 0 &&
                             --k <= 0)
                        break;
                }
            }
            else if (h != null) { // help release stale waiters
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time; // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                        whead == h && node.prev == p)
                        U.park(false, time);  // emulate LockSupport.park
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

1、acquireWrite(..)方法

整个acquireWrite(..)函数是两个大的for循环,内部实现了非常复杂的自旋策略。

  • 第一个for循环

在第一个大的for循环里面,目的就是把该Node加入队列的尾部,一边加入,一边通过CAS操作尝试获得锁。如果获得了,整个函数就会返回;如果不能获得锁,会一直自旋,直到加入队列尾部。

  • 第二个for循环

在第二个大的for循环里,也就是该Node已经在队列尾部了。这个时候,如果发现自己刚好也在队列头部,说明队列中除了空的Head节点,就是当前线程了。此时,再进行新一轮的自旋,直到达到MAX_HEAD_SPINS次数,然后进入阻塞。这里有一个关键点要说明:当release(..)函数被调用之后,会唤醒队列头部的第1个元素,此时会执行第二个大的for循环里面的逻辑,也就是接着for循环里面park()函数后面的代码往下执行。

  • cowait指针

另外一个不同于AQS的阻塞队列的地方是,在每个WNode里面有一个cowait指针,用于串联起所有的读线程。例如,队列尾部阻塞的是一个读线程1,现在又来了读线程2、3,那么会通过cowait指针,把1、2、3串联起来。1被唤醒之后,2、3也随之一起被唤醒,因为读和读之间不互斥。

4.3、释放锁

明白加锁的自旋策略后,下面来看锁的释放操作。和读写锁的实现类似,也是做了两件事情:一是把state变量置回原位,二是唤醒阻塞队列中的第一个节点。节点被唤醒之后,会继续执行上面的第二个大的for循环,自旋拿锁。如果成功拿到,则出队列;如果拿不到,则再次进入阻塞,等待下一次被唤醒。

    public void unlockWrite(long stamp) {
        WNode h;
        if (state != stamp || (stamp & WBIT) == 0L)
            throw new IllegalMonitorStateException();
        state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
        if ((h = whead) != null && h.status != 0)
            release(h);
    }

    private void release(WNode h) {
        if (h != null) {
            WNode q; Thread w;
            U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
            if ((q = h.next) == null || q.status == CANCELLED) {
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    if (t.status <= 0)
                        q = t;
            }
            if (q != null && (w = q.thread) != null)
                U.unpark(w);
        }
    }