1 简介

  • JUC包简介

    • JUC全称java.util.concurrent,是JDK提供的并发工具包
    • JUC包由Doug Lea大师编写,提供了大量实用,高性能的工具类
  • JUC包结构图

image.png

  • JUC包下包含了阻塞队列、线程池、并发容器等,另外还包含了两个子包atomic和locks
  • JUC包的整体实现图

image.png

  • JUC包中类的实现主要是依赖于CAS和volatile关键字

2 locks包

1 Lock接口

  • Lock接口概述
    • 锁是用来控制多个线程访问共享资源的方式,一个锁能够防止多个线程同时访问共享资源
    • 在Lock接口出现之前,Java程序主要是靠synchronized关键字实现锁功能的

而JDK5之后JUC包中增加了Lock接口,Lock接口提供了与synchronized一样的锁功能

  • Lock接口是JUC包提供的一种全新的互斥同步手段
  • Lock接口的实现类通过CAS操作volatile关键字来保证线程安全
  • 通常使用Lock的方式如下

    1. Lock lock = new ReentrantLock(); //ReentrantLock是Lock的实现类
    2. lock.lock();
    3. try {
    4. .......
    5. } finally {
    6. lock.unlock();
    7. }
    • synchronized同步块执行完成或者遇到异常时锁会自动释放,而Lock必须调用**unlock()**方法释放锁

因此为了避免同步代码出现异常影响解锁,需要在finally块中释放锁

  • 虽然Lock接口失去了像synchronize关键字隐式加锁解锁的便捷性,但是却拥有了锁获取和释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性
  • Lock接口API

Lock接口定义了6个方法

  1. //获取锁
  2. void lock();
  3. //获取锁的过程能够响应中断
  4. void lockInterruptibly() throws InterruptedException;
  5. //非阻塞式响应中断能立即返回,获取锁返回true反之返回fasle
  6. boolean tryLock();
  7. //超时获取锁,在超时内或者未中断的情况下能够获取锁
  8. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  9. //释放锁
  10. unlock();
  11. //创建一个与Lock对象绑定的条件对象,当前线程必须获得了锁才能在条件对象上等待
  12. //线程等待时会先释放锁,当再次获取锁时才能从等待中返回
  13. Condition newCondition();
  • Lock的实现
    • ReentrantLock是常用的Lock接口实现类
    • ReentrantLock的源码并没有多少代码,并且ReentrantLock基本所有方法的实现都是调用了其静态内部类Sync(同步器)中的方法,而Sync类继承自抽象类AbstractQueuedSynchronizer(AQS)
    • 因此要想理解Lock的实现原理关键是要理解AQS

2 AQS同步器

1 简介

  • AQS概述
    • AQS全称AbstractQueuedSynchronizer(抽象阻塞同步器),是位于locks包下的抽象类,是一种通用的同步器机制,JUC中的同步器都继承自该类
    • 同步器是用来构建阻塞式锁和其他同步组件的基础框架

在AQS同步器出现之前,程序员会自己通过一种同步器去实现另一种相近的同步器,这显然不够优雅,因此在JDK5引入AQS同步器

  • AQS同步器自身没有实现任何同步接口(如Lock),它仅仅是定义了锁状态的若干获取和释放方法来供自定义同步组件的使用
  • AQS同步器既支持独占式获取锁状态,也可以支持共享式获取锁状态(独占模式是只有一个线程能够获取锁,而共享模式可以允许多个线程获取锁),这样就可以方便的实现不同类型的同步组件
  • 常用的Lock接口实现类ReentrantLock就是基于AQS的
  • AQS同步器与锁的关系
    • 同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义
    • 锁面向使用者,其定义了使用者与锁交互的接口,隐藏了实现细节

同步器是面向锁的实现者,它简化了锁的实现方式、屏蔽了锁状态的管理、线程的排队、等待和唤醒等底层操作。

  • 同步组件通过调用同步器提供的方法可以很方便地实现需要的功能

锁和同步器很好的隔离了使用者和实现者所需关注的领域

  • AQS的实现特点
    • AQS的核心是锁的状态,用int类型属性state来表示
    • 继承AQS的子类需要重写AQS的几个方法来自定义维护state的方式,进而控制如何获取锁和释放锁

AQS提供了以下方法来维护state

  1. - `**getState()**` - 获取state
  2. - `**setState()**` - 设置state
  3. - `**compareAndSetState()**` - CAS操作设置state,注意**使用CAS设置state仅保证设置state时的原子性,并不会反复重试,AQS的原理仍然是阻塞式的锁**
  • AQS提供了基于FIFO的阻塞队列,类似于Monitor的EntryList,不过Monitor的EntryList是基于C++实现的,而AQS的阻塞队列是基于Java实现的
  • AQS提供了条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor的WaitSet,不过AQS支持多个条件变量,而Monitor仅隐式支持一个
  • AQS的使用

AQS被推荐定义为自定义同步组件中继承自AQS的静态内部类,例如ReentrantLock的定义

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. private final Sync sync;
  3. //Sync是抽象的是因为ReentrantLock内部还实现了FairSync和NonfairSync,它们都继承自Sync
  4. abstract static class Sync extends AbstractQueuedSynchronizer {
  5. ...
  6. }
  7. ...
  8. }

2 AQS的设计模式

  • 模板方法设计模式

AQS使用模板方法设计模式,即AQS将一些方法开放给子类进行重写,而AQS给同步组件所提供模板方法又会重新调用其被子类所重写的方法

  • AQS提供的模板方法

在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义
模板方法不需要重写,但是模板方法会调用被子类重写的方法

  • **void acquire(int arg)**

独占式获取同步状态,如果获取失败则将线程插入阻塞队列进行等待

  • **void acquireInterruptibly(int arg)**

acquire()方法相同,但在阻塞队列中等待的时候可以被中断

  • **boolean tryAcquireNanos(int arg, long nanosTimeout)**

**acquireInterruptibly()**基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false

  • **boolean release(int arg)**

释放同步状态,该方法会唤醒在阻塞队列中的下一个节点

  • AQS可重写的方法(独占模式)

AQS中被protected修饰的方法可以根据需要被继承自AQS的子类重写,只需要重写以下三个方法

  • **protected boolean tryAcquire(int arg)**

独占式尝试使用CAS获取一次锁(实际上是尝试使用CAS设置锁状态),获取失败时返回false,否则返回true
实现该方法需要查询当前锁状态并判断锁状态是否符合预期,然后再通过CAS操作设置锁状态

  • **protected boolean tryRelease()**

独占式尝试释放锁

  • protected boolean isHeldExclusively()

当前锁是否在独占模式下被线程占用,即表示锁是否被当前线程所独占

3 自定义独占式不可重入锁

  • 实现Lock接口需要实现多个方法,如果方法内部的逻辑全部自己实现比较困难。但是我们自定义的同步器已经实现了大部分锁的底层逻辑,因此我们实现Lock接口的方法时只需要编写很少一部分代码

    1. class MyLock implements Lock {
    2. MySync sync = new MySync();
    3. //自定义非重入同步器
    4. static class MySync extends AbstractQueuedSynchronizer {
    5. //尝试获取锁
    6. @Override
    7. protected boolean tryAcquire(int acquires) {
    8. //尝试使用CAS操作将state设置为1
    9. if (compareAndSetState(0, 1)) {
    10. //加锁成功,将锁的owner设置为当前线程
    11. setExclusiveOwnerThread(Thread.currentThread());
    12. return true;
    13. }
    14. return false;
    15. }
    16. //尝试释放锁
    17. @Override
    18. protected boolean tryRelease(int acquires) {
    19. //注意以下两语句的顺序不能颠倒
    20. //因为AQS的state被volatile修饰,而volatile的写屏障可以保证在此之前语句的操作被其他线程看到
    21. setExclusiveOwnerThread(null);
    22. setState(0);
    23. return true;
    24. }
    25. //是否持有独占锁
    26. @Override
    27. protected boolean isHeldExclusively() {
    28. return getState() == 1;
    29. }
    30. protected Condition newCondition() {
    31. return new ConditionObject();
    32. }
    33. }
    34. @Override
    35. //尝试加锁,若不成功则进入阻塞队列
    36. public void lock() {
    37. sync.acquire(1);
    38. }
    39. @Override
    40. //尝试加锁,若不成功则进入阻塞队列,可被打断
    41. public void lockInterruptibly() throws InterruptedException {
    42. sync.acquireInterruptibly(1);
    43. }
    44. @Override
    45. //尝试加锁一次,若不成功则返回,不进入阻塞队列
    46. public boolean tryLock() {
    47. return sync.tryAcquire(1);
    48. }
    49. @Override
    50. //尝试加锁,若不成功则进入阻塞队列,阻塞时间有上限
    51. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    52. return sync.tryAcquireNanos(1, unit.toNanos(time));
    53. }
    54. @Override
    55. //释放锁
    56. public void unlock() {
    57. sync.release(1);
    58. }
    59. @Override
    60. //生成条件变量
    61. public Condition newCondition() {
    62. return sync.newCondition();
    63. }
    64. }
  • 测试 ```java MyLock lock = new MyLock(); new Thread(() -> { log.debug(“locking…”); lock.lock(); try {

    1. log.debug("get lock!");
    2. Thread.sleep(5000);

    } catch (InterruptedException e) {

    1. e.printStackTrace();

    } finally {

    1. log.debug("unlocking...");
    2. lock.unlock();

    } }, “t1”).start();

new Thread(() -> { log.debug(“locking…”); lock.lock(); try { log.debug(“get lock!”); } finally { log.debug(“unlocking…”); lock.unlock(); } }, “t2”).start();

  1. - 上述代码输出
  2. ```java
  3. 15:24:38 [t1] c.Test - locking...
  4. 15:24:38 [t1] c.Test - get lock!
  5. 15:24:38 [t2] c.Test - locking...
  6. 15:24:43 [t1] c.Test - unlocking...
  7. 15:24:43 [t2] c.Test - get lock!
  8. 15:24:43 [t2] c.Test - unlocking...

4 AQS原理

  • 同步组件依赖于AQS,同步组件的方法大量调用了AQS的模板方法(即AQS的API),因此了解了AQS的实现原理也就能很轻松的了解同步组件的实现原理


1 AQS的属性

  1. //阻塞队列的头节点
  2. private transient volatile Node head;
  3. //阻塞队列的尾节点
  4. private transient volatile Node tail;
  5. //同步状态
  6. private volatile int state;
  7. //独占模式下的持有同步状态的线程
  8. //该属性是AQS父类中的属性,放在此处方便阅读
  9. private transient Thread exclusiveOwnerThread

2 阻塞队列

  • 阻塞队列概述

    • 当同步状态被某个线程占有,其他请求该同步状态的线程将会阻塞,从而进入阻塞队列。
    • 就数据结构而言,队列的实现方式无外乎两者一是通过数组的形式,另外一种则是链表的形式。AQS中的同步队列则是通过链式方式进行实现
  • 阻塞队列节点的属性

在AQS有一个静态内部类Node,表示阻塞队列中的节点,其属性如下

  1. //节点等待状态
  2. volatile int waitStatus;
  3. //当前节点/线程的前驱节点
  4. volatile Node prev;
  5. //当前节点/线程的后继节点
  6. volatile Node next;
  7. //加入阻塞队列的线程引用
  8. volatile Thread thread;
  • 从上述属性我们可以看出阻塞队列是一个双向链表
  • 节点的等待状态

    1. //节点从同步队列中取消
    2. static final int CANCELLED = 1;
    3. //后继节点的线程处于等待状态
    4. static final int SIGNAL = -1;
    5. //当前节点位于条件变量的等待队列中
    6. static final int CONDITION = -2;
    7. //初始状态
    8. static final int INITIAL = 0;
  • AQS与阻塞队列

    • AQS的属性中有两个重要属性

      1. //阻塞队列的头节点
      2. private transient volatile Node head;
      3. //阻塞队列的尾节点
      4. private transient volatile Node tail;
    • AQS通过阻塞队列的头节点和尾结点来管理阻塞队列,实现包括获取锁失败的线程进行入队,释放锁时对阻塞队列中的线程进行通知等核心方法

