01 | Lock和Condition

并发编程中两大核心问题:

  • 互斥:同一时刻只允许一个线程访问共享资源
  • 同步:线程之间如何通信、协作

Java SDK并发包通过Lock和Condition两个接口来实现管程,其中Lock用于解决互斥问题,Condition用于解决同步问题。

Lock

  1. public interface Lock {
  2. /**
  3. * 获取锁
  4. */
  5. void lock();
  6. /**
  7. * 获取锁,可响应中断
  8. */
  9. void lockInterruptibly() throws InterruptedException;
  10. /**
  11. * 仅在调用时锁为空闲状态才获取该锁,可以响应中断
  12. * @return true 获取成功, false 获取失败
  13. */
  14. boolean tryLock();
  15. /**
  16. * 在给定的空闲时间内如果锁空闲并且未被中断则获取锁
  17. * @param time 空闲时间
  18. * @param unit 时间单位
  19. * @return true 获取成功, false 获取失败
  20. */
  21. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  22. /**
  23. * 释放锁
  24. */
  25. void unlock();
  26. /**
  27. * 返回绑定到此 Lock 实例的新 Condition 实例
  28. * @return
  29. */
  30. Condition newCondition();
  31. }

Condition

condition可以通俗的理解为条件队列。当一个线程在调用了await方法以后,直到线程等待的某个条件为真的时候才会被唤醒。这种方式为线程提供了更加简单的等待/通知模式。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。

  1. public interface Condition {
  2. /**
  3. * 当前线程一直处于等待状态直到收到信号(signalled)或者当前线程被中断
  4. */
  5. void await() throws InterruptedException;
  6. /**
  7. * await()基础上如超过设定的指定时间之前处于等待状态
  8. */
  9. boolean await(long time, TimeUnit unit) throws InterruptedException;
  10. /**
  11. * 当前线程一直处于等待状态直到收到信号(signalled)(中断不敏感)
  12. */
  13. void awaitUninterruptibly();
  14. /**
  15. * await(long time, TimeUnit unit)基础上返回剩余超时时间
  16. */
  17. long awaitNanos(long nanosTimeout) throws InterruptedException;
  18. /**
  19. * 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false
  20. */
  21. boolean awaitUntil(Date deadline) throws InterruptedException;
  22. /**
  23. * 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁
  24. */
  25. void signal();
  26. /**
  27. * 唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
  28. */
  29. void signalAll();
  30. }

Lock和Synchronized的区别

  • Lock提供了超时机制
  • Lock阻塞的线程可以响应中断
  • Lock支持非阻塞的获取锁
  • Lock+Condition可以支持多个条件
  • Synchronized为非公平锁,Lock两者都可以(默认为非公平锁)

    锁的最佳实践

  • 永远只在更新对象的成员变量时加锁

  • 永远只在访问可变的成员变量时加锁
  • 永远不在调用其他对象的方法时加锁

02 | Semaphore

信号量模型

信号量模型可以简单概括为:

  • 一个计数器
  • 一个等待队列
  • 三个方法
    • init():设置计数器的初始值
    • down():计数器值减1;如果此时计数器的值小于0,则当前线程将被阻塞
    • up():计数器值加1;如果此时计数器的值小于或等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除

image.png
Java SDK中,信号量模型有java.util.concurrent.Semaphore实现

  1. public class Semaphore {
  2. /**
  3. * 等同于init(),初始化信号量大小
  4. * @param permits
  5. */
  6. public Semaphore(int permits){}
  7. /**
  8. * 等同于down()
  9. */
  10. public void acquire();
  11. /**
  12. * 等同于up()
  13. */
  14. public void release();
  15. }

Semaphore能够保证这三个方法都是原子操作(通过Lock实现)

03 | ReadWriteLock

读写锁的基本原则:

  • 允许多个线程同时读共享变量
  • 只允许一个线程写共享变量
  • 如果一个写线程正在执行写操作,此时禁止读线程读共享变量
  • 读写锁不允许升级(读锁升级为写锁),但可以降级(写锁降级为读锁)

