14 | Lock和Condition(上):隐藏在并发包中的管程

并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

再造管程的理由

破坏死锁的其中一个方案,破坏不可抢占条件,但是synchronized 没有办法解决。原因是synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。
如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?我觉得有三种方案。

  1. 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  2. 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

这三种方案可以全面弥补 synchronized 的问题。到这里相信你应该也能理解了,这三个方案就是“重复造轮子”的主要原因,体现在 API 上,就是 Lock 接口的三个方法。详情如下:

  1. // 支持中断的API
  2. void lockInterruptibly() throws InterruptedException;
  3. // 支持超时的API
  4. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  5. // 支持非阻塞获取锁的API
  6. boolean tryLock();

如何保证可见性

  1. class X {
  2. private final Lock rtl = new ReentrantLock();
  3. int value;
  4. public void addOne() {
  5. // 获取锁
  6. rtl.lock();
  7. try {
  8. value+=1;
  9. } finally {
  10. // 保证锁能释放
  11. rtl.unlock();
  12. }
  13. }
  14. }

Java SDK 里面 Lock 的使用,有一个经典的范例(如上),就是try{}finally{},可见性是如何保证的? Java 里多线程的可见性是通过 Happens-Before 规则保证的,而 synchronized 之所以能够保证可见性,也是因为有一条 synchronized 相关的规则:synchronized 的解锁 Happens-Before 于后续对这个锁的加锁。那 Java SDK 里面 Lock 靠什么保证可见性呢?

  1. 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
  2. volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
  3. 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。 ```shell

class SampleLock { volatile int state; // 加锁 lock() { // 省略代码无数 state = 1; } // 解锁 unlock() { // 省略代码无数 state = 0; } }

  1. > **可重入锁**,顾名思义,指的是线程可以重复获取同一把锁。
  2. > **可重入函数**,指的是多个线程可以同时调用该函数(说明线程安全)。
  3. <br />
  4. <a name="z9WER"></a>
  5. ## 课后思考
  6. ```shell
  7. class Account {
  8. private int balance;
  9. private final Lock lock = new ReentrantLock();
  10. // 转账
  11. void transfer(Account tar, int amt){
  12. while (true) {
  13. if(this.lock.tryLock()) {
  14. try {
  15. if (tar.lock.tryLock()) {
  16. try {
  17. this.balance -= amt;
  18. tar.balance += amt;
  19. } finally {
  20. tar.lock.unlock();
  21. }
  22. }//if
  23. } finally {
  24. this.lock.unlock();
  25. }
  26. }//if
  27. }//while
  28. }//transfer
  29. }

出现活锁问题了,两个线程同时执行,线程A获取了u1.trylock,线程B获取了u2.trylock,线程A尝试获取u2.trylock,不能成功,线程A结束,同时线程B尝试获取u1.trylock,不能成功,线程B结束,又开始新的一轮,这样一直循环下去!

15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?

Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的,这是二者的一个重要区别。
利用两个条件变量快速实现阻塞队列例子:

  1. public class BlockedQueue<T>{
  2. final Lock lock = new ReentrantLock();
  3. // 条件变量:队列不满
  4. final Condition notFull = lock.newCondition();
  5. // 条件变量:队列不空
  6. final Condition notEmpty = lock.newCondition();
  7. // 入队
  8. void enq(T x) {
  9. lock.lock();
  10. try {
  11. while (队列已满){
  12. // 等待队列不满
  13. notFull.await(); // 是会释放lock锁的
  14. }
  15. // 省略入队操作...
  16. //入队后,通知可出队
  17. notEmpty.signal();
  18. } finally {
  19. lock.unlock();
  20. }
  21. }
  22. // 出队
  23. void deq(){
  24. lock.lock();
  25. try {
  26. while (队列已空){
  27. // 等待队列不空
  28. notEmpty.await();
  29. }
  30. // 省略出队操作...
  31. //出队后,通知可入队
  32. notFull.signal();
  33. } finally {
  34. lock.unlock();
  35. }
  36. }
  37. }

Dubbo 源码分析