image.png

3 获取同步状态原理

获取同步状态方法源码

  • **void acquire(int arg)**

    1. public final void acquire(int arg) {
    2. //如果尝试获取同步状态失败,则创建一个Node对象并将其加入到阻塞队列中
    3. if (!tryAcquire(arg) &&
    4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    5. selfInterrupt();
    6. }
  • **boolean acquireQueued(final Node node, int arg)**

    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean interrupted = false;
    3. try {
    4. for (;;) {
    5. //获取node的前驱节点
    6. final Node p = node.predecessor();
    7. //如果前驱节点是哨兵,则再次调用tryAcquire()尝试获取同步状态
    8. if (p == head && tryAcquire(arg)) {
    9. //成功获取同步状态,从阻塞队列中摘掉node
    10. setHead(node);
    11. p.next = null; // help GC
    12. return interrupted;
    13. }
    14. //获取同步状态再次失败,调用shouldParkAfterFailedAcquire()方法,阻塞当前线程
    15. if (shouldParkAfterFailedAcquire(p, node))
    16. interrupted |= parkAndCheckInterrupt();
    17. }
    18. } catch (Throwable t) {
    19. cancelAcquire(node);
    20. if (interrupted)
    21. selfInterrupt();
    22. throw t;
    23. }
    24. }
  • **boolean shouldParkAfterFailedAcquire(Node pred, Node node)**

    1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2. int ws = pred.waitStatus;
    3. if (ws == Node.SIGNAL)
    4. //This node has already set status asking a release to signal it,
    5. //so it can safely park.
    6. return true;
    7. if (ws > 0) {
    8. //Predecessor was cancelled. Skip over predecessors and indicate retry.
    9. do {
    10. node.prev = pred = pred.prev;
    11. } while (pred.waitStatus > 0);
    12. pred.next = node;
    13. } else {
    14. //waitStatus must be 0 or PROPAGATE.
    15. //Indicate that we need a signal, but don't park yet.
    16. //Caller will need to retry to make sure it cannot acquire before parking.
    17. pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    18. }
    19. return false;
    20. }
  • **boolean parkAndCheckInterrupt()**

    1. private final boolean parkAndCheckInterrupt() {
    2. LockSupport.park(this);
    3. return Thread.interrupted();
    4. }

获取同步状态流程分析
现假设已经线程0持有了同步状态,此时线程1尝试获取同步状态

  1. 线程0持有同步状态,没有发生竞争

image.png

  1. 线程1尝试获取同步状态失败(子类重写的tryAcquire()逻辑,假设逻辑是如果state已经是1则返回false),执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法
  2. 进入addWaiter()方法,该方法用于加入节点至阻塞队列

image.png

  • 图中三角表示Node的waitStatus,0为默认正常状态
  • 第一个Node称为Dummy或哨兵或头节点(惰性创建),用来占位,并不关联线程

头节点虽然不关联线程(指向null),但是头节点变量本身并不为null,而是指向一个node对象

  1. 进入acquireQueued()方法

线程1会在acquireQueued()方法中再次尝试获取同步状态
image.png

  • 线程1在一个死循环中不断尝试获得锁,如果失败则调用LockSupport.park()进入等待状态
  • 如果关联线程1的节点是紧邻哨兵的节点,那么再次尝试获取锁,此时线程0没有释放锁,线程1获取同步状态再次失败
  • 进入shouldParkAfterFailedAcquire()方法,此时前驱节点的waitStatus为0,将前驱节点的waitStatus改为 -1,方法返回false
    • 节点的waitStatus为-1表示该节点有责任唤醒其后继节点
  • 再次进入循环尝试获取同步状态,若再次失败,进入shouldParkAfterFailedAcquire()方法,此时该方法返回true,并进入**parkAndCheckInterrupt()**方法
    1. parkAndCheckInterrupt()方法调用LockSupport.park(this)阻塞当前线程(线程1)

image.png

4 释放同步状态原理

释放同步状态方法方法源码

  • **boolean release(int arg)**

    1. public final boolean release(int arg) {
    2. if (tryRelease(arg)) {
    3. Node h = head;
    4. if (h != null && h.waitStatus != 0)
    5. unparkSuccessor(h);
    6. return true;
    7. }
    8. return false;
    9. }
  • **void unparkSuccessor(Node node)**

    1. private void unparkSuccessor(Node node) {
    2. int ws = node.waitStatus;
    3. //将头节点waitStatus置为0
    4. if (ws < 0)
    5. node.compareAndSetWaitStatus(ws, 0);
    6. //找到与头节点最近的waitStatus为-1的后继节点
    7. Node s = node.next;
    8. if (s == null || s.waitStatus > 0) {
    9. s = null;
    10. for (Node p = tail; p != node && p != null; p = p.prev)
    11. if (p.waitStatus <= 0)
    12. s = p;
    13. }
    14. //唤醒后继节点
    15. if (s != null)
    16. LockSupport.unpark(s.thread);
    17. }

释放同步状态方法流程分析
现假设已经线程0持有了同步状态,多个线程竞争失败

  1. 初始状态,多个线程阻塞

image.png

  1. 执行tryRelease()方法,线程0尝试释放锁,如果成功,则将exclusiveOwnerThread置为null、state置为0

image.png
tryRelease()方法会调用release()来唤醒一个线程

  1. 执行release()方法,由于当前头节点不为null且头节点的waitStatus为-1,因此进入unparkSuccessor()方法:找到阻塞队列中离头节点最近的一个waitStatus为-1的Node(图中为表示线程1的Node),调用LockSupport.unpark()恢复其运行
  2. 线程1被唤醒,继续执行其acquireQueued()方法,尝试获取同步状态

若获取成功,则:

  • 将exclusiveOwnerThread设置为Thread-1、state设置为1
  • 将表示线程1的Node置为头节点
  • 将之前的头节点与阻塞队列断开,此后该头节点可被垃圾回收

image.png
若获取失败,如又有一个新的线程4获取到了同步状态,那么线程1获取同步状态失败,重新调用LockSupport.park()方法阻塞线程1
image.png
由于等待最久的线程1不一定能获取到同步状态,因此这是不公平的策略

5 可打断原理

  • 如果希望在等待锁的时候可以被打断,则基于AQS的同步组件会调用**acquireInterruptibly()**而不是**acquire()**方法


  • **void acquireInterruptibly(int arg) throws InterruptedException**

    1. public final void acquireInterruptibly(int arg) throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. //尝试获取同步状态失败后,进入doAcquireInterruptibly()而不是acquireQueued()
    5. if (!tryAcquire(arg))
    6. doAcquireInterruptibly(arg);
    7. }
  • **void doAcquireInterruptibly(int arg)**

    1. private void doAcquireInterruptibly(int arg)
    2. throws InterruptedException {
    3. final Node node = addWaiter(Node.EXCLUSIVE);
    4. try {
    5. for (;;) {
    6. final Node p = node.predecessor();
    7. if (p == head && tryAcquire(arg)) {
    8. setHead(node);
    9. p.next = null; // help GC
    10. return;
    11. }
    12. if (shouldParkAfterFailedAcquire(p, node) &&
    13. parkAndCheckInterrupt())
    14. //注意这里与acquireQueued()的区别
    15. //这里会直接抛出异常,而acquireQueued()不会
    16. throw new InterruptedException();
    17. }
    18. } catch (Throwable t) {
    19. cancelAcquire(node);
    20. throw t;
    21. }
    22. }
  • **boolean parkAndCheckInterrupt()**

    1. private final boolean parkAndCheckInterrupt() {
    2. LockSupport.park(this);
    3. // 当park()被打断时,会返回true并清除打断标记
    4. return Thread.interrupted();
    5. }

6 条件变量等待原理

  • 条件变量概述

    • 每一个条件变量就对应着一个等待队列(类似于Monitor的WaitSet)
    • 条件变量的实现类是ConditionObject
    • ConditionObject是AQS的内部类
  • 等待与唤醒

    • 等待

可以在条件变量上调用await()方法,使得当前线程在该条件上等待,进入WAITING状态

  • 唤醒

可以在条件变量上调用signal()signalAll()方法,唤醒条件变量上的一个或全部线程

  • 条件变量的属性
    1. //等待队列的头节点
    2. private transient Node firstWaiter;
    3. //等待队列的尾结点
    4. private transient Node lastWaiter;

条件变量等待方法源码

  • **void await() throws InterruptedException**

    1. public final void await() throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. //将代表当前线程的Node加入当前条件的等待队列中
    5. //等待队列是一个链表,类似于同步器的阻塞队列
    6. Node node = addConditionWaiter();
    7. //将节点对应的持有的锁全部释放掉并唤醒阻塞队列中的线程
    8. int savedState = fullyRelease(node);
    9. int interruptMode = 0;
    10. while (!isOnSyncQueue(node)) {
    11. //进入WAITING状态,等待被唤醒
    12. LockSupport.park(this);
    13. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    14. break;
    15. }
    16. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    17. interruptMode = REINTERRUPT;
    18. if (node.nextWaiter != null) // clean up if cancelled
    19. unlinkCancelledWaiters();
    20. if (interruptMode != 0)
    21. reportInterruptAfterWait(interruptMode);
    22. }
  • **Node addConditionWaiter()**

    1. private Node addConditionWaiter() {
    2. //###如果当前线程没有持有锁,则抛出异常###
    3. //因此await()与Object::wait()方法一样,调用的前提是持有锁
    4. if (!isHeldExclusively())
    5. throw new IllegalMonitorStateException();
    6. Node t = lastWaiter;
    7. // If lastWaiter is cancelled, clean out.
    8. if (t != null && t.waitStatus != Node.CONDITION) {
    9. unlinkCancelledWaiters();
    10. t = lastWaiter;
    11. }
    12. //新建一个代表当前线程的节点并加入到当前条件的等待队列中
    13. Node node = new Node(Node.CONDITION);
    14. if (t == null)
    15. firstWaiter = node;
    16. else
    17. t.nextWaiter = node;
    18. lastWaiter = node;
    19. return node;
    20. }

条件变量等待流程分析

  1. 初始状态,线程0获取锁,多个线程阻塞

image.png

  1. 线程0在一个Condition对象上调用**await()**方法

await()方法会创建一个代表当前线程的Node对象(该Node对象的WaitStatus为-2,表示在Condition的等待队列中等待),之后将该Node加入Condition对象的等待队列中

  • 该等待队列类似于AQS的阻塞队列,但是没有头节点

image.png

  1. 线程0进入等待队列后,将调用fullRelease(),释放掉线程0的所有锁,并唤醒阻塞队列中的线程

image.png

7 条件变量唤醒原理

条件变量唤醒方法源码

  • **void signal()**

    1. public final void signal() {
    2. //如果不是锁的持有者调用该方法则抛出异常
    3. if (!isHeldExclusively())
    4. throw new IllegalMonitorStateException();
    5. //唤醒Condition等待队列中的第一个线程
    6. Node first = firstWaiter;
    7. if (first != null)
    8. doSignal(first);
    9. }
  • **void doSignal(Node first)**

    1. private void doSignal(Node first) {
    2. do {
    3. //将first节点从等待队列中断开
    4. if ( (firstWaiter = first.nextWaiter) == null)
    5. lastWaiter = null;
    6. first.nextWaiter = null;
    7. //将first节点转移到AQS的阻塞队列中
    8. //如果转移失败,则尝试唤醒first的后一个节点
    9. } while (!transferForSignal(first) &&
    10. (first = firstWaiter) != null);
    11. }
  • **boolean transferForSignal(Node node)**

    1. final boolean transferForSignal(Node node) {
    2. //先将Node的WaitStatus从2转为0
    3. if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
    4. return false;
    5. //enq()方法将node加入到AQS的阻塞队列,如果成功了返回node的前驱节点
    6. Node p = enq(node);
    7. //将前驱节点的WaitStatue改为-1
    8. int ws = p.waitStatus;
    9. if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
    10. LockSupport.unpark(node.thread);
    11. return true;
    12. }

