关于Future的使用,务必掌握,掌握了Future就掌握了未来
Callable
直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有缺陷:不能返回一个返回值;不能抛出checked Exception。
Callable的call方法可以有返回值,可以声明抛出异常。和Callable配合的有一个Future类,通过Future可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable做不到的,Callable的功能要比Runnable强大。
Future
主要功能
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
public interface Future<V> {
// 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已经取消,任务正常完成前将其取消,则返回 true
boolean isCancelled();
// 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
boolean isDone();
// 等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常,
//ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
V get() throws InterruptedException, ExecutionException;
// 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的
//单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask
利用FutureTask创建Future
Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过FutureTask存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的FutureTask被转型为Future接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这 个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果。
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Task task = new Task();
//构建futureTask
FutureTask<Integer> futureTask = new FutureTask<>(task);
//作为Runnable入参
new Thread(futureTask).start();
System.out.println("task运行结果:"+futureTask.get());
}
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程正在计算");
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
}
注:
1.当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制
2.Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来
3.Future task.get()阻塞的是主线程
局限性
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束 后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身 也确实存在着许多限制:
并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以只能等待结果;
无法对多个任务进行链式调用:如果希望在计算任务完成后执行特定动作,Future却没有提供这样的能 力;
无法组合多个任务:如果运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中 这是无能为力的;
没有异常处理:Future接口中没有关于异常处理的方法。
问题: Callable和Future是否产生了新的线程?
没有。Future表示一个异步计算结果,实现任务和线程解耦
CompletionService
Callable+Future 可以实现多个task并行执行,但是如果遇到task执行较慢时需要阻塞等待前面的task执 行完后面task才能取得结果。而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。 让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
原理
内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一 个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
适合批量的task写操作,写的时候不会阻塞。take(),阻塞队列取任务,再get()
使用案例
询价应用:向不同电商平台询价,并保存价格
采用 ThreadPoolExecutor + Future 的方案:异步执行询价然后再保存
@Slf4j
public class CompletionServiceDemo {
public static void main(String[] args) throws InterruptedException,ExecutionException {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 异步向电商S1询价
Future<Integer> f1 = executor.submit(() -> getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2 = executor.submit(() -> getPriceByS2());
// 获取电商S1报价并异步保存
executor.execute(() -> save(f1.get()));
// 获取电商S2报价并异步保存
executor.execute(() -> save(f2.get());
executor.shutdown();
}
private static void save(Integer r) {
log.debug("保存询价结果:{}",r);
}
private static Integer getPriceByS1() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(5000);
log.debug("电商S1询价信息1200");
return 1200;
}
private static Integer getPriceByS2() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(8000);
log.debug("电商S2询价信息1000");
return 1000;
}
private static Integer getPriceByS3() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(3000);
log.debug("电商S3询价信息800");
return 800;
}
}
打印结果:
如果获取电商S1报价的耗时很长,即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞在了f1.get()操作上。@Slf4j
public class CompletionServiceDemo {
public static void main(String[] args) throws InterruptedException,ExecutionException {
//创建线程池
ExecutorService executorService =
Executors.newFixedThreadPool(10);
//创建completionService
CompletionService<Integer> completionService = new
ExecutorCompletionService >(executorService);
//异步查询电商价格 函数式接口,不知道怎么用的时候就new
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return getPriceByS1();
}
});
completionService.submit(() > getPriceByS2());
completionService.submit(CompleteServiceDemo :getPriceByS3);
//将询价结果异步保存到数据库
for (int i = 0; i < 3; i +) {
Integer result = completionService.take().get();
executorService.execute(() > save(result));
}
}
private static void save(Integer r) {
log.debug("保存询价结果:{}",r);
}
private static Integer getPriceByS1() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(5000);
log.debug("电商S1询价信息1200");
return 1200;
}
private static Integer getPriceByS2() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(8000);
log.debug("电商S2询价信息1000");
return 1000;
}
private static Integer getPriceByS3() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(3000);
log.debug("电商S3询价信息800");
return 800;
}
}
应用场景总结
当需要批量提交异步任务的时候使用CompletionService。
CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批 量异步任务的管理更简单。
- CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,可 以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个 应用的风险。
CompletableFuture
CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。
CompletionStage接口: 默认的是ForkJoinPool.commonPool()公共线程池,最好自己指定线程池,否则可能公共线程池耗尽导致出现饥饿死锁。应用场景
描述依赖关系:
thenApply()把前面异步任务的结果,交给后面的Function
thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
描述and聚合关系:
thenCombine: 任务合并,有返回值
thenAccepetBoth: 两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
runAfterBoth: 两个任务都执行完成后,执行下一步操作(Runnable)。
描述or聚合关系:
applyToEither: 两个任务谁执行的快,就使用那一个结果,有返回值。
acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。
并行执行:
CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行创建异步操作
CompletableFuture 提供了四个静态方法来创建一个异步操作
默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option: - Djava.util.concurrent.ForkJoinPool.common.parallelism来设置 ForkJoinPool 线程池的线程数)。
如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,尽量要根据不同的业务类型创建不同的线程池,以避免互相干扰 。使用
runAsync&supplyAsync
打印结果:
获取结果
join&get
join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join() 方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。
- get() 方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
结果处理
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
Action的类型是BiConsumer,它可以处理正常的计算结果,或者异常情况。
⽅法不以Async结尾,意味着Action使⽤相同的线程执⾏,⽽Async可能会使⽤其它的线程去执⾏ (如果使⽤相同的线程池,也可能会被同⼀个线程选中执⾏)。
这⼏个⽅法都会返回CompletableFuture,当Action执⾏完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。
whenComplete&exceptionally
Disruptor
简介
JUC的BlockingQueue存在的问题
1. juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。
2. 加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。
3. 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。
设计方案
Disruptor通过以下设计来解决队列速度慢的问题:
环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。
元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
利用缓存行填充解决了伪共享的问题
index为long占8字节,缓存行大小是64字节,因此额外填充了7个long,加上index刚好64字节。
实现了基于事件驱动的生产者消费者模型(观察者模式)
消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费。
RingBuffer数据结构
使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:
- Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出:
index=sequence % entries.length
也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)
- 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉
问题:能覆盖数据是否会导致数据丢失呢?
当需要覆盖数据时,会执行一个策略,Disruptor给提供多种策略,比较常用的:
BlockingWaitStrategy策略,常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是阻塞等待。使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景。
SleepingWaitStrategy策略,会在循环中不断等待数据。先进行自旋等待如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时。典型的应用场景就是异步日志。
YieldingWaitStrategy策略,这个策略用于低延时的场合。消费者线程会不断循环监 控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。
BusySpinWaitStrategy策略: 采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用
单个生产者写数据的流程
- 申请写入m个元素;
2. 若是有m个元素可以写入,则返回最大的序列号。这里主要判断是否会覆盖未读的元素;
3. 若是返回的正确,则生产者开始写入元素。
多个生产者写数据的流程
问题:多个生产者的情况下,如何防止多个线程重复写同一个元素?
Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。
问题:但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。
Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。(即2个环形数组同时工作,RingBuffer存放消息,AvaliableBuffer仅仅用于读写标志站位)
当某个位置写入成功 的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
生产者写数据
多个生产者写入的时候:
1. 申请写入m个元素;
2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空 间;
3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪 些位置是已经写入成功的。
如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素, 同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。
多个消费者读数据的流程
生产者多线程写入的情况下读数据会复杂很多:
1. 申请读取到序号n;
2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始 读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
3. 消费者读取元素。
如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer 相应位置写数据,写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为 7的元素没有生产成功,于是WaitFor(11)返回6。然后,消费者读取下标从3到6共计4个元素。
核心概念
RingBuffer(环形缓冲区)
基于数组的内存级别缓存,是创建sequencer(序号)与定 义WaitStrategy(拒绝策略)的入口。
Disruptor(总体执行入口)
对RingBuffer的封装,持有RingBuffer、消费者线程池 Executor、消费之集合ConsumerRepository等引用。 Sequence(序号分配器)
对RingBuffer中的元素进行序号标记,通过顺序递增的方 式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进 度,同时还能消除伪共享。
Sequencer(数据传输器)
Sequencer里面包含了Sequence,是Disruptor的核 心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、 MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之 间快速、正确传递数据的并发算法。
SequenceBarrier(消费者屏障)
用于控制RingBuffer的Producer和Consumer之 间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
WaitStrategy(消费者等待策略)
决定了消费者如何等待生产者将Event生产进 Disruptor,WaitStrategy有多种实现策略。
Event
从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
EventHandler
由用户自定义实现,就是我们写消费者逻辑的地方,代表了 Disruptor中的一个消费者的接口。
EventProcessor
这是个事件处理器接口,实现了Runnable,处理主要事件循环, 处理Event,拥有消费者的Sequence。