适用场景:读多写少

锁模式

  • 读锁
  • 写锁

允许多个线程同时获取读锁,但是只允许一个线程获取写锁,写锁和读锁是互斥的。
ReentrantReadWriteLock实现原理

04 | StampedLock

锁模式

  • 写锁
  • 悲观读锁
  • 乐观读

写锁与悲观读锁语义与ReadWriteLock的写锁和读锁语义相似,不同的是StampedLock中的写锁和悲观读锁加锁成功之后都会返回一个stamp,解锁时,需要传入这个stamp。
StampedLock的性能之所以比ReadWriteLock要好,关键StampedLock支持乐观读。ReadWriteLock支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而StampedLock提供的乐观读(实质为乐观读这个操作是无锁的),所有允许一个线程获取写锁,并不会阻塞所有的写操作,但因此可能会导致数据不一起的情况,StampedLock通过提供validate(stamp)可以验证读期间是否存在写操作,如存在可升级为悲观读锁保证数据一致。推荐实现如下:

  1. class Point {
  2. private int x, y;
  3. final StampedLock sl =
  4. new StampedLock();
  5. //计算到原点的距离
  6. int distanceFromOrigin() {
  7. // 乐观读
  8. long stamp =
  9. sl.tryOptimisticRead();
  10. // 读入局部变量,
  11. // 读的过程数据可能被修改
  12. int curX = x, curY = y;
  13. //判断执行读操作期间,
  14. //是否存在写操作,如果存在,
  15. //则sl.validate返回false
  16. if (!sl.validate(stamp)){
  17. // 升级为悲观读锁
  18. stamp = sl.readLock();
  19. try {
  20. curX = x;
  21. curY = y;
  22. } finally {
  23. //释放悲观读锁
  24. sl.unlockRead(stamp);
  25. }
  26. }
  27. return Math.sqrt(
  28. curX * curX + curY * curY);
  29. }
  30. }

StampedLock注意事项

  • 不支持重入
  • 不支持条件变量
  • 不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly(),否则会导致CPU飙升


05 | CountDownLatch和CyclicBarrier

CountDownLatch

使用场景:一个线程等待多个线程

CyclicBarrier

使用场景:一组线程互相等待

06 | 并发容器

并发容器关系图

image.png

List

CopyOnWriteArrayList

CopyOnWriteArrayList内部维护了一个数组array,所有的读操作都是基于这个数组array进行的,读操作是完全无锁的,如果在遍历array的同时,同时执行了写操作,如增加元素,CopyOnWriteArrayList会将array复制一份,然后在新复制的数据上执行元素增加的操作,执行完成之后再将array指向这个新的数据。需注意,在执行写操作,复制->增加元素->array指向新的数组这个过程中数据会出现短暂的不一致情况。
image.png

Map

ConcurrentHashMap

阅读资料

ConcurrentSkipListMap

ConcurrentHashMap和ConcurrentSkipListMap的主要区别为前者的key是无序的而后者的key是有序的

Set

CopyOnWriteArraySet

ConcurrentSkipListSet

Queue

单端阻塞队列:
  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue
  • LinkedTransferQueue
  • PriorityBlockingQueue
  • DelayQueue

    双端阻塞队列
  • LinkedBlockingDeque

    单端非阻塞队列
  • ConcurrentLinkedQueue

    双端非阻塞队列
  • ConcurrentLinkedDeque


07 | 无锁原子类

无锁方案原理:CAS

CAS指令包含3个参数:共享变量的内存地址A、用于比较的值B和共享变脸的新值C;并且只有当内存中的地址A处的值等于B时,才能将内存中地址A处的值更新为新值C。作为一条CPU指令,CAS指令本身能够保证原子性

原子类概览

image.png

08 | Executor