条件变量唤醒流程分析

  1. 初始状态,Thread0在等待队列,Thread1持有锁

image.png

  1. 持有锁的线程1调用Condition对象的signal()方法唤醒该对象等待队列的一个线程

singal()方法调用doSignal()方法将表示线程0的Node从Condition对象的等待队列中断开
image.png

  1. doSignal()方法调用transferForSignal()方法,将代表线程0的Node的状态改为0,并将其插入到AQS阻塞队列的队尾,同时将Node的前驱节点的状态改为-1

image.png

  • 需要注意的是,线程0在整个唤醒的过程中都处于**park()**状态(WAITING状态)没有改变,唤醒仅是让线程0可以参与后续锁的竞争

8 阻塞队列(同步队列)与等待队列的关系

99d9861a09df9b7c33da9feb37198bec_watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly90aGlua3dvbi5ibG9nLmNzZG4ubmV0,size_16,color_FFFFFF,t_70.png

5 Condition接口与Object监视器方法

  • Condition接口与Object监视器方法概述
    • 任何一个Java对象都天然继承于Object类,在线程间实现通信的往往会应用到Object的几个方法,比如wait()wait(long timeout)notify()notifyAll()几个方法实现等待/唤醒机制

同样的, 在JUC包的Lock体系下依然会有同样的方法实现等待/唤醒机制。

  • Object的wait()和notify()/notifyAll()是与Monitor配合完成线程间的等待/唤醒机制

Condition与AQS配合(ConditionObject是AQS的内部类)完成等待通知机制
前者是Java底层级别的,后者是语言级别的,具有更高的可控制性和扩展性。

  • Condition接口与Object监视器方法的对比 | 对比项 | Object Monitor | Condition | | —- | —- | —- | | 前置条件 | 线程获取到锁对象 |
    - 线程调用lock.lock()获取到锁
    - 存在Condition对象
    | | 调用方式 | 锁对象上调用 | Condition对象上调用 | | 等待队列(WaitSet)个数 | 1 | 多个 | | 线程进入等待队列时释放锁 | | | | 线程进入等待队列时响应中断 | √ | √ | | 线程进入等待队列时不响应中断 | ×(必须响应中断) | | | 线程进入等待队列时进入超时等待状态 | √ | √ | | 唤醒等待队列中的一个线程 | √ | √ | | 唤醒等待队列中的全部线程 | √ | √ |

3 ReentrantLock类

1 ReentrantLock简介

  • ReentrantLock(重入锁)类概述

    • ReentrantLock是java.util.concurrent.locks包下的类
    • ReentrantLock是Lock接口的一个实现类,也是在实际编程中使用频率很高的一个锁
  • ReentrantLock基本语法

    1. // 获取锁
    2. reentrantLock.lock();
    3. try {
    4. // 临界区
    5. } finally {
    6. // 释放锁
    7. reentrantLock.unlock();
    8. }
    • synchronized是在Java语法级别(关键字级别)保护临界区,而ReentrantLock则是在对象级别保护临界区,因此在使用ReentrantLock前需要先创建ReentrantLock对象
    • Lock应该确保在finally块中释放锁,否则一旦临界区中的代码抛出异常时,则可能永远不会释放持有的锁

2 ReentrantLock与synchronized

  • ReentrantLock与synchronized功能的区别

ReentrantLock与synchronized一样是可重入的,在基本用法上ReentrantLock也与synchronized很相似,只是代码写法上稍有区别。不过ReentrantLock与synchronized相比增加了一些高级功能,主要是以下四项(a、b实际上是LockSupport的特性)

  1. 超时加锁(主动避免长期等待锁释放)

当持有锁的线程长期不释放锁的时候,正在阻塞等待锁的线程可以选择放弃等待,改为处理其他事情

  1. 可中断加锁(被动避免长期等待锁释放)

等待持有锁的线程释放锁的线程,也可以被中断
可中断可以避免死锁

  1. 公平锁
  • 公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁
  • 非公平锁是指在锁被释放时,任何一个等待锁的线程都有机会获得锁
  • synchronized中的锁是非公平的,ReentrantLock在默认情况下也是非公平的,但可以通过带布尔值的构造函数将ReentrantLock配置为公平锁
  • ReentrantLock一旦使用公平锁,其性能将急剧下降,会明显影响吞吐量
  1. 锁绑定多个条件
  • 一个ReentrantLock对象可以同时绑定多个Condition对象(Condition类位于java.util.concurrent.locks包)
  • 在synchronized中,锁对象的wait()和notify()/notifyAll()方法相配合,可以实现一个隐含的条件,但是无法和多于一个的条件相关联

ReentrantLock则可以通过newCondition()方法与多个条件相关联

  • 换句话说,synchronized只有一个WaitSet,而ReentrantLock可以有多个WaitSet,每个WaitSet对应一个条件,线程可以根据不同的条件进入不同的WaitSet,这样可以做到更为精准的唤醒,减少虚假唤醒
  • ReentrantLock与synchronized的选择
    • 由于JDK6针对synchronized关键字的优化,两者在性能上已相差无几,因此不再是选择的参考因素
    • 在功能上,ReentrantLock是synchronized的超集,如果需要使用ReentrantLock独有的功能肯定要选择ReentrantLock
    • 在synchronized和ReentrantLock都可满足需要时,优先选择synchronized,原因如下
      1. synchronized是在Java语法层面上的同步,足够清晰简单,被更多程序员熟知
      2. Lock必须确保在finally块中释放锁,但这是由程序员自己来保证的,而使用synchronized的话则可以由JVM来确保即使出现异常,锁也能被自动释放
      3. 从长远来看,synchronized更容易被继续优化,而ReentrantLock则难以继续优化

3 ReentrantLock的使用

  1. 可中断加锁

注意如果想让进入BLOCKED状态的线程可被中断,那么在加锁时就不能使用reentrantLock.lock()方法,而必须使用**reentrantLock.lockInterruptibly()**方法

  1. private static ReentrantLock reentrantLock = new ReentrantLock();
  2. public static void main(String[] args) {
  3. Thread t1 = new Thread(() -> {
  4. log.debug("启动...");
  5. try {
  6. //使用可被打断的方式加锁
  7. reentrantLock.lockInterruptibly();
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. log.debug("等锁的过程中被打断");
  11. return;
  12. }
  13. try {
  14. log.debug("获得了锁");
  15. } finally {
  16. reentrantLock.unlock();
  17. }
  18. }, "t1");
  19. reentrantLock.lock();
  20. try {
  21. log.debug("主线程获得了锁");
  22. t1.start();
  23. Thread.sleep(1000);
  24. t1.interrupt();
  25. log.debug("执行打断");
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. } finally {
  29. reentrantLock.unlock();
  30. }
  31. }
  • 上述代码输出
    1. 15:48:49 [main] c.Test - 主线程获得了锁
    2. 15:48:49 [t1] c.Test - 启动...
    3. 15:48:50 [main] c.Test - 执行打断
    4. 15:48:50 [t1] c.Test - 等锁的过程中被打断
    5. java.lang.InterruptedException
    6. at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:944)
    7. at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1263)
    8. at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:317)
    9. at Test.lambda$main$0(Test.java:18)
    10. at java.base/java.lang.Thread.run(Thread.java:834)
  1. 超时等待

超时等待需要使用**reentrantLock.tryLock(long timeout)**方法实现

  • 如果该方法不传入参数,则线程在尝试获取锁失败后将直接退出阻塞状态
  • 如果传入参数则会在阻塞时间到达timeout后退出阻塞 ```java private static final ReentrantLock reentrantLock = new ReentrantLock();

public static void main(String[] args) { Thread t1 = new Thread(() -> { log.debug(“启动…”); if (!reentrantLock.tryLock()) { log.debug(“获取立刻失败,返回”); return; } try { log.debug(“获得了锁”); } finally { reentrantLock.unlock(); } }, “t1”);

  1. reentrantLock.lock();
  2. try {
  3. log.debug("主线程获得了锁");
  4. t1.start();
  5. Thread.sleep(2000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } finally {
  9. reentrantLock.unlock();
  10. }

}

  1. - 上述代码输出
  2. ```java
  3. 16:00:10 [main] c.Test - 主线程获得了锁
  4. 16:00:10 [t1] c.Test - 启动...
  5. 16:00:10 [t1] c.Test - 获取立刻失败,返回
  1. 公平锁

如果想将ReentrantLock配置为公平锁,实例化ReentrantLock对象时,给ReentrantLock构造函数传入true即可

  • ReentrantLock构造函数如下
    1. public ReentrantLock() {
    2. sync = new NonfairSync();
    3. }
    4. public ReentrantLock(boolean fair) {
    5. sync = fair ? new FairSync() : new NonfairSync();
    6. }
  1. 条件变量
  • 使用**reentrantLock.newCondition()**可以产生一个新的条件对象Condition
  • 等待

在Condition对象上调用**condition.await()**方法即可在指定条件上等待

  • 在执行condition.await()方法之前必须先获得锁,执行该方法后将释放锁
  • 执行await()方法的线程可以被唤醒也可以被中断(类似wait()),中断将抛出异常
    • 唤醒

