5.1 Lock接口

在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的。Java SE5之后,并发包中新增了Lock接口用来实现锁功能,提供了与synchronized关键字类似的同步功能,只是在使用时需要显式的获取和释放锁。虽然它缺少了隐式获得释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。
LOCK的使用方式

  1. Lock lock = new ReentrantLock();
  2. lock.lock();
  3. try{
  4. } finally {
  5. lock.close();
  6. }

不要将获取锁的过程写在try块中,如果获取锁时发生异常,异常抛出的同时,也会导致锁无故释放

LOCK接口提供的synchronized关键字不具备的主要特性

特性 描述
尝试非阻塞地获取锁 当前线程尝试获取锁,如果则一时刻锁没有被其他线程获取到,则成功获取并持有锁
能被中断地获取锁 与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
超时获取锁 在指定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回

LOCK的API

方法名称 描述
void lock() 获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回
void lockInterruptibly() throws InterruptedException 可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程
boolean tryLock() 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false
boolean tryLock( long time, TimeUnit unit) throws InterruptedException 超时的获取锁,当前线程在以下3中情况下会返回:
1. 当前线程在超时时间内获得了锁
1. 当前线程在超时时间内被中断
1. 超时时间结束,返回false
void unlock() 释放锁
Condition newCondition() 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁

5.2 队列同步器

队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的三个方法(getState()、 setState(int newState)和compareAndSetState(int expect, int update))来进行操作,因为它们能够保证状态的改变是安全的。

同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。

锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节; 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作

5.2.1 队列同步器的接口与示例

使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模版方法,而这些模版方法将会调用使用者重写的方法。
重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态:

  • getState(): 获取当前同步状态
  • setState(int newState): 设置当前同步状态
  • compareAndSetState(int expect, int update): 使用CAS设置当前状态,该方法能够保证状态设置的原子性

5.2.2 队列同步器的实现分析

1. 同步队列

同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
image.png
为了保证加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法: compareAndSetTail(Node expect, Noed update),它需要传递当前线程“认为”的尾节点和当前结点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
image.png
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
image.png
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头结点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。

2. 独占式同步状态获取与释放

通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列,后续对线程进行中断操作,线程不会从同步队列中移出。

  1. public final void acquire(int arg){
  2. if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  3. selfInterrupt();
  4. }

主要逻辑:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以死循环的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

  1. private Node addWaiter(Node node){
  2. Node node = new Node(Thread.currentThread(), mode);
  3. Node pred = tail;
  4. if ( pred != null) {
  5. node.prev = pred;
  6. if(compareAndSetTail(pred,node)){
  7. pred.next = node;
  8. return node;
  9. }
  10. }
  11. enq(node);
  12. return node;
  13. }
  14. private Node enq(final Node node){
  15. for(;;){
  16. Node t = tail;
  17. if(t == null) {
  18. if( compareAndSetHead(new Node()))
  19. tail = head;
  20. } else {
  21. node.prev = t;
  22. if ( compareAndSetTail(t, node)){
  23. t.next = node;
  24. return t;
  25. }
  26. }
  27. }

通过使用compareAndSetTail(Node expect, Node update)方法来确保节点能够被线程安全添加。
在enq方法中,同步器通过死循环来保证节点的正确添加,在死循环中只要通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。
节点进入同步队列,就进入了一个自旋的过程,每个节点都在自省的观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)

  1. final boolean acquireQueued(final Node node,int arg){
  2. boolean failed = true;
  3. try{
  4. boolean interrupted = false;
  5. for(;;){
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)){
  8. setHead(node);
  9. p.next = null;
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldPardAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  14. interrupted = true;
  15. }
  16. } finally {
  17. if ( failed)
  18. cancelAcquire(node);
  19. }
  20. }

在acquireQueued(final Node node,int arg)中,当前线程在“死循环”中获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态:

  1. 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点
  2. 维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如图:

image.png
节点和节点之间在循环检查的过程中基本不互相通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)

独占式同步状态获取流程:
image.png
总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

3. 共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。

  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED);
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. int r = tryAcquireShared(arg);
  10. if (r >= 0) {
  11. setHeadAndPropagate(node, r);
  12. p.next = null; // help GC
  13. if (interrupted)
  14. selfInterrupt();
  15. failed = false;
  16. return;
  17. }
  18. }
  19. if (shouldParkAfterFailedAcquire(p, node) &&
  20. parkAndCheckInterrupt())
  21. interrupted = true;
  22. }
  23. } finally {
  24. if (failed)
  25. cancelAcquire(node);
  26. }
  27. }
  28. public final void acquireShared(int arg) {
  29. if (tryAcquireShared(arg) < 0)
  30. doAcquireShared(arg);
  31. }

在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0. 在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。

4.独占式超时获取同步状态

通过调用同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。

超时获取同步状态过程可以被视作响应中断获取同步状态过程的增强版,doAcquireNanos(int arg, long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,nanosTimeout计算公式为:nanosTimeout = now - lastTime,其中now为当前唤醒时间,lastTime为上次唤醒时间,如果nanosTimeout大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒,反之,表示已经超时.

  1. private boolean doAcquireNanos(int arg, long nanosTimeout)
  2. throws InterruptedException {
  3. if (nanosTimeout <= 0L)
  4. return false;
  5. final long deadline = System.nanoTime() + nanosTimeout;
  6. final Node node = addWaiter(Node.EXCLUSIVE);
  7. boolean failed = true;
  8. try {
  9. for (;;) {
  10. final Node p = node.predecessor();
  11. if (p == head && tryAcquire(arg)) {
  12. setHead(node);
  13. p.next = null; // help GC
  14. failed = false;
  15. return true;
  16. }
  17. nanosTimeout = deadline - System.nanoTime();
  18. if (nanosTimeout <= 0L)
  19. return false;
  20. if (shouldParkAfterFailedAcquire(p, node) &&
  21. nanosTimeout > spinForTimeoutThreshold)
  22. LockSupport.parkNanos(this, nanosTimeout);
  23. if (Thread.interrupted())
  24. throw new InterruptedException();
  25. }
  26. } finally {
  27. if (failed)
  28. cancelAcquire(node);
  29. }
  30. }