线程池使用注意事项(by 阿里巴巴Java开发手册

  • 创建线程或线程池时指定有意义的线程名称,方便出错时回溯
  • 线程资源应通过线程池提供
  • 线程池不允许使用Executors去创建,使用弊端如下
    • newSingleThreadExecutor和newFixedThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
    • newCachedThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

09 | CompletableFuture:异步编程

Java1.8版本提供了CompletableFuture来支持异步编程,使用Completable无需再手动维护线程,也无需关注分配线程的工作,只需关注业务逻辑相关代码,语义更清晰。

创建

可通过CompletableFuture中的以下4个静态方法创建CompletableFuture对象:

  • CompletableFuture runAsync(Runnable runnable); // 使用默认线程池且异步任务返回值
  • CompletableFuture runAsync(Runnable runnable,Executor executor); // 指定线程池且异步线程返回值
  • CompletableFuture supplyAsync(Supplier supplier); // 使用默认线程池且异步任务包含返回值
  • CompletableFuture supplyAsync(Supplier supplier,Executor executor); // 指定线程池且异步线程包含返回

示例:

  1. CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
  2. System.out.println("runAsync with:" + Thread.currentThread().getName());
  3. });
  4. CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
  5. String threadName = Thread.currentThread().getName();
  6. System.out.println("supplyAsync with:" + threadName);
  7. return "result with:"+threadName;
  8. });
  9. System.out.println("main thread is Running");
  10. String result = supplyAsync.get();
  11. System.out.println(result);
  12. // result
  13. // main thread is Running
  14. // runAsync with:ForkJoinPool.commonPool-worker-3
  15. // supplyAsync with:ForkJoinPool.commonPool-worker-5
  16. // result with:ForkJoinPool.commonPool-worker-5

CompletionStage

作用:描述任务之间的时序关系
image.png

  • 串行关系
    • CompletionStage thenApply(fn);
    • CompletionStage thenApplyAsync(fn);
    • CompletionStage thenAccept(consumer);
    • CompletionStage thenAcceptAsync(consumer);
    • CompletionStage thenRun(action);
    • CompletionStage thenRunAsync(action);
    • CompletionStage thenCompose(fn);
    • CompletionStage thenComposeAsync(fn);
  • AND汇聚关系
    • CompletionStage thenCombine(other, fn);
    • CompletionStage thenCombineAsync(other, fn);
    • CompletionStage thenAcceptBoth(other, consumer);
    • CompletionStage thenAcceptBothAsync(other, consumer);
    • CompletionStage runAfterBoth(other, action);
    • CompletionStage runAfterBothAsync(other, action);
  • OR汇聚关系
    • CompletionStage applyToEither(other, fn);
    • CompletionStage applyToEitherAsync(other, fn);
    • CompletionStage acceptEither(other, consumer);
    • CompletionStage acceptEitherAsync(other, consumer);
    • CompletionStage runAfterEither(other, action);
    • CompletionStage runAfterEitherAsync(other, action);
  • 异常处理
    • CompletionStage exceptionally(fn);
    • CompletionStage whenComplete(consumer);
    • CompletionStage whenCompleteAsync(consumer);
    • CompletionStage handle(fn);
    • CompletionStage handleAsync(fn);

注: 方法不以Async结尾,意味着任务相同的线程执行,而Async可能会使用不同的线程执行(如使用相同的线程池,也可能被同一个线程选中执行)。

10 | CompletionService:批量执行异步任务

执行流程:
image.png
CompletionService将线程池Executor和阻塞队列BlockingQueue融合一起,能够让批量异步任务的执行结果有序化,能够让批量异步任务管理更加简单
核心方法

  1. Future<V> submit(Callable<V> task);
  2. Future<V> submit(Runnable task, V result);
  3. Future<V> take()throws InterruptedException;
  4. Future<V> poll();
  5. Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

并行任务如何选择:

  • 简单并行任务:线程池+Future
  • 任务之间包含聚合关系:CompletableFuture
  • 批量的并行任务或对执行结果有有序化要求:CompletionService