在Condition对象上调用**condition.signal()**/**condition.signalAll()**方法即可唤醒某个或所有等待指定条件的线程

  1. static ReentrantLock lock = new ReentrantLock();
  2. static Condition waitCigaretteQueue = lock.newCondition();
  3. static Condition waitBreakfastQueue = lock.newCondition();
  4. static volatile boolean hasCigarette = false;
  5. static volatile boolean hasBreakfast = false;
  6. public static void main(String[] args) throws InterruptedException {
  7. new Thread(() -> {
  8. try {
  9. lock.lock();
  10. while (!hasCigarette) {
  11. try {
  12. waitCigaretteQueue.await();
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. log.debug("等到了它的烟");
  18. } finally {
  19. lock.unlock();
  20. }
  21. }).start();
  22. new Thread(() -> {
  23. try {
  24. lock.lock();
  25. while (!hasBreakfast) {
  26. try {
  27. waitBreakfastQueue.await();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. log.debug("等到了它的早餐");
  33. } finally {
  34. lock.unlock();
  35. }
  36. }).start();
  37. Thread.sleep(1000);
  38. //唤醒等待早餐的线程
  39. sendBreakfast();
  40. Thread.sleep(1000);
  41. //唤醒等待香烟的线程
  42. sendCigarette();
  43. }
  44. private static void sendCigarette() {
  45. lock.lock();
  46. try {
  47. log.debug("送烟来了");
  48. hasCigarette = true;
  49. //随机唤醒一个waitCigaretteQueue条件上的线程
  50. waitCigaretteQueue.signal();
  51. } finally {
  52. lock.unlock();
  53. }
  54. }
  55. private static void sendBreakfast() {
  56. lock.lock();
  57. try {
  58. log.debug("送早餐来了");
  59. hasBreakfast = true;
  60. //唤醒所有waitCigaretteQueue条件上的线程
  61. waitBreakfastQueue.signalAll();
  62. } finally {
  63. lock.unlock();
  64. }
  65. }
  • 上述代码输出
    1. 16:38:24 [main] c.Test - 送早餐来了
    2. 16:38:24 [Thread-1] c.Test - 等到了它的早餐
    3. 16:38:25 [main] c.Test - 送烟来了
    4. 16:38:25 [Thread-0] c.Test - 等到了它的烟

4 ReentrantLock原理

  • ReentrantLock类结构图

image.png

  • ReentrantLock内部实现了一个抽象类Sync,其继承自AQS
  • FairSyncNonfairSync继承自Sync,分别实现了公平同步器和非公平同步器’,ReentrantLock实际使用的是这两个同步器
  • FairSync和NonfairSync的区别是对于**tryAcquire()**方法实现的不同,其他方法都继承自Sync

1 ReentrantLock源码

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. private final Sync sync;
  3. abstract static class Sync extends AbstractQueuedSynchronizer{...}
  4. //非公平同步器
  5. static final class NonfairSync extends Sync{...}
  6. //公平同步器
  7. static final class FairSync extends Sync{...}
  8. //ReentrantLock默认为非公平锁
  9. public ReentrantLock() {
  10. sync = new NonfairSync();
  11. }
  12. //可以指定ReentrantLock为公平锁
  13. public ReentrantLock(boolean fair) {
  14. sync = fair ? new FairSync() : new NonfairSync();
  15. }
  16. //直接调用AQS的acquire方法
  17. public void lock() {
  18. sync.acquire(1);
  19. }
  20. public void lockInterruptibly() throws InterruptedException {
  21. sync.acquireInterruptibly(1);
  22. }
  23. public boolean tryLock() {
  24. return sync.nonfairTryAcquire(1);
  25. }
  26. public boolean tryLock(long timeout, TimeUnit unit)
  27. throws InterruptedException {
  28. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  29. }
  30. public void unlock() {
  31. sync.release(1);
  32. }
  33. public Condition newCondition() {
  34. return sync.newCondition();
  35. }
  36. ...
  37. }
  • 从上述代码中可以看到ReentrantLock的核心方法均是调用的非公平同步器或公平同步器的方法

2 NonfairSync源码

  1. static final class NonfairSync extends Sync {
  2. protected final boolean tryAcquire(int acquires) {
  3. return nonfairTryAcquire(acquires);
  4. }
  5. }
  • NonfairSync的方法与Sync的方法基本一致,只是起到了一个包装的作用

3 FairSync源码

  1. static final class FairSync extends Sync {
  2. //公平版本的tryAcquire
  3. @ReservedStackAccess
  4. protected final boolean tryAcquire(int acquires) {
  5. final Thread current = Thread.currentThread();
  6. int c = getState();
  7. if (c == 0) {
  8. //注意这里与nonfairTryAcquire()方法的区别
  9. //当锁可用时,公平同步器会先检查AQS阻塞队列中是否有节点
  10. //如果AQS没有节点则去竞争锁,否则不参与竞争,直接进入阻塞队列
  11. if (!hasQueuedPredecessors() &&
  12. compareAndSetState(0, acquires)) {
  13. setExclusiveOwnerThread(current);
  14. return true;
  15. }
  16. }
  17. else if (current == getExclusiveOwnerThread()) {
  18. int nextc = c + acquires;
  19. if (nextc < 0)
  20. throw new Error("Maximum lock count exceeded");
  21. setState(nextc);
  22. return true;
  23. }
  24. return false;
  25. }
  26. }

3 Sync源码

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2. //tryAcquire()方法在Sync的两个子类中实现,NonefairSync会调用该方法
  3. //该方法表现为不公平策略
  4. @ReservedStackAccess
  5. final boolean nonfairTryAcquire(int acquires) {
  6. final Thread current = Thread.currentThread();
  7. int c = getState();
  8. //如果当前锁无人获得,则将state设置为1
  9. if (c == 0) {
  10. if (compareAndSetState(0, acquires)) {
  11. setExclusiveOwnerThread(current);
  12. return true;
  13. }
  14. }
  15. //如果当前线程已经获得了锁,则返回true,即发生了锁重入
  16. else if (current == getExclusiveOwnerThread()) {
  17. //锁重入后令state++
  18. int nextc = c + acquires;
  19. if (nextc < 0) // overflow
  20. throw new Error("Maximum lock count exceeded");
  21. setState(nextc);
  22. return true;
  23. }
  24. return false;
  25. }
  26. @ReservedStackAccess
  27. protected final boolean tryRelease(int releases) {
  28. //锁释放时,令state--
  29. int c = getState() - releases;
  30. if (Thread.currentThread() != getExclusiveOwnerThread())
  31. throw new IllegalMonitorStateException();
  32. boolean free = false;
  33. //当state为0时才释放锁,因为此时当前线程还持有锁
  34. if (c == 0) {
  35. free = true;
  36. setExclusiveOwnerThread(null);
  37. }
  38. setState(c);
  39. //state大于0,此时并不释放锁,只是将锁重入的计数(state)减1
  40. return free;
  41. }
  42. protected final boolean isHeldExclusively() {
  43. // While we must in general read state before owner,
  44. // we don't need to do so to check if current thread is owner
  45. return getExclusiveOwnerThread() == Thread.currentThread();
  46. }
  47. final ConditionObject newCondition() {
  48. return new ConditionObject();
  49. }
  50. final Thread getOwner() {
  51. return getState() == 0 ? null : getExclusiveOwnerThread();
  52. }
  53. final int getHoldCount() {
  54. return isHeldExclusively() ? getState() : 0;
  55. }
  56. final boolean isLocked() {
  57. return getState() != 0;
  58. }
  59. private void readObject(java.io.ObjectInputStream s)
  60. throws java.io.IOException, ClassNotFoundException {
  61. s.defaultReadObject();
  62. setState(0); // reset to unlocked state
  63. }
  64. }

4 锁重入原理

  • **Sync::nonfairTryAcquire()****Sync::boolean tryRelease()** ```java @ReservedStackAccess final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();

    //如果当前锁无人获得,则将state设置为1 if (c == 0) {

    1. if (compareAndSetState(0, acquires)) {
    2. setExclusiveOwnerThread(current);
    3. return true;
    4. }

    }

    //如果当前线程已经获得了锁,则返回true表示加锁成功,此时发生了锁重入 else if (current == getExclusiveOwnerThread()) {

    1. //锁重入后令state++
    2. int nextc = c + acquires;
    3. if (nextc < 0) // overflow
    4. throw new Error("Maximum lock count exceeded");
    5. setState(nextc);
    6. return true;

    } return false; }

@ReservedStackAccess protected final boolean tryRelease(int releases) { //锁释放时,令state— int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false;

  1. //当state为0时才释放锁,因为此时当前线程还持有锁
  2. if (c == 0) {
  3. free = true;
  4. setExclusiveOwnerThread(null);
  5. }
  6. setState(c);
  7. //state大于0,此时并不释放锁,只是将锁重入的计数(state)减1
  8. return free;

}

  1. - 从上述代码中可以看到,**在加锁和释放锁时通过state的值和exclusiveOwnerThread属性来判断**
  2. - 当获取锁时,若state的值大于0,则说明锁已经被线程获取,又判断持有锁的线程和当前线程是否相等,若相等,则**加锁成功,state的值加1**
  3. - 当**释放锁时,state的值减1**,**若此时state的值为0,则释放锁成功并返回true**,否则返回false
  4. <a name="O0fAk"></a>
  5. #### 4 非公平锁原理
  6. - **当ReentrantLock初始化为非公平锁时,使用的同步器是NonfairSync,此时ReentrantLock的属性Sync指向NonfairSync对象**
  7. <br />
  8. - `**ReentrantLock::lock()**`
  9. ```java
  10. public void lock() {
  11. sync.acquire(1);
  12. }
  • 可以看到调用lock时实际调用的是AQS::acquire()方法,而acquire()方法又会调用NonfairSync::tryAcquire()方法,而tryAcquire()方法直接调用Sync::nonfairTryAcquire()
  • **Sync::nonfairTryAcquire()**

    1. @ReservedStackAccess
    2. final boolean nonfairTryAcquire(int acquires) {
    3. final Thread current = Thread.currentThread();
    4. int c = getState();
    5. //如果当前锁无人获得,则将state设置为1
    6. //由于这里直接尝试使用CAS操作去竞争锁,不检查AQS阻塞队列,因此是非公平的策略
    7. if (c == 0) {
    8. if (compareAndSetState(0, acquires)) {
    9. setExclusiveOwnerThread(current);
    10. return true;
    11. }
    12. }
    13. //如果当前线程已经获得了锁,则返回true,即发生了锁重入
    14. else if (current == getExclusiveOwnerThread()) {
    15. //锁重入后令state++
    16. int nextc = c + acquires;
    17. if (nextc < 0) // overflow
    18. throw new Error("Maximum lock count exceeded");
    19. setState(nextc);
    20. return true;
    21. }
    22. return false;
    23. }
    • 从上述代码可以看到,任何线程在调用**nonfairTryAcquire()**方法时都会直接去竞争锁,而不会去检查AQS阻塞队列,这可能导致AQS阻塞队列的线程获取不到锁,最终导致不公平

5 公平锁原理

  • 当ReentrantLock初始化为公平锁时,使用的同步器是FairSync,此时ReentrantLock的属性Sync指向FairSync对象


  • FairSync与NonfairSync的唯一区别就是**tryAcquire()**方法的区别
    • 对于FairSync来说,新的线程尝试获取锁时,如果发现AQS阻塞队列中已经有线程在等待,则直接放弃竞争,进入阻塞队列
    • 具体实现细节见FairSync源码

4 ReentrantReadWriteLock类

1 简介

  • ReentrantReadWriteLock(读写锁)概述
    • 在并发场景中用于解决线程安全的问题,我们几乎会高频率的使用到独占式锁,通常使用Java提供的关键字synchronized或者JUC包中实现了Lock接口的ReentrantLock。

上述两种锁都是独占式获取锁,也就是在同一时刻只有一个线程能够获取锁

  • 在一些大部分只是读数据而写数据很少的业务场景中,如果仅仅是读数据的话并不会影响数据正确性(不会出现脏读),而在这种业务场景下依然使用独占锁的话,很显然会影响性能。
  • 针对这种读多写少的情况,JUC包还提供了另外一个实现Lock接口的ReentrantReadWriteLock类。
  • ReentrantReadWriteLock特点
    • 读写锁允许同一时刻被多个读线程访问,但是在写线程访问时所有的读线程和其他的写线程都会被阻塞
    • 读写锁中有两种锁,分别是读锁(ReadLock)写锁(WriteLock),读锁可以用来保护共享数据的读取,写锁用来保护共享数据的写入,读锁与写锁相互配合可以保证
      • 读-读操作并行
      • 读-写操作串行
      • 写-写操作串行
    • 读写锁的其他特性如下
      • 公平锁

与ReentrantLock一样,读写锁默认非公平锁,但是也可以指定为公平锁

  1. - **重入性**

与ReentrantLock一样,支持锁的重入

  1. - **条件变量**

读锁不支持条件变量,写锁支持条件变量

  1. - **锁重入时不支持锁升级**

一个线程获取读锁后不能再获取写锁,必须先释放读锁再获取写锁

  1. - **锁重入时支持锁降级**

一个线程获取写锁后,还可以获取读锁
由于在写锁释放前可以获取读锁,因此线程获取读锁后再释放写锁,就完成了写锁到读锁的降级

2 ReentrantReadWriteLock使用实例

  • 现定义一个线程安全的数据容器类DataContainer,其内部使用ReentrantReadWriteLock的读锁保护读取数据的read()方法,使用写锁来保护写入数据的write()方法

  • DataContainer源码

    1. @Slf4j(topic = "c.DataContainer")
    2. class DataContainer {
    3. private Object data;
    4. //读写锁
    5. private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    6. //读锁
    7. private final ReentrantReadWriteLock.ReadLock rLock = rw.readLock();
    8. //写锁
    9. private final ReentrantReadWriteLock.WriteLock wLock = rw.writeLock();
    10. //读取操作使用读锁保护
    11. public Object read() {
    12. log.debug("获取读锁...");
    13. rLock.lock();
    14. try {
    15. log.debug("读取");
    16. Thread.sleep(1000);
    17. return data;
    18. } catch (InterruptedException e) {
    19. e.printStackTrace();
    20. } finally {
    21. log.debug("释放读锁...");
    22. rLock.unlock();
    23. }
    24. return null;
    25. }
    26. //写入操作使用写锁保护
    27. public void write() {
    28. log.debug("获取写锁...");
    29. wLock.lock();
    30. try {
    31. log.debug("写入");
    32. Thread.sleep(1000);
    33. } catch (InterruptedException e) {
    34. e.printStackTrace();
    35. } finally {
    36. log.debug("释放写锁...");
    37. wLock.unlock();
    38. }
    39. }
    40. }
  • 测试

    1. @Slf4j(topic = "c.Test")
    2. public class Test {
    3. static DataContainer dataContainer = new DataContainer();
    4. public static void main(String[] args) throws InterruptedException {
    5. log.debug("读-读并发测试");
    6. readRead();
    7. Thread.sleep(3000); //等待读-读测试完毕
    8. log.debug("读-写并发测试");
    9. readWrite();
    10. }
    11. //读-读测试
    12. static void readRead() {
    13. new Thread(() -> {
    14. dataContainer.read();
    15. }, "r_t1").start();
    16. new Thread(() -> {
    17. dataContainer.read();
    18. }, "r_t2").start();
    19. }
    20. //读-写测试
    21. static void readWrite() throws InterruptedException {
    22. new Thread(() -> {
    23. dataContainer.read();
    24. }, "r_t3").start();
    25. Thread.sleep(100); //等待一下,保证t1获取到读锁
    26. new Thread(() -> {
    27. dataContainer.write();
    28. }, "w_t1").start();
    29. }
    30. }

    测试结果

    1. 16:53:11 [main] c.Test - 读-读并发测试
    2. 16:53:11 [r_t1] c.DataContainer - 获取读锁...
    3. 16:53:11 [r_t2] c.DataContainer - 获取读锁...
    4. 16:53:11 [r_t1] c.DataContainer - 读取
    5. 16:53:11 [r_t2] c.DataContainer - 读取
    6. 16:53:12 [r_t2] c.DataContainer - 释放读锁...
    7. 16:53:12 [r_t1] c.DataContainer - 释放读锁...
    8. 16:53:14 [main] c.Test - 读-写并发测试
    9. 16:53:14 [r_t3] c.DataContainer - 获取读锁...
    10. 16:53:14 [r_t3] c.DataContainer - 读取
    11. 16:53:14 [w_t1] c.DataContainer - 获取写锁...
    12. 16:53:15 [r_t3] c.DataContainer - 释放读锁...
    13. 16:53:15 [w_t1] c.DataContainer - 写入
    14. 16:53:16 [w_t1] c.DataContainer - 释放写锁...
    • 从测试结果可以看到
      • 读-读操作可以并行
      • 读-写操作时,由于已经有线程开始了读操作,执行写操作的线程被阻塞

3 应用 - 实现Redis缓存与数据库一致性

  • 应用实现的功能
    1. 查询数据

查询数据时先看Redis中是否有数据,有的话直接从Redis中查,没有的话则从MySQL中查并将结果存入Redis

  1. 更新数据

更新数据时需要将数据更新到MySQL并将Redis中的缓存删除

1 非线程安全实现

  • 源码

    1. @Service("userService")
    2. public class UserServiceImpl implements UserService {
    3. @Autowired
    4. UserMapper userMapper;
    5. @Autowired
    6. JedisPool jedisPool;
    7. @Override
    8. public User findOne(int id) {
    9. Jedis jedis = jedisPool.getResource();
    10. String key = "person:" + id;
    11. //查询数据时先从Redis缓存中查询
    12. User value = (User) SerializeUtil.unserialize(jedis.get(key.getBytes()));
    13. if (value != null) {
    14. System.out.print("Get from Redis: ");
    15. return value;
    16. }
    17. //Redis中没有查询到,从MySQL中查并将结果存入Redis
    18. value = userMapper.findByCondition(id);
    19. jedis.set(key.getBytes(), SerializeUtil.serialize(value));
    20. System.out.print("Get from MySQL: ");
    21. return value;
    22. }
    23. @Override
    24. public int update(User user) {
    25. Jedis jedis = jedisPool.getResource();
    26. String key = "person:" + user.getId();
    27. //先更新数据库再清理缓存
    28. int rows = userMapper.update(user);
    29. jedis.del(key);
    30. return rows;
    31. }
    32. }
  • 测试 ```java ApplicationContext app = new ClassPathXmlApplicationContext(“applicationContext.xml”); UserService userService = (UserService) app.getBean(“userService”);

System.out.println(“==========> 查询”); System.out.println(userService.findOne(1)); System.out.println(userService.findOne(1)); System.out.println(userService.findOne(1));

System.out.println(“==========> 更新”); User user = new User(1, “常桐华”, “520”); System.out.println(userService.update(user)); System.out.println(userService.findOne(1));

  1. - 上述代码测试结果如下
  2. ```java
  3. ==========> 查询
  4. 14:47:59,308 INFO DruidDataSource:655 - {dataSource-1} inited
  5. Get from MySQL: User{id=1, username='崔奕宸', password='123'}
  6. Get from Redis: User{id=1, username='崔奕宸', password='123'}
  7. Get from Redis: User{id=1, username='崔奕宸', password='123'}
  8. ==========> 更新
  9. 1
  10. Get from MySQL: User{id=1, username='常桐华', password='520'}

上述代码中存在三个问题

  1. 对于Redis的操作是非线程安全的
  2. findOne()方法中,当多个线程同时首次查询同一个数据,多个线程可能都会到MySQL中去查询,导致缓存穿透
  3. update()方法涉及两个重要操作:1. 删除缓存内容 2. 更新数据库,两个操作的顺序不同(即更新策略的不同)会导致不同的线程安全问题,分析如下
  • 更新策略1:先删缓存再更新数据库

image.png

  • 如上图所示的执行顺序,该更新策略可能导致某些线程一直查询到旧值而查询不到新值
    • 更新策略2:先更新数据库再删缓存

image.png

  • 如上图所示的执行顺序,该更新策略可能导致少量的线程查询到旧值,但是该状况不会持续
  • 对于不需要严格线程安全的场景,该更新策略比较适合


2 线程安全实现

  • 对于上述出现的三个问题,可以通过ReentrantReadWriteLock来解决
    • 对MySQL或Redis的写操作使用写锁来保护
    • 对MySQL或Redis的读操作使用读锁来保护

源码如下

  1. @Service("userService")
  2. public class UserServiceImpl implements UserService {
  3. @Autowired
  4. UserMapper userMapper;
  5. @Autowired
  6. JedisPool jedisPool;
  7. @Autowired
  8. ReentrantReadWriteLock lock;
  9. @Override
  10. public User findOne(int id) {
  11. Jedis jedis = jedisPool.getResource();
  12. String key = "person:" + id;
  13. //需要读取Redis,加读锁保护
  14. lock.readLock().lock();
  15. User value;
  16. try {
  17. //查询数据时先从Redis缓存中查询
  18. value = (User) SerializeUtil.unserialize(jedis.get(key.getBytes()));
  19. if (value != null) {
  20. System.out.print("Get from Redis: ");
  21. return value;
  22. }
  23. } finally {
  24. lock.readLock().unlock();
  25. }
  26. lock.writeLock().lock();
  27. try {
  28. //可能多个线程试图查询数据库,因此这里需要双重检查防止缓存穿透
  29. value = (User) SerializeUtil.unserialize(jedis.get(key.getBytes()));
  30. if (value != null) {
  31. System.out.print("Get from Redis: ");
  32. } else {
  33. //Redis中没有查询到,从MySQL中查并将结果存入Redis
  34. value = userMapper.findByCondition(id);
  35. jedis.set(key.getBytes(), SerializeUtil.serialize(value));
  36. System.out.print("Get from MySQL: ");
  37. }
  38. return value;
  39. } finally {
  40. lock.writeLock().unlock();
  41. }
  42. }
  43. @Override
  44. public int update(User user) {
  45. Jedis jedis = jedisPool.getResource();
  46. String key = "person:" + user.getId();
  47. //需要写入数据库,加写锁保护
  48. lock.writeLock().lock();
  49. //先更新数据库再清理缓存
  50. try {
  51. int rows = userMapper.update(user);
  52. jedis.del(key);
  53. return rows;
  54. } finally {
  55. lock.writeLock().unlock();
  56. }
  57. }
  58. }

5 LockSupport类

  • LockSupport类概述
    • LockSupport是java.util.concurrent.locks包下的类
    • LockSupprot提供了线程的阻塞原语,用来阻塞线程和唤醒线程
    • AQS中阻塞队列底层使用LockSupport来阻塞和唤醒线程

由于LockSupport阻塞线程时,具有超时阻塞、可中断阻塞的特性,因此使用AQS的锁也具有超时阻塞、可中断阻塞的特性

1 LockSupport类API

  • LockSupport类中的方法都是静态方法


  • **void LockSupport.park()**

阻塞当前线程,使线程进入WAITING状态,可以通过中断或者unpark()方法返回

  • **void LockSupport.parkNanos(long nanos)**

阻塞当前线程,使线程进入TIMED_WAITING状态,最长不超过nanos纳秒

  • **void LockSupport.parkUntil(long deadline)**

阻塞当前线程,使线程进入TIMED_WAITING状态,直到deadline

  • **void LockSupport.unpark(Thread thread)**

唤醒线程thread,该方法可以提前唤醒thread,即如果先调用**unpark(thread)**,thread再调用**park()**,thread仍然不会被阻塞,因为被提前唤醒了

2 park()/unpark()实现原理

  • 每个线程都会与一个许可证关联

许可证实际上是Threadde native实现中的一个成员,代表线程是否可以阻塞的许可permit
当线程调用**park()**方法时,会检查许可证是否可用

  • 如果不可用线程就阻塞
  • 否则线程不阻塞,然后把许可证设置为不可用并返回

当线程调用**unpark()**方法时,会将许可证设置为可用

  • 注意只有一个许可证,因此多次连续调用unpark()方法和调用一次unpark()方法的效果相同
  • 实际上LockSupport阻塞和唤醒线程的功能是依赖于sun.misc.Unsafe类,其这是一个用C++编写的很底层的类

3 park()/unpark()和wait()/notify()的区别

  1. **wait()**/**notify()**方法必须配合Monitor一起使用

