一、锁的可重入性

因为在Concurrent包中的锁都是“可重入锁”,所以一般都命名为ReentrantX,因为所有的锁。“可重入锁”是指当一个线程调用object.lock()拿到锁,进入互斥区后,再次调用object.lock(),仍然可以拿到该锁。很显然,通常的锁都要设计成可重入的,否则就会发生死锁。
synchronized关键字,同样是可重入锁

二、类的继承层次

2.1、接口Lock

  1. public interface Lock {
  2. // 尝试去获得锁
  3. // 如果锁不可用,当前线程会变得不可用,直到获得锁为止。(中途会忽略中断)
  4. void lock();
  5. // 尝试去获取锁,如果锁获取不到,线程将不可用
  6. // 直到获取锁,或者被其他线程中断
  7. // 线程在获取锁操作中,被其他线程中断,则会抛出InterruptedException异常,并且将中断标识清除。
  8. void lockInterruptibly() throws InterruptedException;
  9. // 锁空闲时返回true,锁不空闲是返回false
  10. // 该方法不会引起当前线程阻塞
  11. boolean tryLock();
  12. // 在unit时间内成功获取锁,返回true
  13. // 在unit时间内未成功获取锁,返回false
  14. // 如果当前线程在获取锁操作中,被其他线程中断,则会抛出InterruptedException异常,并且将中断标识清除。
  15. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  16. // 释放锁
  17. void unlock();
  18. // 获取一个绑定到当前Lock对象的Condition对象
  19. // 获取Condition对象的前提是当前线程持有Lock对象
  20. Condition newCondition();
  21. }
  22. }

image.png

2.2、lock()和lockInterruptibly()方法的区别

lock()方法类似于使用synchronized关键字加锁,如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。
lockInterruptibly()方法顾名思义,就是如果锁不可用,那么当前正在等待的线程是可以被中断的,这比synchronized关键字更加灵活。

2.3、实现ReentrantLock

public class ReentrantLock implements Lock, java.io.Serializable {
     private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
            ...
                 abstract void lock();
    }
    ...
    public void lock() {
        sync.lock();
    }

    public void unlock() {
        sync.release(1);
    }

    //默认非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     *公平锁与非公平锁
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

}

三、锁的公平性与非公平性

Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁。
公平锁:是指线程在抢占锁失败后会进入一个等待队列,先进入队列的线程会先获得锁。公平性体现在先来先得。
非公平锁:是指线程抢占锁失败后会进入一个等待队列,但是这些等待线程谁能先获得锁不是按照先来先得的规则,而是随机的。不公平性体现在后来的线程可能先得到锁。

如果有很多线程竞争一把公平锁,系统的总体吞吐量(即速度很慢,常常极其慢)比较低,因为此时在线程调度上面的开销比较大。
原因是采用公平策略时,当一个线程释放锁时,需要先将等待队列中的线程唤醒。这个唤醒的调度过程是比较耗费时间的。如果使用非公平锁的话,当一个线程释放锁之后,可用的线程能立马获得锁,效率较高。默认设置的是非公平锁,其实是为了提高效率,减少线程切换。

四、锁实现的基本原理

4.1、实现锁的基本要素

1、需要一个state变量,标记该锁的状态

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

     /**
     * The synchronization state.
     */
    private volatile int state;//记录锁的状态,通过CAS修改状态的值
}

state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可重入性。
例如,同样一个线程,调用5次lock,state会变成5;然后调用5次unlock,state减为0。
当state=0时,没有线程持有锁,exclusiveOwnerThread=null;
当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;
当state>1时,说明该线程重入了该锁。

2、需要记录当前是哪个线程持有锁。

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;//记录锁被那个线程持有
}

3、需要底层支持对一个线程进行阻塞或唤醒操作。

在Unsafe类中,提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。
有一个LockSupport的工具类,对这一对原语做了简单封装:
在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Thread t),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。尤其是unpark(Thread t),它实现了一个线程对另外一个线程的“精准唤醒”。前面讲到的wait()/notify(),notify也只是唤醒某一个线程,但无法指定具体唤醒哪个线程。

