悲观锁和乐观锁
悲观锁总是假设当前就是出于并发状态,直接对临界资源进行上锁,保证并发性;
乐观锁在操作临界资源时,不会一上来就加锁,而是先检测当前的一个数据状态,如果存在冲突,在进入自旋,直到可以更新数据。
悲观锁和乐观锁存在的问题
悲观锁存在的问题
- 在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题
- 一个线程持有锁后,会导致其他所有抢占此锁的线程挂起
- 如果一个优先级高的线程等待一个优先级低的线程释放锁,就会导致线程的优先级倒置,从而引发性能风险
以上这些悲观锁存在的问题就可以使用乐观锁进行替代,提升性能。
乐观锁存在的问题
- 并发度很高的情况下,除了获取锁的线程,其他线程会进入无限循环的自旋状态,导致 CPU 资源浪费
- 由于 CPU 会通过 MESI 协议保证变量的缓存一致性,那么就需要通过总线在不同的内核上来回通信。在高度并发的情况下,总线就会成为性能瓶颈,造成所谓的“总线风暴”现象
使用 CAS 实现乐观锁
乐观锁的操作分为两步:
- 冲突检测
- 数据更新
Java 提供的 CAS 原子操作可以用来实现乐观锁。 CAS 操作也可以分为两个步骤:
- 检测内存位置的值是否为期待值
- 如果是,就将该内存位置更新为最新值;否则不要更改该位置
实际上,如果需要完成数据的最终更新,仅仅进行一次 CAS 操作是不够的,一般情况下,需要进行自旋操作,即不断地循环重试 CAS 操作直到成功,这也叫 CAS 自旋。
什么是自旋锁
当一个线程在获取锁的时候,如果锁已经被其他线程获取,调用者就一直在那里循环检查该锁是否已经被释放,一直到获取到锁才会退出循环。这就是自旋锁的定义。
CAS 自旋锁的实现原理为:抢锁线程不断进行 CAS 自旋操作去更新锁的 owner(拥有者),如果更新成功,就表明已经抢锁成功,退出抢锁方法。如果锁已经被其他线程获取(也就是 owner 为其他线程),调用者就一直在那里循环进行 owner 的 CAS 更新操作,一直到成功才会退出循环。
不可重入的自旋锁
public class NoReentrantSpinLock implements Lock {
/**
* 当前锁的拥有者
*/
private AtomicReference<Thread> owner = new AtomicReference<>();
@Override
public void lock() {
Thread t = Thread.currentThread();
// 进入自旋,不可重入
while (owner.compareAndSet(null, t)) {
// 让出CPU时间片
Thread.yield();
}
}
@Override
public void unlock() {
Thread t = Thread.currentThread();
if (t == owner.get()) {
owner.set(null);
}
}
......
}
可重入的自旋锁
public class ReentrantSpinLock implements Lock {
/**
* 当前锁的拥有者
*/
private AtomicReference<Thread> owner = new AtomicReference<>();
/**
* 记录一个线程重复获取到锁的次数
* <p>
* 此变量为同一个线程在操作,没有必要加上volatile保证可见性和有序性
*/
private int count = 0;
@Override
public void lock() {
Thread t = Thread.currentThread();
if (t == owner.get()) {
count++;
return;
}
// 进入自旋,不可重入
while (owner.compareAndSet(null, t)) {
// 让出CPU时间片
Thread.yield();
}
}
@Override
public void unlock() {
Thread t = Thread.currentThread();
if (t == owner.get()) {
if (count > 0) {
// 如果重入次数大于0,减少重入计数
--count;
} else {
owner.set(null);
}
}
}
......
}
可以看到,可重入与不可重入的区别在于:可重入引入了一个重入次数计数器,只有在计数器为 0 时才表名当前锁处于被释放状态,其他线程才能进行获取。
什么是总线风暴
目前的 CPU 架构大体可以分为三类:
- 多处理器结构(Symmetric Multi-Processor,SMP)
- 非一致存储访问结构(Non-Uniform Memory Access,NUMA)
- 海量并行处理结构(Massive Parallel Processing,MPP)
CAS 对应的硬件指令为 cmpxchg,在多核 CPU 下,就会在 cmpxchg 指令前加上 lock 前缀指令。lock 前缀指令有以下3个作用:
- 将当前 CPU 缓存行的数据立即写回系统内存
- lock 前缀指令会引起在其他 CPU 中缓存了该内存地址的数据无效
- lock 前缀指令禁止指令重排
常见的 PC、手机、老式服务器都是 SMP 架构,其架构简单,但拓展性能非常差。在 SMP 架构的 CPU 平台上,所有的 Core(内核)会共享一条总线(BUS),靠此总线连接主存。每个核都有自己的高速缓存,各核相对于 BUS 对称分布。因此,这种结构称为“对称多处理器”。一个 8 核的 SMP 架构 CPU 大致如下图所示:
假设 Core 1 和 Core 2 可同时把某个变量加载到自己的高速缓存中,当 Core 1 在自己的高速缓存中修改这个位置的值时,会通过总线使 Core 2 中 L1 高速缓存对应的值“失效”,而 Core 2 一旦发现自己缓存中的值失效,就会通过总线从内存中读取最新的值,使得 Core 2 和 Core 1 中的值再次一致,这样 CPU 就保障了变量的“缓存一致性”。
CPU会通过MESI协议保障变量的缓存一致性。为了保障“缓存一致性”,不同的内核需要通过总线来回通信,因而所产生的流量一般被称为“缓存一致性流量”。因为总线被设计为固定的“通信能力”,如果缓存一致性流量过大,总线将成为瓶颈,这就是所谓的“总线风暴”。
注意:在 SMP 架构上会产生“总线风暴”是因为 SMP 架构采用了 lock 前缀指令产生了缓存一致性流量,而在很多线程同时执行 lock 前缀指令时导致流量过剩。而其他架构的 CPU 并不一定会产生“总线风暴”。
那 CAS 怎么避免总线风暴呢?答案就是使用队列对抢锁线性进行排序,最大程度上削减 CAS 操作数量。
CLH 自旋锁
CLH 锁其实就是一种基于队列(具体为单向链表)排队的自旋锁,由于是 Craig、Landin 和 Hagersten 三人一起发明的,因此被命名为 CLH 锁,也叫 CLH 队列锁。
简单的 CLH 锁可以基于单向链表实现,申请加锁的线程首先会通过 CAS 操作在单向链表的尾部增加一个节点,之后该线程只需要在其前驱节点上进行普通自旋,等待前驱节点释放锁即可。由于 CLH 锁只有在节点入队时进行一下 CAS 的操作,在节点加入队列之后,抢锁线程不需要进行 CAS 自旋,只需普通自旋即可。因此,在争用激烈的场景下,CLH 锁能大大减少 CAS 操作的数量,以避免 CPU 的总线风暴。
public class CLHLock implements Lock {
/**
* 当前节点的本地线程变量
*/
private static ThreadLocal<Node> curNodeLocal = new ThreadLocal<>();
/**
* CLHLock队列的尾指针,使用AtomicReference,方便进行CAS操作
*/
private AtomicReference<Node> tail = new AtomicReference<>();
public CLHLock() {
// 操作1
tail.getAndSet(Node.EMPTY);
}
@Override
public void lock() {
// 线程加锁,locked为true
Node curNode = new Node(true, null);
Node preNode = tail.get();
// 操作2 CAS自旋,将当前节点插入到队列的尾部
while (!tail.compareAndSet(preNode, curNode)) {
preNode = tail.get();
}
// 操作2中设置了链表的尾节点,这里将上一个为节点设置为当前节点的前驱结点
curNode.setPreview(preNode);
// 操作3
// 监听前驱节点的locked变量,直到其值为false
// 若前继节点的locked状态为true,则表示前一线程还在抢占或者占有锁
while (curNode.getPreview().isLocked()) {
//让出CPU时间片,提高性能
Thread.yield();
}
// 能执行到这里,说明当前线程获取到了锁
// Print.tcfo("获取到了锁!!!");
//设置在线程本地变量中,用于释放锁
curNodeLocal.set(curNode);
}
@Override
public void unlock() {
// 操作4
Node curNode = curNodeLocal.get();
curNode.setPreview(null); // help for GC
curNodeLocal.set(null);
// 释放自旋锁,上面的while循环将得到执行,即下一个Node开始执行
curNode.setLocked(false);
}
......
static class Node {
/**
* true:正在抢锁或者已经拥有锁
* false:当前线程已经释放锁,下一个线程可以占有锁
*/
private boolean locked;
/**
* 上一个节点,需要监听locked字段
*/
private Node preview;
private static final Node EMPTY = new Node(false, null);
}
}
- 操作1 - tail 属性使用 AtomicReference 类型是为了使得多个线程并发操作tail时不会发生线程安全问题
- 操作2 - Thread 在抢锁时会创建一个新的 Node 加入等待队列尾部:tail 指向新的 Node,同时新的 Node 的 preview 属性指向 tail 之前指向的节点,并且以上操作通过 CAS 自旋完成,以确保操作成功
- 操作3 - Thread 加入抢锁队列之后,会在前驱节点上自旋:循环判断前驱节点的 locked 属性是否为 false,如果为 false 就表示前驱节点释放了锁,当前线程抢占到锁
- 操作4 - Thread 抢到锁之后,它的 locked 属性一直为 true,一直到临界区代码执行完,然后调用
unlock()
方法释放锁,释放之后其 locked 属性才为 false。释放锁操作为:线程从本地变量 curNodeLocal 中获取当前节点 curNode,将其状态设置为 false,以便其后继节点能获得锁。
CLH 锁的抢占过程:
假如有这么一个场景:有三个并发线程同时抢占 CLHLock 锁,三个线程的实际执行顺序为 Thread A<—Thread B<—Thread C。
- 线程A开始执行了 lock 操作,创建一个节点 nodeA,设置它的 locked 状态为 true,然后设置它的前驱为 CLHLock.tail(此时为 EMPTY),并将 CLHLock.tail 设置为 nodeA,之后线程 A 开始在它的前驱节点上进行普通自旋。如下图所示:
- 线程 B 开始执行 lock 操作,创建一个节点 nodeB,设置它的 locked 状态为 true,然后设置它的前驱节点为 CLHLock.tail(此时为 nodeA),并将 CLHLock.tail 设置为 nodeB,之后线程 B 开始在它的前驱节点上进行普通自旋。如下图所示:
- 线程 C 开始执行 lock 操作,创建一个节点 nodeC,设置它的 locked 状态为 true,然后设置它的前驱节点为 CLHLock.tail(此时为 nodeB),并将 CLHLock.tail 设置为 nodeC,之后线程 C 开始在它的前驱节点上进行普通自旋。如下图所示:
- 线程 A 执行完临界区代码后开始 unlock(释放)操作,设置 nodeA 的前驱引用为 null,锁状态 locked 为 false。如下图所示:
- 线程 B 执行抢到锁并且完成临界区代码的执行后,开始 unlock(释放)操作,设置 nodeB 的前驱引用为 null,锁状态 locked 为 false。如下图所示:
- 线程 C 执行抢到锁并且完成临界区代码的执行后,开始 unlock(释放)操作,设置 nodeC 的前驱引用为 null,锁状态 locked 为 false。如下图所示:
CLH 锁是一种队列锁,其优点是空间复杂度低。如果有 N 个线程、L 个锁,每个线程每次只获取一个锁,那么需要的存储空间是 O(L+N),N 个线程有 N 个Node,L 个锁有 L 个 Tail。
CLH 队列锁的一个显著缺点是它在 NUMA 架构的 CPU 平台上性能很差。CLH 队列锁在 NUMA 架构的 CPU 平台上,每个 CPU 内核有自己的内存,如果前驱节点在不同的 CPU 内核上,它的内存位置比较远,在自旋判断前驱节点的 locked 属性时,性能将大打折扣。然而,CLH 锁在 SMP 架构的 CPU 平台上则不存在这个问题,性能还是挺高的。
公平锁与非公平锁
- 公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁,抢锁成功的次序体现为 FIFO(先进先出)顺序。
- 非公平锁
非公平锁是指多个线程获取锁的顺序并不一定是其申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,抢锁成功的次序不一定体现为 FIFO(先进先出)顺序。非公平锁的优点在于吞吐量比公平锁大,它的缺点是有可能会导致线程优先级反转或者线程饥饿现象。
ReentrantLock 默认为非公平锁,但是也可以根据构造器传入的 boolean 类型来选择创建的锁的类型:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可中断锁与不可中断锁
可中断锁是指抢占过程可以被中断的锁,例如 ReentrantLock;可中断锁是指抢占过程不可以被中断的锁,例如 synchronized。
在 Lock 接口中,提供了两个方法用来中断抢占:
- lockInterruptibly()
- tryLock(long timeout,TimeUnit unit)
共享锁与独占锁
在访问共享资源之前进行加锁操作,在访问完成之后进行解锁操作。按照“是否允许在同一时刻被多个线程持有”来区分,锁可以分为共享锁与独占锁。
独占锁
在访问共享资源之前进行加锁操作,在访问完成之后进行解锁操作。按照“是否允许在同一时刻被多个线程持有”来区分,锁可以分为共享锁与独占锁。
Java 中的 Synchronized 内置锁和 ReentrantLock 显式锁都是独占锁。
共享锁
共享锁就是在同一时刻允许多个线程持有的锁。当然,获得共享锁的线程只能读取临界区的数据,不能修改临界区的数据。
JUC 中的 Semaphore(信号量)、ReadLock(读锁)以及 CountDownLatch(倒数闩)都属于共享锁。
Semaphore 概述
Semaphore 可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。Semaphore 维护了一组虚拟许可,它的数量可以通过构造器的参数指定。线程在访问共享资源前必须调用Semaphore 的 acquire()
方法获得许可,如果许可数量为 0,该线程就一直阻塞。线程访问完资源后,必须调用 Semaphore 的 release()
方法释放许可。更形象的说法是:Semaphore是一个许可管理器。
Semaphore 常用方法
/**
* 构造一个Semaphore实例,初始化其管理的许可数量为permits参数值
*
* 默认为非公平锁
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* 获取 Semaphore 对象可用的许可数量
*/
public int availablePermits() {
return sync.getPermits();
}
/**
* 当前线程尝试获取 Semaphore 对象的一个许可。此过程是阻塞的,线程会一直等待 Semaphore 发放
* 一个许可,直到发生以下任意一件事:
*
* 1. 当前线程获取了一个可用的许可
* 2. 当前线程被中断,就会抛出 InterruptedException 异常,并停止等待,继续往下执行
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 当前线程尝试阻塞地获取 permits 个许可。此过程是阻塞的,线程会一直等待 Semaphore 发放
* permits 个许可。如果没有足够的许可而当前线程被中断,就会抛出InterruptedException异常
* 并终止阻塞
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 当前线程尝试阻塞地获取一个许可,阻塞的过程不可中断,直到成功获取一个许可
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 当前线程尝试阻塞地获取 permits 个许可,阻塞的过程不可中断,直到成功获取 permits 个许可
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
// 当前线程尝试获取一个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果当前线程
// 成功获取了一个许可,就返回 true;如果当前线程没有获得许可,就返回 false
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 当前线程尝试获取 permits 个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果线程
// 成功获取了 permits 个许可,就返回 true;如果当前线程没有获得 permits 个许可,就返回false
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
* 限时获取一个许可。此过程是阻塞的,会一直等待许可,直到发生以下任意一件事:
*
* 1. 当前线程获取了一个许可,则会停止等待,继续执行,并返回 true
* 2. 当前线程等待 timeout 后超时,则会停止等待,继续执行,并返回 false
* 3. 当前线程在 timeout 时间内被中断,则会抛出 InterruptedException 异常,并停止等待,继续执行
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 限时获取 permits 个许可。此过程是阻塞的,会一直等待许可,直到发生以下任意一件事:
*
* 1. 当前线程获取了 permits 个许可,则会停止等待,继续执行,并返回 true
* 2. 当前线程等待 timeout 后超时,则会停止等待,继续执行,并返回 false
* 3. 当前线程在 timeout 时间内被中断,则会抛出 InterruptedException 异常,并停止等待,继续执行
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// 当前线程释放一个可用的许可
public void release() {
sync.releaseShared(1);
}
// 当前线程释放permits个可用的许可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// 当前线程获得剩余的所有可用许可
public int drainPermits() {
return sync.drainPermits();
}
// 判断当前 Semaphore 对象上是否存在正在等待许可的线程
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 获取当前 Semaphore 对象上正在等待许可的线程数量
public final int getQueueLength() {
return sync.getQueueLength();
}
Semaphore 使用示例
public class SemaphoreTest {
private static final int TOTAL_COUNT = 10;
private static final int PERMIT_COUNT = 2;
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(TOTAL_COUNT);
// 创建信号量,包含两个许可
Semaphore semaphore = new Semaphore(PERMIT_COUNT);
AtomicInteger atomicInteger = new AtomicInteger();
Runnable runnable = () -> {
try {
semaphore.acquire(1);
System.out.print(System.currentTimeMillis() + "-");
System.out.println(Thread.currentThread().getName() + " 的执行结果:" + atomicInteger.incrementAndGet());
// 模拟耗时
Thread.sleep(1000);
semaphore.release(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
};
for (int i = 0; i < TOTAL_COUNT; i++) {
new Thread(runnable, "线程" + i).start();
}
countDownLatch.await();
}
}
输出结果:
1628579429890-线程0 的执行结果:11628579429891-线程1 的执行结果:2 1628579430898-1628579430898-线程2 的执行结果:3 线程3 的执行结果:4 1628579431902-1628579431902-线程4 的执行结果:5 线程5 的执行结果:6 1628579432904-1628579432904-线程7 的执行结果:7 线程6 的执行结果:8 1628579433906-线程8 的执行结果:9 1628579433906-线程9 的执行结果:10
根据输出结果可以看出,每次都有两个线程能够获取到 Semaphore 的许可得到执行。