TCP 协议本身就是异步的,我们工作中经常用到的 RPC 调用,在 TCP 协议层面,发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的。可能你会觉得奇怪,平时工作中的 RPC 调用大多数都是同步的啊?这是怎么回事呢?
RPC异步转同步:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。

  1. // 创建锁与条件变量
  2. private final Lock lock = new ReentrantLock();
  3. private final Condition done = lock.newCondition();
  4. // 调用方通过该方法等待结果
  5. Object get(int timeout) {
  6. long start = System.nanoTime();
  7. lock.lock();
  8. try {
  9. while (!isDone()) {
  10. done.await(timeout);
  11. long cur = System.nanoTime();
  12. if (isDone() || cur-start > timeout) {
  13. break;
  14. }
  15. }
  16. } finally {
  17. lock.unlock();
  18. }
  19. if (!isDone()) {
  20. throw new TimeoutException();
  21. }
  22. return returnFromResponse();
  23. }
  24. // RPC结果是否已经返回
  25. boolean isDone() {
  26. return response != null;
  27. }
  28. // RPC结果返回时调用该方法
  29. private void doReceived(Response res) {
  30. lock.lock();
  31. try {
  32. response = res;
  33. if (done != null) {
  34. done.signal();
  35. }
  36. } finally {
  37. lock.unlock();
  38. }
  39. }

16 | Semaphore:如何快速实现一个限流器?

信号量模型

信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。你可以结合下图来形象化地理解。

14-21 并发工具类 - 图1
这三个方法详细的语义具体如下所示。

  1. init():设置计数器的初始值。
  2. down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
  3. up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。另外,还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()

如何使用信号量

  1. static int count;
  2. //初始化信号量
  3. static final Semaphore s = new Semaphore(1);
  4. //用信号量保证互斥
  5. static void addOne() {
  6. s.acquire();
  7. try {
  8. count+=1;
  9. } finally {
  10. s.release();
  11. }
  12. }

快速实现一个限流器

既然有 Java SDK 里面提供了 Lock,为啥还要提供一个 Semaphore ?其实实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区
对象池的示例代码

  1. class ObjPool<T, R> {
  2. final List<T> pool;
  3. // 用信号量实现限流器
  4. final Semaphore sem;
  5. // 构造函数
  6. ObjPool(int size, T t){
  7. pool = new Vector<T>(){};
  8. for(int i=0; i<size; i++){
  9. pool.add(t);
  10. }
  11. sem = new Semaphore(size);
  12. }
  13. // 利用对象池的对象,调用func
  14. R exec(Function<T,R> func) {
  15. T t = null;
  16. sem.acquire();
  17. try {
  18. t = pool.remove(0);
  19. return func.apply(t);
  20. } finally {
  21. pool.add(t);
  22. sem.release();
  23. }
  24. }
  25. }
  26. // 创建对象池
  27. ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
  28. // 通过对象池获取t,之后执行
  29. pool.exec(t -> {
  30. System.out.println(t);
  31. return t.toString();
  32. });

信号量和管程

信号量在 Java 语言里面名气并不算大,但是在其他语言里却是很有知名度的。Java 在并发编程领域走的很快,重点支持的还是管程模型。 管程模型理论上解决了信号量模型的一些不足,主要体现在易用性和工程化方面,例如用信号量解决我们曾经提到过的阻塞队列问题,就比管程模型麻烦很多

17 | ReadWriteLock:如何快速实现一个完备的缓存?

我们介绍了管程和信号量这两个同步原语在 Java 语言中的实现,理论上用这两个同步原语中任何一个都可以解决所有的并发问题。Java SDK 并发包里为什么还有很多其他的工具类呢?原因很简单:分场景优化性能,提升易用性。
针对读多写少这种并发场景,Java SDK 并发包提供了读写锁——ReadWriteLock,非常容易使用,并且性能很好。
Cache 这个工具类

  1. class Cache<K,V> {
  2. final Map<K, V> m = new HashMap<>();
  3. final ReadWriteLock rwl = new ReentrantReadWriteLock();
  4. // 读锁
  5. final Lock r = rwl.readLock();
  6. // 写锁
  7. final Lock w = rwl.writeLock();
  8. // 读缓存
  9. V get(K key) {
  10. r.lock();
  11. try {
  12. return m.get(key);
  13. }
  14. finally {
  15. r.unlock();
  16. }
  17. }
  18. // 写缓存
  19. V put(K key, V value) {
  20. w.lock();
  21. try {
  22. return m.put(key, v);
  23. }
  24. finally {
  25. w.unlock();
  26. }
  27. }
  28. }