4、需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要用到CAS。

在AQS中利用双向链表和CAS实现了一个阻塞队列。如下所示。
阻塞队列是整个AQS核心中的核心,下面做进一步的阐述。如图3-2所示,head指向双向链表头部,tail指向双向链表尾部。入队就是把新的Node加到tail后面,然后对tail进行CAS操作;出队就是对head进行CAS操作,把head向后移一个位置。
初始的时候,head=tail=NULL;然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空Node;之后,在后面加入被阻塞的线程对象。所以,当head=tail的时候,说明队列为空。

4.2、公平锁和非公平锁的实现差异

1、ReentrantLock在公平性和非公平性上的实现差异。

  • 非公平锁NonfairSync

      static final class NonfairSync extends Sync {
          private static final long serialVersionUID = 7316153563782823691L;
    
          /**
           * Performs lock.  Try immediate barge, backing up to normal
           * acquire on failure.
           */
          final void lock() {
              if (compareAndSetState(0, 1))//非公平锁,一上来就争抢锁
                  setExclusiveOwnerThread(Thread.currentThread());
              else
                  acquire(1);
          }
    
          protected final boolean tryAcquire(int acquires) {
              return nonfairTryAcquire(acquires);
          }
      }
    
  • 公平锁FairSync

      static final class FairSync extends Sync {
          private static final long serialVersionUID = -3000897897090466540L;
    
          final void lock() {//没有一上来就争抢锁
              acquire(1);  
          }
    
          /**
           * Fair version of tryAcquire.  Don't grant access unless
           * recursive call or no waiters or is first.
           */
          protected final boolean tryAcquire(int acquires) {
              final Thread current = Thread.currentThread();
              int c = getState();
              if (c == 0) {
                  if (!hasQueuedPredecessors() &&
                      compareAndSetState(0, acquires)) {
                      setExclusiveOwnerThread(current);
                      return true;
                  }
              }
              else if (current == getExclusiveOwnerThread()) {
                  int nextc = c + acquires;
                  if (nextc < 0)
                      throw new Error("Maximum lock count exceeded");
                  setState(nextc);
                  return true;
              }
              return false;
          }
      }
    

    2、acquire()方法

      public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
    

    acquire()是AQS的一个模板方法。

    3、tryAcquire(..)虚函数

    tryAcquire(..)是一个虚函数,也就是再次尝试拿锁,被NonfairSync与FairSync分别实现。

  • 非公平锁实现tryAcquire

      static final class NonfairSync extends Sync {
          private static final long serialVersionUID = 7316153563782823691L;
    
          /**
           * Performs lock.  Try immediate barge, backing up to normal
           * acquire on failure.
           */
          final void lock() {
              if (compareAndSetState(0, 1))
                  setExclusiveOwnerThread(Thread.currentThread());
              else
                  acquire(1);
          }
    
          protected final boolean tryAcquire(int acquires) {
              return nonfairTryAcquire(acquires);
          }
      }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        ....
    }
  • 公平锁实现tryAcquire

      static final class FairSync extends Sync {
          private static final long serialVersionUID = -3000897897090466540L;
    
          final void lock() {
              acquire(1);
          }
    
          /**
           * Fair version of tryAcquire.  Don't grant access unless
           * recursive call or no waiters or is first.
           */
          protected final boolean tryAcquire(int acquires) {
              final Thread current = Thread.currentThread();
              int c = getState();
              if (c == 0) {
                  if (!hasQueuedPredecessors() &&
                      compareAndSetState(0, acquires)) {
                      setExclusiveOwnerThread(current);
                      return true;
                  }
              }
              else if (current == getExclusiveOwnerThread()) {
                  int nextc = c + acquires;
                  if (nextc < 0)
                      throw new Error("Maximum lock count exceeded");
                  setState(nextc);
                  return true;
              }
              return false;
          }
      }
    

