关于Future的使用,务必掌握,掌握了Future就掌握了未来

Callable

直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有缺陷:不能返回一个返回值;不能抛出checked Exception。
Callable的call方法可以有返回值,可以声明抛出异常。和Callable配合的有一个Future类,通过Future可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable做不到的,Callable的功能要比Runnable强大。
image.png
image.png

Future

主要功能

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

  1. public interface Future<V> {
  2. // 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
  3. boolean cancel(boolean mayInterruptIfRunning);
  4. // 任务是否已经取消,任务正常完成前将其取消,则返回 true
  5. boolean isCancelled();
  6. // 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
  7. boolean isDone();
  8. // 等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常,
  9. //ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
  10. V get() throws InterruptedException, ExecutionException;
  11. // 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的
  12. //单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
  13. V get(long timeout, TimeUnit unit)
  14. throws InterruptedException, ExecutionException, TimeoutException;
  15. }

FutureTask

利用FutureTask创建Future
Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过FutureTask存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的FutureTask被转型为Future接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
image.png
把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这 个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果。

  1. public class FutureTaskDemo {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. Task task = new Task();
  4. //构建futureTask
  5. FutureTask<Integer> futureTask = new FutureTask<>(task);
  6. //作为Runnable入参
  7. new Thread(futureTask).start();
  8. System.out.println("task运行结果:"+futureTask.get());
  9. }
  10. static class Task implements Callable<Integer> {
  11. @Override
  12. public Integer call() throws Exception {
  13. System.out.println("子线程正在计算");
  14. int sum = 0;
  15. for (int i = 0; i < 100; i++) {
  16. sum += i;
  17. }
  18. return sum;
  19. }
  20. }
  21. }

注:
1.当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制
2.Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来
3.Future task.get()阻塞的是主线程

局限性

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束 后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身 也确实存在着许多限制:
并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以只能等待结果;
无法对多个任务进行链式调用:如果希望在计算任务完成后执行特定动作,Future却没有提供这样的能 力;
无法组合多个任务:如果运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中 这是无能为力的;
没有异常处理:Future接口中没有关于异常处理的方法。
问题: Callable和Future是否产生了新的线程?
没有。Future表示一个异步计算结果,实现任务和线程解耦

CompletionService

Callable+Future 可以实现多个task并行执行,但是如果遇到task执行较慢时需要阻塞等待前面的task执 行完后面task才能取得结果。而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。 让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
image.png

原理

内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一 个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
适合批量的task写操作,写的时候不会阻塞。take(),阻塞队列取任务,再get()

使用案例

询价应用:向不同电商平台询价,并保存价格

  • 采用 ThreadPoolExecutor + Future 的方案:异步执行询价然后再保存

    1. @Slf4j
    2. public class CompletionServiceDemo {
    3. public static void main(String[] args) throws InterruptedException,ExecutionException {
    4. // 创建线程池
    5. ExecutorService executor = Executors.newFixedThreadPool(10);
    6. // 异步向电商S1询价
    7. Future<Integer> f1 = executor.submit(() -> getPriceByS1());
    8. // 异步向电商S2询价
    9. Future<Integer> f2 = executor.submit(() -> getPriceByS2());
    10. // 获取电商S1报价并异步保存
    11. executor.execute(() -> save(f1.get()));
    12. // 获取电商S2报价并异步保存
    13. executor.execute(() -> save(f2.get());
    14. executor.shutdown();
    15. }
    16. private static void save(Integer r) {
    17. log.debug("保存询价结果:{}",r);
    18. }
    19. private static Integer getPriceByS1() throws InterruptedException {
    20. TimeUnit.MILLISECONDS.sleep(5000);
    21. log.debug("电商S1询价信息1200");
    22. return 1200;
    23. }
    24. private static Integer getPriceByS2() throws InterruptedException {
    25. TimeUnit.MILLISECONDS.sleep(8000);
    26. log.debug("电商S2询价信息1000");
    27. return 1000;
    28. }
    29. private static Integer getPriceByS3() throws InterruptedException {
    30. TimeUnit.MILLISECONDS.sleep(3000);
    31. log.debug("电商S3询价信息800");
    32. return 800;
    33. }
    34. }

    打印结果:
    image.png
    如果获取电商S1报价的耗时很长,即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞在了f1.get()操作上。

    1. @Slf4j
    2. public class CompletionServiceDemo {
    3. public static void main(String[] args) throws InterruptedException,ExecutionException {
    4. //创建线程池
    5. ExecutorService executorService =
    6. Executors.newFixedThreadPool(10);
    7. //创建completionService
    8. CompletionService<Integer> completionService = new
    9. ExecutorCompletionService >(executorService);
    10. //异步查询电商价格 函数式接口,不知道怎么用的时候就new
    11. completionService.submit(new Callable<Integer>() {
    12. @Override
    13. public Integer call() throws Exception {
    14. return getPriceByS1();
    15. }
    16. });
    17. completionService.submit(() > getPriceByS2());
    18. completionService.submit(CompleteServiceDemo :getPriceByS3);
    19. //将询价结果异步保存到数据库
    20. for (int i = 0; i < 3; i +) {
    21. Integer result = completionService.take().get();
    22. executorService.execute(() > save(result));
    23. }
    24. }
    25. private static void save(Integer r) {
    26. log.debug("保存询价结果:{}",r);
    27. }
    28. private static Integer getPriceByS1() throws InterruptedException {
    29. TimeUnit.MILLISECONDS.sleep(5000);
    30. log.debug("电商S1询价信息1200");
    31. return 1200;
    32. }
    33. private static Integer getPriceByS2() throws InterruptedException {
    34. TimeUnit.MILLISECONDS.sleep(8000);
    35. log.debug("电商S2询价信息1000");
    36. return 1000;
    37. }
    38. private static Integer getPriceByS3() throws InterruptedException {
    39. TimeUnit.MILLISECONDS.sleep(3000);
    40. log.debug("电商S3询价信息800");
    41. return 800;
    42. }
    43. }

    打印结果:
    image.png

    应用场景总结

  • 当需要批量提交异步任务的时候使用CompletionService。

CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批 量异步任务的管理更简单。

  • CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,可 以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
  • 线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个 应用的风险。

    CompletableFuture

    CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。
    image.png
    CompletionStage接口: 默认的是ForkJoinPool.commonPool()公共线程池,最好自己指定线程池,否则可能公共线程池耗尽导致出现饥饿死锁。

    应用场景

    描述依赖关系:
    thenApply()把前面异步任务的结果,交给后面的Function
    thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
    描述and聚合关系:
    thenCombine: 任务合并,有返回值
    thenAccepetBoth: 两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
    runAfterBoth: 两个任务都执行完成后,执行下一步操作(Runnable)。
    描述or聚合关系:
    applyToEither: 两个任务谁执行的快,就使用那一个结果,有返回值。
    acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
    runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。
    并行执行:
    CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行

    创建异步操作

    CompletableFuture 提供了四个静态方法来创建一个异步操作
    image.png默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option: - Djava.util.concurrent.ForkJoinPool.common.parallelism来设置 ForkJoinPool 线程池的线程数)。
    如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,尽量要根据不同的业务类型创建不同的线程池,以避免互相干扰 。

    使用

    runAsync&supplyAsync

    image.png
    打印结果:
    image.png
    获取结果
    join&get
    join()和get()方法都是用来获取CompletableFuture异步之后的返回值。

  • join() 方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。

  • get() 方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)