**park()**/**unpark()**可以在任何地方使用

  1. **park()**/**unpark()**使用时没有先后顺序,都可以使线程不阻塞

**wait()**必须在**notify()**前先使用,如果先**notify()****wait()**,则线程会一直等待

  1. **notify()**只能随机释放一个线程,并不能指定某个特定线程,**notifyAll()**是释放锁对象中的所有线程

**unpark()**可以唤醒指定的线程

  1. 调用**wait()**方法会使当前线程释放锁资源,但使用的前提是必须已经获得了锁

**park()**不会释放锁(因为LockSupport与Monitor无关)

4 park()与中断

  • 当中断标志位true时,线程调用**park()**不会被阻塞

park()的伪代码如下

  1. park() {
  2. if(permit > 0) {
  3. permit = 0;
  4. return;
  5. }
  6. if(中断状态 == true) {
  7. return;
  8. }
  9. 阻塞当前线程; // 将来会从这里被唤醒
  10. if(permit > 0) {
  11. permit = 0;
  12. }
  13. }
  • 还可以发现park()方法并不会清除中断标志位,因此只要中断状态为true,线程就不会被**park()**阻塞
  • 中断线程时通过调用**unpark()**来唤醒线程

    • 被中断时,park()方法并不会抛出异常而只是被唤醒,这样就可以由线程自己决定如何处理中断
    • 在调用thread.interrupt()时,该方法还会调用一次thread.unpark(),从而解除thread的阻塞
  • 实例 ```java Thread t1 = new Thread(() -> { log.debug(“线程第一次阻塞”); LockSupport.park(); log.debug(“线程被唤醒”);

    log.debug(“线程第二次阻塞”); LockSupport.park(); log.debug(“阻塞失败”); }, “t1”); t1.start();

Thread.sleep(1000); log.debug(“中断t1”); t1.interrupt();

  1. 上述代码输出
  2. ```java
  3. 17:28:18 [t1] c.Test - 线程第一次阻塞
  4. 17:28:19 [main] c.Test - 中断t1
  5. 17:28:19 [t1] c.Test - 线程被唤醒
  6. 17:28:19 [t1] c.Test - 线程第二次阻塞
  7. 17:28:19 [t1] c.Test - 阻塞失败

