悲观锁和乐观锁

悲观锁总是假设当前就是出于并发状态,直接对临界资源进行上锁,保证并发性;

乐观锁在操作临界资源时,不会一上来就加锁,而是先检测当前的一个数据状态,如果存在冲突,在进入自旋,直到可以更新数据。

悲观锁和乐观锁存在的问题

悲观锁存在的问题

  1. 在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题
  2. 一个线程持有锁后,会导致其他所有抢占此锁的线程挂起
  3. 如果一个优先级高的线程等待一个优先级低的线程释放锁,就会导致线程的优先级倒置,从而引发性能风险

以上这些悲观锁存在的问题就可以使用乐观锁进行替代,提升性能。

乐观锁存在的问题

  1. 并发度很高的情况下,除了获取锁的线程,其他线程会进入无限循环的自旋状态,导致 CPU 资源浪费
  2. 由于 CPU 会通过 MESI 协议保证变量的缓存一致性,那么就需要通过总线在不同的内核上来回通信。在高度并发的情况下,总线就会成为性能瓶颈,造成所谓的“总线风暴”现象

使用 CAS 实现乐观锁

乐观锁的操作分为两步:

  1. 冲突检测
  2. 数据更新

Java 提供的 CAS 原子操作可以用来实现乐观锁。 CAS 操作也可以分为两个步骤:

  1. 检测内存位置的值是否为期待值
  2. 如果是,就将该内存位置更新为最新值;否则不要更改该位置

实际上,如果需要完成数据的最终更新,仅仅进行一次 CAS 操作是不够的,一般情况下,需要进行自旋操作,即不断地循环重试 CAS 操作直到成功,这也叫 CAS 自旋。

什么是自旋锁

当一个线程在获取锁的时候,如果锁已经被其他线程获取,调用者就一直在那里循环检查该锁是否已经被释放,一直到获取到锁才会退出循环。这就是自旋锁的定义。

CAS 自旋锁的实现原理为:抢锁线程不断进行 CAS 自旋操作去更新锁的 owner(拥有者),如果更新成功,就表明已经抢锁成功,退出抢锁方法。如果锁已经被其他线程获取(也就是 owner 为其他线程),调用者就一直在那里循环进行 owner 的 CAS 更新操作,一直到成功才会退出循环。

不可重入的自旋锁

  1. public class NoReentrantSpinLock implements Lock {
  2. /**
  3. * 当前锁的拥有者
  4. */
  5. private AtomicReference<Thread> owner = new AtomicReference<>();
  6. @Override
  7. public void lock() {
  8. Thread t = Thread.currentThread();
  9. // 进入自旋,不可重入
  10. while (owner.compareAndSet(null, t)) {
  11. // 让出CPU时间片
  12. Thread.yield();
  13. }
  14. }
  15. @Override
  16. public void unlock() {
  17. Thread t = Thread.currentThread();
  18. if (t == owner.get()) {
  19. owner.set(null);
  20. }
  21. }
  22. ......
  23. }

可重入的自旋锁

  1. public class ReentrantSpinLock implements Lock {
  2. /**
  3. * 当前锁的拥有者
  4. */
  5. private AtomicReference<Thread> owner = new AtomicReference<>();
  6. /**
  7. * 记录一个线程重复获取到锁的次数
  8. * <p>
  9. * 此变量为同一个线程在操作,没有必要加上volatile保证可见性和有序性
  10. */
  11. private int count = 0;
  12. @Override
  13. public void lock() {
  14. Thread t = Thread.currentThread();
  15. if (t == owner.get()) {
  16. count++;
  17. return;
  18. }
  19. // 进入自旋,不可重入
  20. while (owner.compareAndSet(null, t)) {
  21. // 让出CPU时间片
  22. Thread.yield();
  23. }
  24. }
  25. @Override
  26. public void unlock() {
  27. Thread t = Thread.currentThread();
  28. if (t == owner.get()) {
  29. if (count > 0) {
  30. // 如果重入次数大于0,减少重入计数
  31. --count;
  32. } else {
  33. owner.set(null);
  34. }
  35. }
  36. }
  37. ......
  38. }

可以看到,可重入与不可重入的区别在于:可重入引入了一个重入次数计数器,只有在计数器为 0 时才表名当前锁处于被释放状态,其他线程才能进行获取。

什么是总线风暴