结果处理
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
image.png
Action的类型是BiConsumer,它可以处理正常的计算结果,或者异常情况。
⽅法不以Async结尾,意味着Action使⽤相同的线程执⾏,⽽Async可能会使⽤其它的线程去执⾏ (如果使⽤相同的线程池,也可能会被同⼀个线程选中执⾏)。
这⼏个⽅法都会返回CompletableFuture,当Action执⾏完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。

whenComplete&exceptionally

image.png
打印结果:
image.png

Disruptor

简介

JUC的BlockingQueue存在的问题
1. juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。
2. 加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。
3. 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。

设计方案

Disruptor通过以下设计来解决队列速度慢的问题:
环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。
元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
利用缓存行填充解决了伪共享的问题
index为long占8字节,缓存行大小是64字节,因此额外填充了7个long,加上index刚好64字节。
实现了基于事件驱动的生产者消费者模型(观察者模式)
消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费。

RingBuffer数据结构

使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:
image.png

  • Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出:

index=sequence % entries.length
也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)

  • 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉

问题:能覆盖数据是否会导致数据丢失呢?
当需要覆盖数据时,会执行一个策略,Disruptor给提供多种策略,比较常用的:
BlockingWaitStrategy策略,常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是阻塞等待。使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景。
SleepingWaitStrategy策略,会在循环中不断等待数据。先进行自旋等待如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时。典型的应用场景就是异步日志。
YieldingWaitStrategy策略,这个策略用于低延时的场合。消费者线程会不断循环监 控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。
BusySpinWaitStrategy策略: 采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用

单个生产者写数据的流程

  1. 申请写入m个元素;
    2. 若是有m个元素可以写入,则返回最大的序列号。这里主要判断是否会覆盖未读的元素;
    3. 若是返回的正确,则生产者开始写入元素。
    image.png

    多个生产者写数据的流程

    问题:多个生产者的情况下,如何防止多个线程重复写同一个元素?
    Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。
    问题:但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。
    Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。(即2个环形数组同时工作,RingBuffer存放消息,AvaliableBuffer仅仅用于读写标志站位)
    当某个位置写入成功 的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
    生产者写数据
    多个生产者写入的时候:
    1. 申请写入m个元素;
    2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空 间;
    3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪 些位置是已经写入成功的。
    如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素, 同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。
    image.png

    多个消费者读数据的流程

    生产者多线程写入的情况下读数据会复杂很多:
    1. 申请读取到序号n;
    2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始 读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
    3. 消费者读取元素。
    如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer 相应位置写数据,写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为 7的元素没有生产成功,于是WaitFor(11)返回6。然后,消费者读取下标从3到6共计4个元素。
    image.png

    核心概念

    RingBuffer(环形缓冲区)
    基于数组的内存级别缓存,是创建sequencer(序号)与定 义WaitStrategy(拒绝策略)的入口。
    Disruptor(总体执行入口)
    对RingBuffer的封装,持有RingBuffer、消费者线程池 Executor、消费之集合ConsumerRepository等引用。 Sequence(序号分配器)
    对RingBuffer中的元素进行序号标记,通过顺序递增的方 式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进 度,同时还能消除伪共享。
    Sequencer(数据传输器)
    Sequencer里面包含了Sequence,是Disruptor的核 心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、 MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之 间快速、正确传递数据的并发算法。
    SequenceBarrier(消费者屏障)
    用于控制RingBuffer的Producer和Consumer之 间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
    WaitStrategy(消费者等待策略)
    决定了消费者如何等待生产者将Event生产进 Disruptor,WaitStrategy有多种实现策略。
    Event
    从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
    EventHandler
    由用户自定义实现,就是我们写消费者逻辑的地方,代表了 Disruptor中的一个消费者的接口。
    EventProcessor
    这是个事件处理器接口,实现了Runnable,处理主要事件循环, 处理Event,拥有消费者的Sequence。
    image.png