这两段代码非常相似,唯一的区别是公平锁实现的代码多了一个if (!hasQueuedPredecessors())。什么意思呢?就是只有当c==0(没有线程持有锁),并且排在队列的第1个时(即当队列中没有其他线程的时候),才去抢锁,否则继续排队,这才叫“公平”!

4、acquireQueued(..)函数

目的是把线程放入阻塞队列,然后阻塞该线程。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  • addWaiter函数

      private Node addWaiter(Node mode) {
          Node node = new Node(Thread.currentThread(), mode);
          // Try the fast path of enq; backup to full enq on failure
          Node pred = tail;
          if (pred != null) {
              node.prev = pred;
              if (compareAndSetTail(pred, node)) {//尝试加入到队列尾部,如果不成功则执行下面的enq方法
                  pred.next = node;
                  return node;
              }
          }
          enq(node);//enq内部会进行队列的初始化,建立一个新的空Node,然后不断尝试自选,直到把该Node加入队列尾部。
          return node;
      }
    

    就是为当前线程生成一个Node,然后把Node放入双向链表的尾部。要注意的是,这只是把Thread对象放入了一个队列中而已,线程本身并未阻塞。

  • addWaiter(..)

在addWaiter(..)函数把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(..)函数完成。线程一旦进入acquireQueued(..)就会被无限期阻塞,即使有其他线程调用interrupt()函数也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(..)返回。

注意:进入acquireQueued(..),该线程被阻塞。在该函数返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移1个节点)。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//被唤醒,如果自己在队列头部(自己的前一个节点是head指向的空节点),则尝试拿锁
                    setHead(node);//拿锁成功,出队列(即head指针前移一个节点),同时会把node的thread变量置为null,所以head还是指向一个空节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//自己调用park阻塞自己
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

首先,acquireQueued(..)函数有一个返回值,表示什么意思呢?虽然该函数不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该函数会返回true;否则,返回false。基于这个返回值,才有了下面的代码:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

当acquireQueued(..)返回true 时,会调用selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。
阻塞就发生在下面这个函数中:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

线程调用park()函数,自己把自己阻塞起来,直到被其他线程唤醒,该函数返回。park()函数返回有两种情况。
情况1:其他线程调用了unpark(Thread t)。
情况2:其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。
也正因为LockSupport.park()可能被中断唤醒,acquireQueued(..)函数才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。
被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false;如果是情况2,则返回true。

4.3、unlock()实现分析

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

release()里面做了两件事:tryRelease(..)函数释放锁;unparkSuccessor(..)函数唤醒队列中的后继者。
在上面的代码中有一个关键点要说明:因为是排他锁,只有已经持有锁的线程才有资格调用release(..),这意味着没有其他线程与它争抢。所以,在上面的tryRelease(..)函数中,对state值的修改,不需要CAS操作,直接减1即可。
但对于读写锁中的读锁,也就是releaseShared(..),就不一样了,见后续分析。

4.4、lockInterruptibly()实现分析

lock 不能被中断,lockInterruptibly()可以被中断,

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

acquireInterruptibly(..)也是AQS 的模板方法,里面的tryAcquire(..)分别被FairSync和NonfairSync实现,主要注意doAcquireInterruptibly(..)函数。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

当parkAndCheckInterrupt()返回true的时候,说明有其他线程发送中断信号,直接抛出InterruptedException,跳出for循环,整个函数返回。

4.5、tryLock()实现分析

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

tryLock()实现基于调用非公平锁的tryAcquire(..),对state进行CAS操作,如果操作成功就拿到锁;如果操作不成功则直接返回false,也不阻塞。

五、syncchronized和lock的区别

  • syncchronized是一个内置的关键字,lock是一个java类
  • syncchronized无法判断获取锁的状态,lock可以判断是否获取到了锁
  • syncchronized会自动释放锁,lock必须要手动释放锁,如果不释放,会造成死锁
  • syncchronized中如果线程1获取锁,其他线程会等待,lock就不一定会等待下去。
  • syncchronized可重入锁,不可以中断,非公平。lock,可重入锁,可以中断,也可以调整公平或者非公平。