1 Future接口

  1. 相关链接:[https://blog.csdn.net/u014209205/article/details/80598209](https://blog.csdn.net/u014209205/article/details/80598209)

4.4.19 线程池相关 - 图1

Future表示一个可能还没有完成的异步任务的结果;我们可以判断是否完成、是否取消、取消、获取返回值(获取返回值是阻塞的)

1.1 RunnableFuture 接口

实现了Future 和 Runnable 两个接口,同时具有 【执行某个任务】和【取消任务、判断是否取消、判断是否完成、获取执行结果】的能力;

其主要实现类是FutureTask;

2 CompetablFuture 类

这个链接不错:[https://blog.csdn.net/finalheart/article/details/87615546](https://blog.csdn.net/finalheart/article/details/87615546)

之前future需要等待isDone为true才能知道任务跑完了。或者就是用get方法调用的时候会出现阻塞。而使用completableFuture的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。自动等待上一个任务的结束再执行、将两个任务进行合并等操作;值得注意的是同步、异步的情况;部分场景有点小坑;用的时候参考一下帖子;


线程池,常见的四种线程池和区别

简述

为了彻底了解线程池的时候,我们需要弄清楚线程池创建的几个参数

  • corepollsize : 核心池的大小,默认情况下,在创建线程池后,每当有新的任务来的时候,如果此时线程池中的线程数小于核心线程数,就会去创建一个线程执行(就算有空线程也不复用),当创建的线程数达到核心线程数之后,再有任务进来就会放入任务缓存队列中。当任务缓存队列也满了的时候,就会继续创建线程,知道达到最大线程数。如果达到最大线程数之后再有任务过来,那么就会采取拒绝服务策略
  • Maximumpoolsize : 线程池中最多可以创建的线程数
  • keeplivetime : 线程空闲状态时,最多保持多久的时间会终止。默认情况下,当线程池中的线程数大于corepollsize 时,才会起作用 ,直到线程数不大于 corepollsize 。
  • workQuque: 阻塞队列,用来存放等待的任务
  • rejectedExecutionHandler :任务拒绝处理器(这个注意一下),有四种

(1)abortpolicy丢弃任务,抛出异常
(2)discardpolicy拒绝执行,不抛异常
(3)discardoldestpolicy 丢弃任务缓存队列中最老的任务
(4)CallerRunsPolicy 线程池不执行这个任务,主线程自己执行。

LinkedBlockingDeque(链表同步阻塞队列)、 LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。 注:这个队列需要注意的是,虽然通常称其为一个无界队列,但是可以人为指定队列大小,而且由于其用于记录队列大小的参数是int类型字段,所以通常意义上的无界其实就是队列长度为 Integer.MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer.MAX_VALUE。 newFixedThreadPool线程池使用了这个队列,按FIFO排序任务 ArrayBlockingQueue(数组同步阻塞队列)、 ArrayBlockingQueue是一个有界缓存等待队列。可以指定缓存队列的大小,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。,按FIFO排序量。 SynchronousQueue(同步阻塞队列) 总结一下:当我们使用同步阻塞队列时,同步阻塞队列没有大小,当runnable的数量小于等于线程池的大小时每执行一次 threadPoolExecutor.execute(runnable);线程池的大小会加1,当超过线程池的的大小时不再执行多余的任务并抛出RejectedExecutionException异常。 SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。 newCachedThreadPool线程池使用了这个队列。 使用说明:通常使用ArrayBlockingQueue就满足要求了,若在高并发下,建议使用LinkedBlockingQueue jdk提供的线程池实现中,都是用的 LinkedBlockingQueue 和 SynchronousQueue;没有用ArrayBlockingQueue


1、newFixedThreadPool 定长线程池

一个有指定的线程数的线程池,有核心的线程,里面有固定的线程数量,响应的速度快。正规的并发线程,多用于服务器。固定的线程数由系统资源设置。核心线程是没有超时机制的,队列大小没有限制,除非线程池关闭了核心线程才会被回收。

2、newCachedThreadPool 可缓冲线程池

只有非核心线程,最大线程数很大,每新来一个任务,当没有空余线程的时候就会重新创建一个线程,这边有一个超时机制,当空闲的线程超过60s内没有用到的话,就会被回收,它可以一定程序减少频繁创建/销毁线程,减少系统开销,适用于执行时间短并且数量多的任务场景。
用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务。

3、ScheduledThreadPool 周期线程池

创建一个定长线程池,支持定时及周期性任务执行,通过过schedule方法可以设置任务的周期执行

4、newSingleThreadExecutor 单任务线程池

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,每次任务到来后都会进入阻塞队列,然后按指定顺序执行。

  1. newCachedThreadPool:
  2. newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。
  3. newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。
  4. newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
  5. newWorkStealingPool:创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行

随着任务数量的增加,会增加活跃的线程数。
当活跃的线程数 = 核心线程数,此时不再增加活跃线程数,而是往任务队列里堆积。
当任务队列堆满了,随着任务数量的增加,会在核心线程数的基础上加开线程。
直到活跃线程数 = 最大线程数,就不能增加线程了。
如果此时任务还在增加,则: 任务数11 > 最大线程数8 + 队列长度2 ,抛出异常RejectedExecutionException,拒绝任务

四种线程池的特色

关键源码解读

1、execute方法
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //判断当前线程个数是否小于corePoolSize(核心线程数),如果小于的话,就再创建一个线程,每个线程都被封装成一个Worker,
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果大于核心线程的话就尝试把任务加入缓存队列,这里增加了状态出现异常的确认判断,
    //如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //如果队列放不了,只能采用默认的拒绝服务策略了,
    else if (!addWorker(command, false))
        reject(command);
    }

源码中出现ctl的次数比较多,那么这是个什么呢?
我们可以看看它的定义

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
方法
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

它实质上就是一个线程安全的32位的Integer,用前三位表示线程池的状态,后29位来表示线程的个数,所以在计算个数的时候用到了workerCountOf,忽略前三位带来的影响。

2、Worker中的run方法

Worker就是对线程的封装,线程池中维护了一个HashSet的一个集合来存储工作线程,每次addWork的时候就往这个里面加,因为HashSet是不安全的,所以加了ReentrantLock来做同步

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
    }

其实这个操作很简单,就是一个while不断的去getTask,获得任务之后,就依次执行
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
(发现每次执行任务的时候都加了锁,有点奇怪,这里我还要看一下)
那么假设任务队列中没有了呢?那这里就用到了我们定义的keeplivetime ,在getTask中有这样一段代码,

Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
也就是当超过keeplivetime 没有拿到就会返回null,这个时候循环就会截止,这个线程Wo也就会结束,所以说keepAliveTime指的是最长的poll时间