3 atomic包

  • 原子类概述
    • 在并发编程中很容易出现并发安全的问题,比如多个线程执行变量更新操作i++,就有可能获取不到正确的值,而针对这个问题,最常用的方法是通过synchronized加锁来达到线程安全的目的。由于synchronized采用的是悲观锁策略,并不是特别高效的一种解决方案。
    • 实际上,在JUC包下的atomic包提供了一系列的操作简单、性能高效,并能保证线程安全的类更新基本类型变量、数组元素、引用类型以及对象中的字段
    • atomic包下的这些原子类都是采用的都是乐观锁策略去更新数据,其内部使用CAS操作具体实现。

1 原子更新基本类型

  • atomic包提供的原子更新基本类型的类主要有以下三个
    1. AtomicBoolean

以原子更新的方式更新boolean类型数据

  1. AtomicInteger

以原子更新的方式更新int类型数据

  1. AtomicLong

以原子更新的方式更新long类型数据
上述三个类的用法基本一致,以AtomicInteger为例进行分析

2 AtomicInteger类API

构造方法

  • **AtomicInteger()**
  • **AtomicInteger(int initialValue)**

构造一个AtomicInteger对象,可以传入一个int类型参数作为AtomicInteger的值

实例方法

  • **boolean atomicInteger.compareAndSet(int expectedValue, int newValue)**

以原子的方式尝试将对象中的值更新为newValue

  • 如果对象中的旧值与expectedValue不相等,则更新失败并返回false
  • 如果对象中的旧值与expectedValue相等,则更新成功并返回true
  • 除了该方法,其他方法基本都是不断尝试操作,直到操作成功,并且不需要提供预期值
  • **int atomicInteger.incrementAndGet() **
  • **int atomicInteger.getAndIncrement()**

以原子的方式将对象中的值进行加1操作,并返回相加后的结果或旧值

  • 功能上等同于**++i****i++**


  • **int atomicInteger.addAndGet(int delta)**
  • **int atomicInteger.getAndAdd(int delta)**

以原子的方式将对象中的值加上传入的delta,并返回相加后的结果或旧值

  • **int atomicInteger.getAndSet(int newValue)**

将对象中的值更新为新值,并返回旧值

  • **int atomicInteger.updateAndGet(IntUnaryOperator updateFunction)**
  • **int atomicInteger.getAndUpdate(IntUnaryOperator updateFunction)**

以原子的方式更新值,并返回更新后的结果或旧值

  • 该方法的参数是一个函数式接口,其内有一个控制具体更新操作的方法,因此我们可以自定义如何更新值

    1. @FunctionalInterface
    2. public interface IntUnaryOperator {
    3. int applyAsInt(int operand);
    4. }
  • 方法源码

    1. public final int updateAndGet(IntUnaryOperator updateFunction) {
    2. //获取旧值
    3. int prev = get();
    4. //创建新值
    5. int next = 0;
    6. for (boolean haveNext = false;;) {
    7. if (!haveNext)
    8. //根据函数式接口中定义的方法计算新值
    9. next = updateFunction.applyAsInt(prev);
    10. //类似于compareAndSet方法,传入预期值和新值,如果设置成功则返回新值,否则重试
    11. if (weakCompareAndSetVolatile(prev, next))
    12. return next;
    13. haveNext = (prev == (prev = get()));
    14. }
    15. }
  • 实例

    1. AtomicInteger i = new AtomicInteger(5);
    2. i.updateAndGet((val) -> val * 10);

3 原子更新引用

  • atomic包提供的原子更新引用类型的类主要有以下三个

注意,更新的是引用本身,即引用不同的实例

  1. AtomicReference

原子更新引用类型

  1. AtomicStampedReference

原子更新带有戳的引用类型,与AtomicReference的区别是可以记录引用被更改的次数

  1. AtomicMarkableReference

原子更新带有标记位的引用类型,与AtomicReference的区别是可以记录引用是否被更改过
上述三个类的用法基本一致,而且与AtomicInteger类也是比较类似的

  • 通过下述实例即可了解AtomicReference的使用 ```java interface DecimalAccount { //获取余额 BigDecimal getBalance();

    //取款 void withdraw(BigDecimal amount);

    /**

    • 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
    • 如果初始余额为 10000 那么正确的结果应当是 0 */ static void demo(DecimalAccount account) { List ts = new ArrayList<>(); for (int i = 0; i < 1000; i++) {
      1. ts.add(new Thread(() -> {
      2. account.withdraw(BigDecimal.TEN);
      3. }));
      } ts.forEach(Thread::start); ts.forEach(t -> {
      1. try {
      2. t.join();
      3. } catch (InterruptedException e) {
      4. e.printStackTrace();
      5. }
      }); System.out.println(account.getBalance()); } }

class DecimalAccountSafeCas implements DecimalAccount { private AtomicReference balance;

  1. public DecimalAccountSafeCas(BigDecimal balance) {
  2. this.balance = new AtomicReference<>(balance);
  3. }
  4. @Override
  5. public BigDecimal getBalance() {
  6. return balance.get();
  7. }
  8. @Override
  9. public void withdraw(BigDecimal amount) {
  10. while (true) {
  11. BigDecimal prev = balance.get();
  12. BigDecimal next = prev.subtract(amount);
  13. if (balance.compareAndSet(prev, next)) {
  14. break;
  15. }
  16. }
  17. }

}

@Slf4j() public class Test { public static void main(String[] args) { DecimalAccountSafeCas account = new DecimalAccountSafeCas(new BigDecimal(“10000”)); DecimalAccount.demo(account); } }

  1. - **原子引用类型在比较时,使用的是**`**==**`**进行比较,实例如下**
  2. ```java
  3. class User {
  4. String name;
  5. User(String str) {
  6. name = str;
  7. }
  8. @Override
  9. public String toString() {
  10. return "User{" +
  11. "name='" + name + '\'' +
  12. '}';
  13. }
  14. }
  15. @Slf4j()
  16. public class Test {
  17. public static void main(String[] args) {
  18. User user1 = new User("cyc");
  19. User user2 = new User("cyc");
  20. User user3 = new User("qxj");
  21. AtomicReference<User> atomicReference = new AtomicReference<>(user1);
  22. System.out.println(atomicReference.compareAndSet(user2, user3));
  23. System.out.println(atomicReference.get());
  24. }
  25. }
  • 上述代码输出
    1. false
    2. User{name='cyc'}

4 原子更新数组元素

  • atomic包下提供能原子更新数组中元素的类主要有以下三个
    • AtomicIntegerArray

原子更新整型数组中的元素

  • AtomicLongArray

原子更新长整型数组中的元素

  • AtomicReferenceArray

原子更新引用类型数组中的元素
这几个类的用法一致,以AtomicInteger为例进行分析

  • AtomicIntegerArray类API

该类的方法与AtomicInteger类也是比较类似的,只不过在AtomicIntegerArray的方法中会多一个指定数组索引位的参数i,如

  • **boolean atomicIntegerArray.compareAndSet(int i, int expectedValue, int newValue)**

尝试使用CAS操作更新数组中索引为i的元素

  • **int atomicIntegerArray.getAndSet(int i, int newValue)**

以原子更新的方式将数组中索引为i的元素设置为新值并返回旧值

  • **int atomicIntegerArray.addAndGet(int i, int delta)**

以原子更新的方式将数组中索引为i的元素与delta相加并返回计算结果

  • **int atomicIntegerArray.getAndIncrement(int i)**

以原子更新的方式将数组中索引为i的元素自增加1并返回旧值

5 原子累加器

  • 原子累加器概述

    • atom包提供了两个高性能累加器DoubleAddr和LongAddr,其中LongAddr较为常用,因此以LongAddr为例进行解析
    • AtomicLong也提供有累加操作,但是LongAddr的累加性能比AtomicLong好约4-5倍
  • LongAddr加速累加原理

    • AtomicLong的原理是依靠底层的CAS来保障原子性的更新数据,在要添加或者减少的时候,如果出现竞争就会使用死循环不断地CAS直到满足条件,从而达到更新数据的目的
    • LongAddr性能提升的原因很简单,就是在有竞争时设置多个累加单元Cell,多个线程累加自己的单元,如Therad-0累加Cell[0]、Thread-1累加Cell[1],最后将结果汇总

这样多个线程在累加时操作的不同的Cell单元,因此减少了CAS重试失败,从而提高性能

  1. - LongAddr的累加单元会随着冲突的发生而增加,但是累加单元的个数不会超过CPU核心数

LongAddr源码(待)

4 线程安全集合

1 线程安全集合概述

Java中线程安全集合可以分为三大类
image.png

  1. 遗留的线程安全集合
    1. Hashtable
    2. Vector
  • 特点
    • 集合创建的时间比较早
    • 方法由synchronized关键字修饰来保证线程安全性
    • 在高并发的情况下,每次只有一个线程能够获取synchronized锁对象,性能较差
  1. 经过Collenctions方法修饰的线程安全集合
    1. Collections.synchronizedCollection()
    2. Collections.synchronizedList()
    3. Collections.synchronizedMap()
    4. Collections.synchronizedSet()
    5. Collections.synchronizedNavigableMap()
  • 特点
    • Collections中提供了多个可以将线程不安全的类修饰为线程安全类的方法
    • 线程安全的实现简单粗暴,使用装饰器模式,在调用所有方法前需要先获取synchronized锁对象
    • 在高并发的情况下,每次只有一个线程能够获取synchronized锁对象,性能较差
  1. JUC包提供的线程安全集合

JUC包下提供的线程安全集合众多,但是可以根据它们的前缀将它们分为三大类

  1. Blocking(如LinkedBlockingQueue、ArrayBlockingQueue)

名称含有Blocking的容器是阻塞队列的实现,这一类实现基于ReentrantLock锁,并提供用来阻塞线程的方法

  1. CopyOnWrite(如CopyOnWriteArrayList、CopyOnWriteSet)

前缀为CopyOnWrite的容器的实现是通过在修改时拷贝的方式来保证线程安全,适用于读多写少的场景,写操作的开销较重

  1. Concurrent(如ConcurrentHashMap、ConcurrentLinkedQueue)
  • 前缀为Concurrent的容器一般性能都比较高
  • 内部采用CAS操作优化锁并采用多把锁(锁的粒度比较细),提高并发度和吞吐量
  • 性能较高的同时也存在弊端,即弱一致性。Concurrent容器的弱一致性有以下表现
    • 遍历时弱一致性,即当利用迭代器遍历容器时,如果容器的内容发生修改,迭代器仍然可以继续遍历,并且遍历到的值仍然是旧的