目前的 CPU 架构大体可以分为三类:

  1. 多处理器结构(Symmetric Multi-Processor,SMP)
  2. 非一致存储访问结构(Non-Uniform Memory Access,NUMA)
  3. 海量并行处理结构(Massive Parallel Processing,MPP)

CAS 对应的硬件指令为 cmpxchg,在多核 CPU 下,就会在 cmpxchg 指令前加上 lock 前缀指令。lock 前缀指令有以下3个作用:

  1. 将当前 CPU 缓存行的数据立即写回系统内存
  2. lock 前缀指令会引起在其他 CPU 中缓存了该内存地址的数据无效
  3. lock 前缀指令禁止指令重排

常见的 PC、手机、老式服务器都是 SMP 架构,其架构简单,但拓展性能非常差。在 SMP 架构的 CPU 平台上,所有的 Core(内核)会共享一条总线(BUS),靠此总线连接主存。每个核都有自己的高速缓存,各核相对于 BUS 对称分布。因此,这种结构称为“对称多处理器”。一个 8 核的 SMP 架构 CPU 大致如下图所示:
image.png

假设 Core 1 和 Core 2 可同时把某个变量加载到自己的高速缓存中,当 Core 1 在自己的高速缓存中修改这个位置的值时,会通过总线使 Core 2 中 L1 高速缓存对应的值“失效”,而 Core 2 一旦发现自己缓存中的值失效,就会通过总线从内存中读取最新的值,使得 Core 2 和 Core 1 中的值再次一致,这样 CPU 就保障了变量的“缓存一致性”。

CPU会通过MESI协议保障变量的缓存一致性。为了保障“缓存一致性”,不同的内核需要通过总线来回通信,因而所产生的流量一般被称为“缓存一致性流量”。因为总线被设计为固定的“通信能力”,如果缓存一致性流量过大,总线将成为瓶颈,这就是所谓的“总线风暴”。

注意:在 SMP 架构上会产生“总线风暴”是因为 SMP 架构采用了 lock 前缀指令产生了缓存一致性流量,而在很多线程同时执行 lock 前缀指令时导致流量过剩。而其他架构的 CPU 并不一定会产生“总线风暴”。

那 CAS 怎么避免总线风暴呢?答案就是使用队列对抢锁线性进行排序,最大程度上削减 CAS 操作数量。

CLH 自旋锁

CLH 锁其实就是一种基于队列(具体为单向链表)排队的自旋锁,由于是 Craig、Landin 和 Hagersten 三人一起发明的,因此被命名为 CLH 锁,也叫 CLH 队列锁。