18 | StampedLock:有没有比读写锁更快的锁?

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁、悲观读锁和乐观读。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。相关的示例代码如下。

  1. final StampedLock sl = new StampedLock();
  2. // 获取/释放悲观读锁示意代码
  3. long stamp = sl.readLock();
  4. try {
  5. //省略业务相关代码
  6. } finally {
  7. sl.unlockRead(stamp);
  8. }
  9. // 获取/释放写锁示意代码
  10. long stamp = sl.writeLock();
  11. try {
  12. //省略业务相关代码
  13. } finally {
  14. sl.unlockWrite(stamp);
  15. }

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。 ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。
但是 StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。StampedLock 在命名上并没有增加 Reentrant,想必你已经猜测到 StampedLock 应该是不可重入的。事实上,的确是这样的,StampedLock 不支持重入。这个是在使用中必须要特别注意的。另外,StampedLock 的悲观读锁、写锁都不支持条件变量,这个也需要你注意。
StampedLock 读模板:

  1. final StampedLock sl = new StampedLock();
  2. // 乐观读
  3. long stamp = sl.tryOptimisticRead();
  4. // 读入方法局部变量
  5. ......
  6. // 校验stamp
  7. if (!sl.validate(stamp)){
  8. // 升级为悲观读锁
  9. stamp = sl.readLock();
  10. try {
  11. // 读入方法局部变量
  12. .....
  13. } finally {
  14. //释放悲观读锁
  15. sl.unlockRead(stamp);
  16. }
  17. }
  18. //使用方法局部变量执行业务操作
  19. ......

StampedLock 写模板:

  1. long stamp = sl.writeLock();
  2. try {
  3. // 写共享变量
  4. ......
  5. } finally {
  6. sl.unlockWrite(stamp);
  7. }

19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?

14-21 并发工具类 - 图2需求如图

  1. 代码块同步执行,效率过低

14-21 并发工具类 - 图3

  1. 将1和2开子线程,异步执行

14-21 并发工具类 - 图4

  1. while(存在未对账订单){
  2. // 查询未对账订单
  3. Thread T1 = new Thread(()->{
  4. pos = getPOrders();
  5. });
  6. T1.start();
  7. // 查询派送单
  8. Thread T2 = new Thread(()->{
  9. dos = getDOrders();
  10. });
  11. T2.start();
  12. // 等待T1、T2结束
  13. T1.join();
  14. T2.join();
  15. // 执行对账操作
  16. diff = check(pos, dos);
  17. // 差异写入差异库
  18. save(diff);
  19. }
  1. 用 CountDownLatch 实现线程等待。2方案中while 循环里面每次都会创建新的线程,而创建线程可是个耗时的操作。所以最好是创建出来的线程能够循环利用。但是线程池就无法使用Thread.join来实现了,因为线程不会销毁。 ```java

// 创建2个线程的线程池 Executor executor = Executors.newFixedThreadPool(2); while(存在未对账订单){ // 计数器初始化为2 CountDownLatch latch = new CountDownLatch(2); // 查询未对账订单 executor.execute(()-> { pos = getPOrders(); latch.countDown(); }); // 查询派送单 executor.execute(()-> { dos = getDOrders(); latch.countDown(); });

// 等待两个查询操作结束 latch.await();

// 执行对账操作 diff = check(pos, dos); // 差异写入差异库 save(diff); }

  1. 4. 进一步优化:在执行对账操作的时候,可以同时去执行下一轮的查询操作。用 CyclicBarrier 实现线程同步
  2. ![](https://cdn.nlark.com/yuque/0/2021/png/2109626/1614685063766-e335e1f9-4889-4a25-ac0f-f12c24e4b049.png#align=left&display=inline&height=273&margin=%5Bobject%20Object%5D&originHeight=624&originWidth=1142&size=0&status=done&style=none&width=500)
  3. ```java
  4. // 订单队列
  5. Vector<P> pos;
  6. // 派送单队列
  7. Vector<D> dos;
  8. // 执行回调的线程池
  9. Executor executor = Executors.newFixedThreadPool(1);
  10. final CyclicBarrier barrier =
  11. new CyclicBarrier(2, ()->{
  12. executor.execute(()->check());
  13. });
  14. void check(){
  15. P p = pos.remove(0);
  16. D d = dos.remove(0);
  17. // 执行对账操作
  18. diff = check(p, d);
  19. // 差异写入差异库
  20. save(diff);
  21. }
  22. void checkAll(){
  23. // 循环查询订单库
  24. Thread T1 = new Thread(()->{
  25. while(存在未对账订单){
  26. // 查询订单库
  27. pos.add(getPOrders());
  28. // 等待
  29. barrier.await();
  30. }
  31. });
  32. T1.start();
  33. // 循环查询运单库
  34. Thread T2 = new Thread(()->{
  35. while(存在未对账订单){
  36. // 查询运单库
  37. dos.add(getDOrders());
  38. // 等待
  39. barrier.await();
  40. }
  41. });
  42. T2.start();
  43. }