注意:对于非线程安全的集合来说,在迭代时如果集合被其他线程修改,则抛出异常,不再继续遍历

  1. - 获取容器大小时弱一致性,即`**size()**`**方法返回的结果未必正确**
  2. - 读取弱一致性,即**可能出现脏读**

1 ConcurrentHashMap

1 简介

  • HashMap和Hashtable的弊端

HashMap

  • HashMap是非线程安全的
  • JDK8之前HashMap在出现冲突时采用头插法,若在多线程情况下扩容可能会出现CPU使用率接近100%的情况,导致机器卡死,此时产生了并发死链问题
  • 在JDK8之后,由于改变了冲突处理的方法(改成了尾插法),同时还改变了计算hash值的方法,因此不会出现并发死链问题,但仍不意味着线程安全,还可能出现丢数据等问题

Hashtable

  • 为了保证线程安全,我们可以使用古老的Hashtable类,该类基本上所有的方法都采用synchronized进行线程安全的控制。
  • 使用Hashtable时,在高并发情况下每次只有一个线程能够获取synchronized锁对象,这样的并发性能很差
  • ConcurrentHashMap概述
    • ConcurrentHashMap是JUC包提供的高性能线程安全的HashMap实现

ConcurrentHashMap和HashMap一样都实现了Map接口,因此其API与HashMap基本一致

  • ConcurrentHashMap大量使用了synchronized关键字以及CAS操作以保证操作的线程安全性
  • 底层数据结构采用数组+链表+红黑树的数据形式
  • 为什么ConcurrentHashMap不用ReentrantLock而是Synchronzied呢?

实际上,synchronzied做了很多的优化,包括偏向锁、轻量级锁、锁自旋等,可以依次向上升级锁状态,因此synchronized和ReentrantLock在性能上已相差无几。
又因为ConcurrentHashMap没有用到ReentrantLock独有的功能,因此多采用简单的synchronized

2 重要属性和内部类

  1. // 控制hash表的初始化和大小调整的标志
  2. // -1: 表示hash表正在初始化
  3. // -(1+调整表大小的线程数量): 表示hash表正在扩容
  4. // 0: ConcurrentHashMap刚构造时的值,此时hash表为null
  5. // 正数值:表示hash表已经初始化(实际可能还没初始化,因为是懒惰初始化)或扩容完毕,
  6. // 值为hash表下一次扩容的阈值
  7. private transient volatile int sizeCtl;
  8. // HashMap实际的底层数据结构,即hash表
  9. // hash表容量总是2的幂,在第一次插入数据时才初始化(懒惰初始化)
  10. // 使用volatile修饰,因此在操作中会使用CAS操作来保证其线程安全性
  11. transient volatile Node<K,V>[] table;
  12. // hash表中存储的元素,表示链表中的节点,其内保存一对键值、hash值、下一个Node的引用
  13. // 键值对即是HashMap中的键和值,hash值是核心,用于快速定位Node
  14. // Node的hash值为正数
  15. static class Node<K,V> implements Map.Entry<K,V> {
  16. final int hash;
  17. final K key;
  18. volatile V val;
  19. volatile Node<K,V> next;
  20. ...
  21. }
  22. // hash表扩容时:新的hash表
  23. private transient volatile Node<K,V>[] nextTable;
  24. /* ---------------- 特殊的Nod -------------- */
  25. // hash表扩容时:放置在原hash表中,标志该位置的链表已经搬迁到新hash表中
  26. // 在将原hash表中的链表搬迁到新hash表时,每搬迁完成一个hash表的位置(或该位置没有链表),
  27. // 便放置一个ForwardingNode
  28. // 在搬迁时如果其他线程调用get()方法并且在hash表相应位置发现是一个ForwardingNode,则
  29. // 该线程便知道应该去新的hash表中去获取数据
  30. // ForwardingNode的hash值为-1,next为null
  31. static final class ForwardingNode<K,V> extends Node<K,V> {...}
  32. // 在compute和computeIfAbsent时用来占位,计算完成后换为普通的Node
  33. static final class ReservationNode<K,V> extends Node<K,V> {...}
  34. // 红黑树(TreeBin)相关的Node
  35. // 当链表长度超过8时,若hash表容量不足64则对hash表扩容,否则将链表转为红黑树
  36. // 当红黑树中的节点小于6时,红黑树又会转换回链表
  37. // 作为红黑树的头节点,存储root和first
  38. // TreeBin的hash值为-2,next为null
  39. static final class TreeBin<K,V> extends Node<K,V> {...}
  40. // 作为红黑树中的普通节点,存储parent、left、right
  41. static final class TreeNode<K,V> extends Node<K,V> {...}

3 实现流程分析

  • ConcurrentHashMap底层数据结构

ConcurrentHashMap底层使用【Node数组 + 链表(Node)或 红黑树(TreeNode)】实现

以下数组简称(table),链表简称(bin)

  1. 初始化table - CAS

使用CAS创建table来保证并发安全,懒惰初始化

  1. 链表树化/table扩容 - synchronize
    • 当bin.length > 8且table.length < 64时,尝试扩容table

扩容时以bin为单位进行(逐个将bin从老table搬到新table),需要使用synchronized锁住bin,但这时妙的是其它竞争线程也不是无事可做,它们会帮助把其它bin进行扩容,扩容时平均只有1/6的节点会被复制到新table中

  • 当table.length超过64时,会将链表树化,树化过程会用synchronized锁住链表头
    1. put - CAS/synchronize
  • 如果该bin尚未创建,只需要使用CAS创建bin
  • 如果已经有了,synchronized锁住链表头进行后续put操作,元素添加至bin的尾部
    1. get

无锁操作,仅需要保证可见性
table扩容过程中如果get操作拿到的是ForwardingNode,它会让get操作在新table 进行搜索

  1. size(算法思想与LongAddr相同)

元素个数保存在baseCount中,并发时的个数变动保存在CounterCell[]当中,最后统计size时累加即可

可以发现线程安全措施主要在以下两个方面

  1. 创建

ConcurrentHashMap有两种创建操作,分别是创建hash表创建链表头节点
创建操作均使用CAS来保证原子性

  1. 更改

ConcurrentHashMap有三种更改操作,分别是向链表中加入新节点修改链表为红黑树hash表扩容
上述更改操作均使用synchronized关键字锁住头节点的方式来保证线程安全性

4 源码分析

1 重要工具方法

  1. // 获取hash表中第i个链表的链表头
  2. static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
  3. // 使用CAS操作修改hash表中第i个链表的链表头的值,c为旧值,v为新值
  4. static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
  5. // 直接修改hash表中第i个链表的链表头的值,v为新值
  6. static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)

2 构造器源码分析

  • ConcurrentHashMap有多种重载的构造器,以参数最全的构造器为例

    1. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    2. // initialCapacity:ConcurrentHashMap的初始容量
    3. // loadFactor: 负载因子,即hash表扩容时的阈值,默认为0.75
    4. // concurrencyLevel: 并发度
    5. if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
    6. throw new IllegalArgumentException();
    7. // 当初始容量小于并发度时,将初始容量改为并发度大小
    8. if (initialCapacity < concurrencyLevel)
    9. initialCapacity = concurrencyLevel;
    10. // cap是hash表实际的初始容量,需要对指定的初始容量进行两步处理
    11. // size = 初始容量除以负载因子并向上取整
    12. long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    13. // cap = size向上取2的幂,如size=11,则cap=16
    14. int cap = (size >= (long)MAXIMUM_CAPACITY) ?
    15. MAXIMUM_CAPACITY : tableSizeFor((int)size);
    16. // 将hash表下一次大小存入sizeCtl
    17. this.sizeCtl = cap;
    18. }
    • 可以看到hash表实现了懒惰初始化,在构造方法中仅仅计算了table的大小

3 get()源码分析

  1. public V get(Object key) {
  2. Node<K,V>[] tab;
  3. Node<K,V> e, p;
  4. int n, eh;
  5. K ek;
  6. // spread()方法类似于HashMap中的扰动方法hash()
  7. // h为key对应的hash值
  8. int h = spread(key.hashCode());
  9. // 当hash表不为空且其中有元素,则调用tabAt()返回hash表(n - 1) & h位置的链表头节点e
  10. // (n - 1) & h相当于取模运算,比取模运算效率更高
  11. if ((tab = table) != null && (n = tab.length) > 0 &&
  12. (e = tabAt(tab, (n - 1) & h)) != null) {
  13. // 如果头节点就是要找的key,直接返回头节点的val
  14. if ((eh = e.hash) == h) {
  15. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  16. return e.val;
  17. }
  18. // 当头节点的hash值为负数,表示hash表当前位置的节点为ForwardingNode或TreeBin
  19. // 此时需要调用它们的find()方法来查找对应的节点
  20. else if (eh < 0)
  21. return (p = e.find(h, key)) != null ? p.val : null;
  22. //遍历链表来查找目标Node
  23. while ((e = e.next) != null) {
  24. if (e.hash == h &&
  25. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  26. return e.val;
  27. }
  28. }
  29. return null;
  30. }
  • 从源码中可以看到整个get()方法中没有使用任何线程安全措施
  • ConcurrentHashMap与HashMap的get()方法几乎一样,但是HashMap中没有ForwardingNode,而ConcurrentHashMap的get()方法可能调用ForwardingNode的find()方法

4 put()源码分析

  • put()方法的线程安全机制可以使多个线程并行写入hash表的不同位置 ```java public V put(K key, V value) { return putVal(key, value, false); }

final V putVal(K key, V value, boolean onlyIfAbsent) { // onlyIfAbsent: 表示当key已经存在时,是否用新值覆盖旧值

  1. // HashMap允许有空的key或value,而ConcurrentHashMap不允许有空的key或value
  2. if (key == null || value == null) throw new NullPointerException();
  3. // 获取key的hash值
  4. int hash = spread(key.hashCode());
  5. // 链表节点计数(链表长度)
  6. int binCount = 0;
  7. // 进入死循环
  8. for (Node<K,V>[] tab = table;;) {
  9. // f是链表头节点,fh是头节点的hash值,fv是头节点的val
  10. // i是链表在hash表的下标,n是hash表的长度
  11. Node<K,V> f; int n, i, fh; K fk; V fv;
  12. // 当hash表还没有创建,调用initTable()创建hash表
  13. if (tab == null || (n = tab.length) == 0)
  14. // initTable()方法使用CAS操作保证不会有多个线程同时创建hash表
  15. tab = initTable();
  16. // 当链表头节点为null,创建头节点并使用CAS操作将头节点置于hash表相应位置
  17. // 如果成功则退出循环
  18. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  19. if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
  20. break;
  21. }
  22. // 当头节点是ForwardingNode,则帮忙扩容(帮助搬迁链表)
  23. else if ((fh = f.hash) == MOVED)
  24. tab = helpTransfer(tab, f);
  25. else if (onlyIfAbsent // check first node without acquiring lock
  26. && fh == hash
  27. && ((fk = f.key) == key || (fk != null && key.equals(fk)))
  28. && (fv = f.val) != null)
  29. return fv;
  30. else {
  31. // 进入到当前else,说明发生了hash值冲突
  32. V oldVal = null;
  33. // 锁住链表头节点(锁的不是整个hash表,锁的粒度很细)
  34. synchronized (f) {
  35. // 再次确认一下hash表当前位置的头节点没有被移动过
  36. if (tabAt(tab, i) == f) {
  37. // 如果头节点的hash值大于0,说明是普通节点(是一个链表)
  38. if (fh >= 0) {
  39. binCount = 1;
  40. // 遍历链表
  41. for (Node<K,V> e = f;; ++binCount) {
  42. K ek;
  43. // 找到相同的key
  44. if (e.hash == hash &&
  45. ((ek = e.key) == key ||
  46. (ek != null && key.equals(ek)))) {
  47. oldVal = e.val;
  48. // 用新的value覆盖老的value
  49. if (!onlyIfAbsent)
  50. e.val = value;
  51. break;
  52. }
  53. Node<K,V> pred = e;
  54. // 到最后一个节点了,仍然没找到
  55. if ((e = e.next) == null) {
  56. // 将当前的key和value追加到链表中
  57. pred.next = new Node<K,V>(hash, key, value);
  58. break;
  59. }
  60. }
  61. }
  62. // 当头节点的hash值小于0,说明是红黑树头节点(或ReservationNode)
  63. else if (f instanceof TreeBin) {
  64. Node<K,V> p;
  65. // 红黑树的binCount固定为2
  66. binCount = 2;
  67. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  68. value)) != null) {
  69. oldVal = p.val;
  70. if (!onlyIfAbsent)
  71. p.val = value;
  72. }
  73. }
  74. else if (f instanceof ReservationNode)
  75. throw new IllegalStateException("Recursive update");
  76. }
  77. // 释放锁
  78. }
  79. // 当binCount!=0,说明之前发生了hash冲突
  80. if (binCount != 0) {
  81. // 检查链表长度是否超过阈值(默认为8),如果超过阈值则需要将链表转换为红黑树
  82. if (binCount >= TREEIFY_THRESHOLD)
  83. // treeifyBin()不一定直接将链表转为红黑树,可能会先尝试扩容hash表
  84. treeifyBin(tab, i);
  85. if (oldVal != null)
  86. return oldVal;
  87. break;
  88. }
  89. }
  90. }
  91. // 基于LongAddr累加器,增加size的计数
  92. addCount(1L, binCount);
  93. return null;

}

  1. <a name="BCFTG"></a>
  2. #### 5 initTable()源码分析
  3. ```java
  4. // 该方法需要保证多个线程调用该方法时,只有一个线程可以成功创建hash表
  5. private final Node<K,V>[] initTable() {
  6. Node<K,V>[] tab; int sc;
  7. // 进入循环,条件是hash表还没有被创建
  8. while ((tab = table) == null || tab.length == 0) {
  9. // 当发现sizeCtl变为了负数,说明有其他线程在创建hash表,则让出当前CPU时间片
  10. // 该操作是防止没有执行创建hash表操作的线程在while循环中长期占用CPU
  11. if ((sc = sizeCtl) < 0)
  12. Thread.yield();
  13. // 尝试使用CAS操作将sizeCtl的值改为-1,表示正在初始化hash表
  14. // 此过程只可能有一个线程进入else if代码块执行,其他线程执行失败将进入下一轮while循环
  15. else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
  16. try {
  17. if ((tab = table) == null || tab.length == 0) {
  18. // 如果sizeCtl值大于0,则hash表的容量为sizeCtl,否则为默认值
  19. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  20. // 创建容量为n的Node数组并赋给hash表
  21. @SuppressWarnings("unchecked")
  22. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  23. table = tab = nt;
  24. // 重新计算sizeCtl的值,表示下一次扩容时hash表的容量
  25. sc = n - (n >>> 2);
  26. }
  27. } finally {
  28. sizeCtl = sc;
  29. }
  30. break;
  31. }
  32. }
  33. return tab;
  34. }