简单的 CLH 锁可以基于单向链表实现,申请加锁的线程首先会通过 CAS 操作在单向链表的尾部增加一个节点,之后该线程只需要在其前驱节点上进行普通自旋,等待前驱节点释放锁即可。由于 CLH 锁只有在节点入队时进行一下 CAS 的操作,在节点加入队列之后,抢锁线程不需要进行 CAS 自旋,只需普通自旋即可。因此,在争用激烈的场景下,CLH 锁能大大减少 CAS 操作的数量,以避免 CPU 的总线风暴。

  1. public class CLHLock implements Lock {
  2. /**
  3. * 当前节点的本地线程变量
  4. */
  5. private static ThreadLocal<Node> curNodeLocal = new ThreadLocal<>();
  6. /**
  7. * CLHLock队列的尾指针,使用AtomicReference,方便进行CAS操作
  8. */
  9. private AtomicReference<Node> tail = new AtomicReference<>();
  10. public CLHLock() {
  11. // 操作1
  12. tail.getAndSet(Node.EMPTY);
  13. }
  14. @Override
  15. public void lock() {
  16. // 线程加锁,locked为true
  17. Node curNode = new Node(true, null);
  18. Node preNode = tail.get();
  19. // 操作2 CAS自旋,将当前节点插入到队列的尾部
  20. while (!tail.compareAndSet(preNode, curNode)) {
  21. preNode = tail.get();
  22. }
  23. // 操作2中设置了链表的尾节点,这里将上一个为节点设置为当前节点的前驱结点
  24. curNode.setPreview(preNode);
  25. // 操作3
  26. // 监听前驱节点的locked变量,直到其值为false
  27. // 若前继节点的locked状态为true,则表示前一线程还在抢占或者占有锁
  28. while (curNode.getPreview().isLocked()) {
  29. //让出CPU时间片,提高性能
  30. Thread.yield();
  31. }
  32. // 能执行到这里,说明当前线程获取到了锁
  33. // Print.tcfo("获取到了锁!!!");
  34. //设置在线程本地变量中,用于释放锁
  35. curNodeLocal.set(curNode);
  36. }
  37. @Override
  38. public void unlock() {
  39. // 操作4
  40. Node curNode = curNodeLocal.get();
  41. curNode.setPreview(null); // help for GC
  42. curNodeLocal.set(null);
  43. // 释放自旋锁,上面的while循环将得到执行,即下一个Node开始执行
  44. curNode.setLocked(false);
  45. }
  46. ......
  47. static class Node {
  48. /**
  49. * true:正在抢锁或者已经拥有锁
  50. * false:当前线程已经释放锁,下一个线程可以占有锁
  51. */
  52. private boolean locked;
  53. /**
  54. * 上一个节点,需要监听locked字段
  55. */
  56. private Node preview;
  57. private static final Node EMPTY = new Node(false, null);
  58. }
  59. }
  1. 操作1 - tail 属性使用 AtomicReference 类型是为了使得多个线程并发操作tail时不会发生线程安全问题
  2. 操作2 - Thread 在抢锁时会创建一个新的 Node 加入等待队列尾部:tail 指向新的 Node,同时新的 Node 的 preview 属性指向 tail 之前指向的节点,并且以上操作通过 CAS 自旋完成,以确保操作成功
  3. 操作3 - Thread 加入抢锁队列之后,会在前驱节点上自旋:循环判断前驱节点的 locked 属性是否为 false,如果为 false 就表示前驱节点释放了锁,当前线程抢占到锁
  4. 操作4 - Thread 抢到锁之后,它的 locked 属性一直为 true,一直到临界区代码执行完,然后调用 unlock() 方法释放锁,释放之后其 locked 属性才为 false。释放锁操作为:线程从本地变量 curNodeLocal 中获取当前节点 curNode,将其状态设置为 false,以便其后继节点能获得锁。

CLH 锁的抢占过程:

假如有这么一个场景:有三个并发线程同时抢占 CLHLock 锁,三个线程的实际执行顺序为 Thread A<—Thread B<—Thread C。

  1. 线程A开始执行了 lock 操作,创建一个节点 nodeA,设置它的 locked 状态为 true,然后设置它的前驱为 CLHLock.tail(此时为 EMPTY),并将 CLHLock.tail 设置为 nodeA,之后线程 A 开始在它的前驱节点上进行普通自旋。如下图所示:

image.png

  1. 线程 B 开始执行 lock 操作,创建一个节点 nodeB,设置它的 locked 状态为 true,然后设置它的前驱节点为 CLHLock.tail(此时为 nodeA),并将 CLHLock.tail 设置为 nodeB,之后线程 B 开始在它的前驱节点上进行普通自旋。如下图所示:

image.png

  1. 线程 C 开始执行 lock 操作,创建一个节点 nodeC,设置它的 locked 状态为 true,然后设置它的前驱节点为 CLHLock.tail(此时为 nodeB),并将 CLHLock.tail 设置为 nodeC,之后线程 C 开始在它的前驱节点上进行普通自旋。如下图所示:

image.png

  1. 线程 A 执行完临界区代码后开始 unlock(释放)操作,设置 nodeA 的前驱引用为 null,锁状态 locked 为 false。如下图所示:

image.png

  1. 线程 B 执行抢到锁并且完成临界区代码的执行后,开始 unlock(释放)操作,设置 nodeB 的前驱引用为 null,锁状态 locked 为 false。如下图所示:

image.png

  1. 线程 C 执行抢到锁并且完成临界区代码的执行后,开始 unlock(释放)操作,设置 nodeC 的前驱引用为 null,锁状态 locked 为 false。如下图所示:

image.png

CLH 锁是一种队列锁,其优点是空间复杂度低。如果有 N 个线程、L 个锁,每个线程每次只获取一个锁,那么需要的存储空间是 O(L+N),N 个线程有 N 个Node,L 个锁有 L 个 Tail。

CLH 队列锁的一个显著缺点是它在 NUMA 架构的 CPU 平台上性能很差。CLH 队列锁在 NUMA 架构的 CPU 平台上,每个 CPU 内核有自己的内存,如果前驱节点在不同的 CPU 内核上,它的内存位置比较远,在自旋判断前驱节点的 locked 属性时,性能将大打折扣。然而,CLH 锁在 SMP 架构的 CPU 平台上则不存在这个问题,性能还是挺高的。

