1 Future接口
相关链接:[https://blog.csdn.net/u014209205/article/details/80598209](https://blog.csdn.net/u014209205/article/details/80598209)
Future表示一个可能还没有完成的异步任务的结果;我们可以判断是否完成、是否取消、取消、获取返回值(获取返回值是阻塞的)
1.1 RunnableFuture 接口
实现了Future 和 Runnable 两个接口,同时具有 【执行某个任务】和【取消任务、判断是否取消、判断是否完成、获取执行结果】的能力;
其主要实现类是FutureTask;
2 CompetablFuture 类
这个链接不错:[https://blog.csdn.net/finalheart/article/details/87615546](https://blog.csdn.net/finalheart/article/details/87615546)
之前future需要等待isDone为true才能知道任务跑完了。或者就是用get方法调用的时候会出现阻塞。而使用completableFuture的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。自动等待上一个任务的结束再执行、将两个任务进行合并等操作;值得注意的是同步、异步的情况;部分场景有点小坑;用的时候参考一下帖子;
线程池,常见的四种线程池和区别
简述
为了彻底了解线程池的时候,我们需要弄清楚线程池创建的几个参数
- corepollsize : 核心池的大小,默认情况下,在创建线程池后,每当有新的任务来的时候,如果此时线程池中的线程数小于核心线程数,就会去创建一个线程执行(就算有空线程也不复用),当创建的线程数达到核心线程数之后,再有任务进来就会放入任务缓存队列中。当任务缓存队列也满了的时候,就会继续创建线程,知道达到最大线程数。如果达到最大线程数之后再有任务过来,那么就会采取拒绝服务策略。
- Maximumpoolsize : 线程池中最多可以创建的线程数
- keeplivetime : 线程空闲状态时,最多保持多久的时间会终止。默认情况下,当线程池中的线程数大于corepollsize 时,才会起作用 ,直到线程数不大于 corepollsize 。
- workQuque: 阻塞队列,用来存放等待的任务
- rejectedExecutionHandler :任务拒绝处理器(这个注意一下),有四种
(1)abortpolicy丢弃任务,抛出异常
(2)discardpolicy拒绝执行,不抛异常
(3)discardoldestpolicy 丢弃任务缓存队列中最老的任务
(4)CallerRunsPolicy 线程池不执行这个任务,主线程自己执行。LinkedBlockingDeque(链表同步阻塞队列)、 LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。 注:这个队列需要注意的是,虽然通常称其为一个无界队列,但是可以人为指定队列大小,而且由于其用于记录队列大小的参数是int类型字段,所以通常意义上的无界其实就是队列长度为 Integer.MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer.MAX_VALUE。 newFixedThreadPool线程池使用了这个队列,按FIFO排序任务 ArrayBlockingQueue(数组同步阻塞队列)、 ArrayBlockingQueue是一个有界缓存等待队列。可以指定缓存队列的大小,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。,按FIFO排序量。 SynchronousQueue(同步阻塞队列) 总结一下:当我们使用同步阻塞队列时,同步阻塞队列没有大小,当runnable的数量小于等于线程池的大小时每执行一次 threadPoolExecutor.execute(runnable);线程池的大小会加1,当超过线程池的的大小时不再执行多余的任务并抛出RejectedExecutionException异常。 SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。 newCachedThreadPool线程池使用了这个队列。 使用说明:通常使用ArrayBlockingQueue就满足要求了,若在高并发下,建议使用LinkedBlockingQueue jdk提供的线程池实现中,都是用的 LinkedBlockingQueue 和 SynchronousQueue;没有用ArrayBlockingQueue
1、newFixedThreadPool 定长线程池
一个有指定的线程数的线程池,有核心的线程,里面有固定的线程数量,响应的速度快。正规的并发线程,多用于服务器。固定的线程数由系统资源设置。核心线程是没有超时机制的,队列大小没有限制,除非线程池关闭了核心线程才会被回收。
2、newCachedThreadPool 可缓冲线程池
只有非核心线程,最大线程数很大,每新来一个任务,当没有空余线程的时候就会重新创建一个线程,这边有一个超时机制,当空闲的线程超过60s内没有用到的话,就会被回收,它可以一定程序减少频繁创建/销毁线程,减少系统开销,适用于执行时间短并且数量多的任务场景。
用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务。
3、ScheduledThreadPool 周期线程池
创建一个定长线程池,支持定时及周期性任务执行,通过过schedule方法可以设置任务的周期执行
4、newSingleThreadExecutor 单任务线程池
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,每次任务到来后都会进入阻塞队列,然后按指定顺序执行。
- newCachedThreadPool:
- newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。
- newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。
- newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
- newWorkStealingPool:创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行
随着任务数量的增加,会增加活跃的线程数。
当活跃的线程数 = 核心线程数,此时不再增加活跃线程数,而是往任务队列里堆积。
当任务队列堆满了,随着任务数量的增加,会在核心线程数的基础上加开线程。
直到活跃线程数 = 最大线程数,就不能增加线程了。
如果此时任务还在增加,则: 任务数11 > 最大线程数8 + 队列长度2 ,抛出异常RejectedExecutionException,拒绝任务
关键源码解读
1、execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断当前线程个数是否小于corePoolSize(核心线程数),如果小于的话,就再创建一个线程,每个线程都被封装成一个Worker,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果大于核心线程的话就尝试把任务加入缓存队列,这里增加了状态出现异常的确认判断,
//如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列放不了,只能采用默认的拒绝服务策略了,
else if (!addWorker(command, false))
reject(command);
}
源码中出现ctl的次数比较多,那么这是个什么呢?
我们可以看看它的定义
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
方法
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
它实质上就是一个线程安全的32位的Integer,用前三位表示线程池的状态,后29位来表示线程的个数,所以在计算个数的时候用到了workerCountOf,忽略前三位带来的影响。
2、Worker中的run方法
Worker就是对线程的封装,线程池中维护了一个HashSet的一个集合来存储工作线程,每次addWork的时候就往这个里面加,因为HashSet是不安全的,所以加了ReentrantLock来做同步
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
其实这个操作很简单,就是一个while不断的去getTask,获得任务之后,就依次执行
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
(发现每次执行任务的时候都加了锁,有点奇怪,这里我还要看一下)
那么假设任务队列中没有了呢?那这里就用到了我们定义的keeplivetime ,在getTask中有这样一段代码,
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
也就是当超过keeplivetime 没有拿到就会返回null,这个时候循环就会截止,这个线程Wo也就会结束,所以说keepAliveTime指的是最长的poll时间