6 addCount()源码解析

  • put()方法会调用该方法来累加size,由于可能有多个线程累加size,因此该方法内有LongAddr的思想
    1. private final void addCount(long x, int check) {
    2. CounterCell[] cs; long b, s;
    3. if ((cs = counterCells) != null ||
    4. !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    5. CounterCell c; long v; int m;
    6. boolean uncontended = true;
    7. if (cs == null || (m = cs.length - 1) < 0 ||
    8. (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
    9. !(uncontended =
    10. U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
    11. fullAddCount(x, uncontended);
    12. return;
    13. }
    14. if (check <= 1)
    15. return;
    16. s = sumCount();
    17. }
    18. if (check >= 0) {
    19. Node<K,V>[] tab, nt; int n, sc;
    20. while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
    21. (n = tab.length) < MAXIMUM_CAPACITY) {
    22. int rs = resizeStamp(n);
    23. if (sc < 0) {
    24. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    25. sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
    26. transferIndex <= 0)
    27. break;
    28. if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
    29. transfer(tab, nt);
    30. }
    31. else if (U.compareAndSetInt(this, SIZECTL, sc,
    32. (rs << RESIZE_STAMP_SHIFT) + 2))
    33. transfer(tab, null);
    34. s = sumCount();
    35. }
    36. }
    37. }

2 LinkedBlockingQueue

1 简介

  • LinkedBlockingQueue底层数据结构

顾名思义,LinkedBlockingQueue是底层是链表的阻塞队列

  • LinkedBlockingQueue的大小

LinkedBlockingQueue构造的时候若没有指定大小,则默认大小为Integer.MAX_VALUE,当然也可以在构造函数的参数中指定大小

  • LinkedBlockingQueue三种添加元素方法

三种方法都是向阻塞队列队尾添加元素,不同的是当队列满时三种方法有不同的处理方式

  • **boolean add(E e)**

队列满时抛出IllegalStateException异常

  • **boolean offer(E e)**

队列满时返回false

  • **void put(E e) throws InterruptedException**

队列满时将会进入Condition对象notFull中等待

  • LinkedBlockingQueue三种取元素方法

三种方法都是从阻塞队列队头取元素,不同的是当队列为空时三种方法有不同的处理方式

  • **E remove()**

队列空时抛出NoSuchElementException异常

  • **E poll()**

队列空时返回null

  • **E take() throws InterruptedException**

队列空时进入Condition对象notEmpty中等待

2 重要属性和内部类

  1. // 链表中的节点,其中包含节点关联的对象item以及指向下一个节点的指针
  2. static class Node<E> {
  3. E item;
  4. Node<E> next;
  5. Node(E x) { item = x; }
  6. }
  7. // 阻塞队列中元素的个数
  8. private final AtomicInteger count = new AtomicInteger();
  9. // 链表的头节点,头节点不关联对象(head.item == null)
  10. transient Node<E> head;
  11. // 链表的尾节点
  12. private transient Node<E> last;
  13. // 保护从队列取元素操作的锁
  14. private final ReentrantLock takeLock = new ReentrantLock();
  15. // 等待取元素的等待队列,当队列为空时,想要取元素的线程将进入该Condition等待
  16. private final Condition notEmpty = takeLock.newCondition();
  17. // 保护向阻塞队列放元素操作的锁
  18. private final ReentrantLock putLock = new ReentrantLock();
  19. // 等待存元素的等待队列,当队列满时,想要存元素的线程将进入该Condition等待
  20. private final Condition notFull = putLock.newCondition();

3 实现流程分析

  1. 初始化链表last = head = new Node<E>(null)

Dummy节点用来占位,item为null
image.png

  1. 当一个节点入队last = last.next = node

image.png
再来一个节点入队,last = last.next = node
image.png

  1. 出队

h = head
image.png
first = h.next
image.png
h.next = h,这样做用于帮助GC
image.png
head = first,临时变量first和h没有用了
image.png
E x = first.item;``first.item = null;``return x;,此时first指向的节点成为Dummy
image.png

4 加锁分析

  • LinkedBlockingQueue保证线程安全时的巧妙之处

LinkedBlockingQueue保证线程安全且尽量保持性能的核心在于使用了两把锁和Dummy节点

  • 如果使用一把锁,同一时刻最多只允许有一个线程(生产者或消费者,二选一)执行
  • 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行,此时消费者与消费者线程仍然串行;生产者与生产者线程仍然串行
  • Dummy节点的引入是让两把锁保护不同的对象,避免竞争
  • LinkedBlockingQueue两把锁保护行为分析
    • 当链表节点总数大于等于2时(包括Dummy节点),putLock保证的是last节点相关操作的线程安全,takeLock保证的是head节点相关操作的线程安全,两把锁合作保证了入队和出队的线程安全
    • 当节点总数等于1时(就一个 dummy 节点),两把锁保护的是同一个对象,但是此时取元素的线程会被notEmpty条件阻塞,因此也不存在竞争

5 源码分析

1 put()源码分析

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. final int c;
  4. // 将放入队列的元素e包装为Node
  5. final Node<E> node = new Node<E>(e);
  6. final ReentrantLock putLock = this.putLock;
  7. // 维护队列中元素的个数
  8. final AtomicInteger count = this.count;
  9. // 操作尾结点前先上锁,上锁时可被打断
  10. putLock.lockInterruptibly();
  11. try {
  12. // 判断队列是否已满,满了就进入notFull等待
  13. while (count.get() == capacity) {
  14. notFull.await();
  15. }
  16. // 入队,就是将node加入队列尾部并让last指向node
  17. enqueue(node);
  18. // 元素个数增1
  19. c = count.getAndIncrement();
  20. // 如果队列中还有空位,叫醒其他put()线程
  21. // 这一操作的原因是消费者在消费后使用的并不是singleAll()去叫醒所有生产者
  22. // 因此每一个生产者在生产后还需要负责唤醒另一个生产者
  23. if (c + 1 < capacity)
  24. notFull.signal();
  25. } finally {
  26. putLock.unlock();
  27. }
  28. // 如果队列中有一个元素,唤醒一个消费者线程
  29. if (c == 0)
  30. // 这里使用signal()而不是signalALL()是为了减少竞争
  31. signalNotEmpty();
  32. }

2 take()源码分析

  • put()方法的思路基本一致,只不过是反过来的
    1. public E take() throws InterruptedException {
    2. final E x;
    3. final int c;
    4. final AtomicInteger count = this.count;
    5. final ReentrantLock takeLock = this.takeLock;
    6. takeLock.lockInterruptibly();
    7. try {
    8. while (count.get() == 0) {
    9. notEmpty.await();
    10. }
    11. // 出队方法
    12. x = dequeue();
    13. c = count.getAndDecrement();
    14. // 消费者唤醒另一个消费者
    15. if (c > 1)
    16. notEmpty.signal();
    17. } finally {
    18. takeLock.unlock();
    19. }
    20. // 如果队列中只有一个空位时,唤醒生产者线程
    21. if (c == capacity)
    22. signalNotFull();
    23. return x;
    24. }

6 LinkedBlockingQueue与ArrayBlockingQueue的对比

  1. Linked支持有界,Array强制有界
  2. Linked底层实现是链表,Array底层实现是数组
  3. Linked节点的创建是懒惰的,而Array需要提前初始化Node数组

因此LinkedBlockingQueue比较节省内存

  1. Linked每次入队会生成新Node,而Array的Node是提前创建好的
  2. Linked两把锁,Array一把锁(因为只有一个数组,因此只能一把锁)

因此LinkedBlockingQueue的性能是优于ArrayBlockingQueue

7 LinkedBlockingQueue与ConcurrentLinkedQueue的对比

ConcurrentLinkedQueue的设计与LinkedBlockingQueue非常像,也是

  • 两把“锁”,同一时刻可以允许两个线程同时(一个生产者与一个消费者)执行
  • 引入了Dummy节点

不同的是,ConcurrentLinkedQueue的“锁”实际上是使用CAS来实现的,无阻塞

3 CopyOnWriteArrayList

  • CopyOnWriteArrayList概述

    • CopyOnWriteArrayList底层实现采用了写入时拷贝的思想,即增删改操作时会先将底层数组拷贝一份,更改操作在新数组上执行,这时不影响其它线程的并发读,实现读写分离
    • 读写锁只能做到读-读并发,而CopyOnWriteArrayList可以做到读-写并发
    • CopyOnWriteArrayList适合读多写少的场景
  • CopyOnWriteArrayList的弱一致性

    • CopyOnWriteArrayList写入时拷贝的思想难以避免弱一致性
    • 并发高和一致性是矛盾的,需要权衡