公平锁与非公平锁

  1. 公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁,抢锁成功的次序体现为 FIFO(先进先出)顺序。

  1. 非公平锁

非公平锁是指多个线程获取锁的顺序并不一定是其申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,抢锁成功的次序不一定体现为 FIFO(先进先出)顺序。非公平锁的优点在于吞吐量比公平锁大,它的缺点是有可能会导致线程优先级反转或者线程饥饿现象。

ReentrantLock 默认为非公平锁,但是也可以根据构造器传入的 boolean 类型来选择创建的锁的类型:

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }
  4. public ReentrantLock(boolean fair) {
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. }

可中断锁与不可中断锁

可中断锁是指抢占过程可以被中断的锁,例如 ReentrantLock;可中断锁是指抢占过程不可以被中断的锁,例如 synchronized。

在 Lock 接口中,提供了两个方法用来中断抢占:

  1. lockInterruptibly()
  2. tryLock(long timeout,TimeUnit unit)

共享锁与独占锁

在访问共享资源之前进行加锁操作,在访问完成之后进行解锁操作。按照“是否允许在同一时刻被多个线程持有”来区分,锁可以分为共享锁与独占锁。

独占锁

在访问共享资源之前进行加锁操作,在访问完成之后进行解锁操作。按照“是否允许在同一时刻被多个线程持有”来区分,锁可以分为共享锁与独占锁。

Java 中的 Synchronized 内置锁和 ReentrantLock 显式锁都是独占锁。

共享锁

共享锁就是在同一时刻允许多个线程持有的锁。当然,获得共享锁的线程只能读取临界区的数据,不能修改临界区的数据。

JUC 中的 Semaphore(信号量)、ReadLock(读锁)以及 CountDownLatch(倒数闩)都属于共享锁。

Semaphore 概述

Semaphore 可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。Semaphore 维护了一组虚拟许可,它的数量可以通过构造器的参数指定。线程在访问共享资源前必须调用Semaphore 的 acquire() 方法获得许可,如果许可数量为 0,该线程就一直阻塞。线程访问完资源后,必须调用 Semaphore 的 release() 方法释放许可。更形象的说法是:Semaphore是一个许可管理器。