20 | 并发容器:都有哪些“坑”需要我们填?

14-21 并发工具类 - 图5

List

List 里面只有一个实现类就是 CopyOnWriteArrayList。
如果在遍历 array 的同时,还有一个写操作,例如增加元素,CopyOnWriteArrayList 是如何处理的呢?CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。

Map

14-21 并发工具类 - 图6

Set

Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的

Queue

Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识。

单端阻塞队列

其实现有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue 和 DelayQueue。
14-21 并发工具类 - 图7

双端阻塞队列

其实现是 LinkedBlockingDeque
14-21 并发工具类 - 图8

单端非阻塞队列

其实现是 ConcurrentLinkedQueue

双端非阻塞队列

其实现是 ConcurrentLinkedDeque。

Java 容器的快速失败机制

课后思考

在 1.8 之前并发执行 HashMap.put() 可能会导致 CPU 飙升到 100%原因
https://my.oschina.net/alexqdjay/blog/1377268?utm_medium=referral
https://blog.csdn.net/qq_35688140/article/details/100772864

21 | 原子类:无锁工具类的典范

14-21 并发工具类 - 图9

原子类

原子化的基本数据类型

  1. getAndIncrement() //原子化i++
  2. getAndDecrement() //原子化的i--
  3. incrementAndGet() //原子化的++i
  4. decrementAndGet() //原子化的--i
  5. //当前值+=delta,返回+=前的值
  6. getAndAdd(delta)
  7. //当前值+=delta,返回+=后的值
  8. addAndGet(delta)
  9. //CAS操作,返回是否成功
  10. compareAndSet(expect, update)
  11. //以下四个方法
  12. //新值可以通过传入func函数来计算
  13. getAndUpdate(func)
  14. updateAndGet(func)
  15. getAndAccumulate(x,func)
  16. accumulateAndGet(x,func)

原子化的对象引用类型

无锁方案的实现原理

其实原子类性能高的秘密很简单,硬件支持而已。CPU 为了解决并发问题,提供了 CAS 指令(CAS,全称是 Compare And Swap,即“比较并交换”)。CAS 指令包含 3 个参数:共享变量的内存地址 A、用于比较的值 B 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C。作为一条 CPU 指令,CAS 指令本身是能够保证原子性的

使用 CAS 来解决并发问题,一般都会伴随着自旋,而所谓自旋,其实就是循环尝试。例如,实现一个线程安全的count += 1操作,“CAS+ 自旋”的实现方案如下所示(范式do while):

  1. class SimulatedCAS{
  2. volatile int count;
  3. // 实现count+=1
  4. addOne() {
  5. do {
  6. newValue = count+1; //①
  7. } while(count != cas(count,newValue) //②
  8. }
  9. }

引申问题

ABA 的问题。解决 ABA 问题的思路其实很简单,增加一个版本号维度就可以了(版本号递增的)。