Semaphore 常用方法

  1. /**
  2. * 构造一个Semaphore实例,初始化其管理的许可数量为permits参数值
  3. *
  4. * 默认为非公平锁
  5. */
  6. public Semaphore(int permits) {
  7. sync = new NonfairSync(permits);
  8. }
  9. public Semaphore(int permits, boolean fair) {
  10. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  11. }
  12. /**
  13. * 获取 Semaphore 对象可用的许可数量
  14. */
  15. public int availablePermits() {
  16. return sync.getPermits();
  17. }
  18. /**
  19. * 当前线程尝试获取 Semaphore 对象的一个许可。此过程是阻塞的,线程会一直等待 Semaphore 发放
  20. * 一个许可,直到发生以下任意一件事:
  21. *
  22. * 1. 当前线程获取了一个可用的许可
  23. * 2. 当前线程被中断,就会抛出 InterruptedException 异常,并停止等待,继续往下执行
  24. */
  25. public void acquire() throws InterruptedException {
  26. sync.acquireSharedInterruptibly(1);
  27. }
  28. /**
  29. * 当前线程尝试阻塞地获取 permits 个许可。此过程是阻塞的,线程会一直等待 Semaphore 发放
  30. * permits 个许可。如果没有足够的许可而当前线程被中断,就会抛出InterruptedException异常
  31. * 并终止阻塞
  32. */
  33. public void acquire(int permits) throws InterruptedException {
  34. if (permits < 0) throw new IllegalArgumentException();
  35. sync.acquireSharedInterruptibly(permits);
  36. }
  37. // 当前线程尝试阻塞地获取一个许可,阻塞的过程不可中断,直到成功获取一个许可
  38. public void acquireUninterruptibly() {
  39. sync.acquireShared(1);
  40. }
  41. // 当前线程尝试阻塞地获取 permits 个许可,阻塞的过程不可中断,直到成功获取 permits 个许可
  42. public void acquireUninterruptibly(int permits) {
  43. if (permits < 0) throw new IllegalArgumentException();
  44. sync.acquireShared(permits);
  45. }
  46. // 当前线程尝试获取一个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果当前线程
  47. // 成功获取了一个许可,就返回 true;如果当前线程没有获得许可,就返回 false
  48. public boolean tryAcquire() {
  49. return sync.nonfairTryAcquireShared(1) >= 0;
  50. }
  51. // 当前线程尝试获取 permits 个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果线程
  52. // 成功获取了 permits 个许可,就返回 true;如果当前线程没有获得 permits 个许可,就返回false
  53. public boolean tryAcquire(int permits) {
  54. if (permits < 0) throw new IllegalArgumentException();
  55. return sync.nonfairTryAcquireShared(permits) >= 0;
  56. }
  57. /**
  58. * 限时获取一个许可。此过程是阻塞的,会一直等待许可,直到发生以下任意一件事:
  59. *
  60. * 1. 当前线程获取了一个许可,则会停止等待,继续执行,并返回 true
  61. * 2. 当前线程等待 timeout 后超时,则会停止等待,继续执行,并返回 false
  62. * 3. 当前线程在 timeout 时间内被中断,则会抛出 InterruptedException 异常,并停止等待,继续执行
  63. */
  64. public boolean tryAcquire(long timeout, TimeUnit unit)
  65. throws InterruptedException {
  66. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  67. }
  68. /**
  69. * 限时获取 permits 个许可。此过程是阻塞的,会一直等待许可,直到发生以下任意一件事:
  70. *
  71. * 1. 当前线程获取了 permits 个许可,则会停止等待,继续执行,并返回 true
  72. * 2. 当前线程等待 timeout 后超时,则会停止等待,继续执行,并返回 false
  73. * 3. 当前线程在 timeout 时间内被中断,则会抛出 InterruptedException 异常,并停止等待,继续执行
  74. */
  75. public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
  76. throws InterruptedException {
  77. if (permits < 0) throw new IllegalArgumentException();
  78. return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
  79. }
  80. // 当前线程释放一个可用的许可
  81. public void release() {
  82. sync.releaseShared(1);
  83. }
  84. // 当前线程释放permits个可用的许可
  85. public void release(int permits) {
  86. if (permits < 0) throw new IllegalArgumentException();
  87. sync.releaseShared(permits);
  88. }
  89. // 当前线程获得剩余的所有可用许可
  90. public int drainPermits() {
  91. return sync.drainPermits();
  92. }
  93. // 判断当前 Semaphore 对象上是否存在正在等待许可的线程
  94. public final boolean hasQueuedThreads() {
  95. return sync.hasQueuedThreads();
  96. }
  97. // 获取当前 Semaphore 对象上正在等待许可的线程数量
  98. public final int getQueueLength() {
  99. return sync.getQueueLength();
  100. }

Semaphore 使用示例

  1. public class SemaphoreTest {
  2. private static final int TOTAL_COUNT = 10;
  3. private static final int PERMIT_COUNT = 2;
  4. public static void main(String[] args) throws InterruptedException {
  5. CountDownLatch countDownLatch = new CountDownLatch(TOTAL_COUNT);
  6. // 创建信号量,包含两个许可
  7. Semaphore semaphore = new Semaphore(PERMIT_COUNT);
  8. AtomicInteger atomicInteger = new AtomicInteger();
  9. Runnable runnable = () -> {
  10. try {
  11. semaphore.acquire(1);
  12. System.out.print(System.currentTimeMillis() + "-");
  13. System.out.println(Thread.currentThread().getName() + " 的执行结果:" + atomicInteger.incrementAndGet());
  14. // 模拟耗时
  15. Thread.sleep(1000);
  16. semaphore.release(1);
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. } finally {
  20. countDownLatch.countDown();
  21. }
  22. };
  23. for (int i = 0; i < TOTAL_COUNT; i++) {
  24. new Thread(runnable, "线程" + i).start();
  25. }
  26. countDownLatch.await();
  27. }
  28. }

输出结果:

1628579429890-线程0 的执行结果:11628579429891-线程1 的执行结果:2 1628579430898-1628579430898-线程2 的执行结果:3 线程3 的执行结果:4 1628579431902-1628579431902-线程4 的执行结果:5 线程5 的执行结果:6 1628579432904-1628579432904-线程7 的执行结果:7 线程6 的执行结果:8 1628579433906-线程8 的执行结果:9 1628579433906-线程9 的执行结果:10

根据输出结果可以看出,每次都有两个线程能够获取到 Semaphore 的许可得到执行。