一、Executor&ExecutorService简介

虽然大多数情况下,我们更喜欢将Executor或ExecutorService直接称之为“线程池”,但是事实上这两个接口只定义了任务(Runnable/Callable)被提交执行的相关接口。

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

ExecutorService接口继承自Executor接口,并且提供了更多用于任务提交和管理的一些方法,比如停止任务的执行等

  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. List<Runnable> shutdownNow();
  4. boolean isShutdown();
  5. boolean isTerminated();
  6. boolean awaitTermination(long timeout, TimeUnit unit)
  7. throws InterruptedException;
  8. <T> Future<T> submit(Callable<T> task);
  9. <T> Future<T> submit(Runnable task, T result);
  10. Future<?> submit(Runnable task);
  11. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  12. throws InterruptedException;
  13. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  14. long timeout, TimeUnit unit)
  15. throws InterruptedException;
  16. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  17. throws InterruptedException, ExecutionException;
  18. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  19. long timeout, TimeUnit unit)
  20. throws InterruptedException, ExecutionException, TimeoutException;
  21. }

二、ThreadPoolExecutor详解

线程池主要解决了两个不同的问题:由于任务的异步提交,因此在执行大量的异步任务时可以提升系统性能;另外它还提供了限制和管理资源的方法,包括线程池中的工作线程、线程池任务队列中的任务,除此之外,每一个ThreadPoolExecutor还维护了一些基本的统计信息,比如已经完成的任务数量等。

public abstract class AbstractExecutorService implements ExecutorService {
    ...
}

2.1、ThreadPoolExecutor的主要方法

public class ThreadPoolExecutor extends AbstractExecutorService {


        public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    ...
}

2.1、ThreadPoolExecutor的简单使用

    public static void main(String[] args) throws  Exception{

        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(3,4,30,
                TimeUnit.SECONDS,new ArrayBlockingQueue<>(4)
                ,Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());

        for (int i=0;i<7;i++){
            threadPoolExecutor.execute(
                    ()->{
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName());
                    }
            );
        }
        System.out.println(threadPoolExecutor.getActiveCount());
        System.out.println(threadPoolExecutor.getQueue().isEmpty());
        threadPoolExecutor.shutdown();
    }

2.2、构造参数详解

  • corePoolSize:用于指定在线程池中维护的核心线程数量,即使当前线程池中的核心线程不工作,核心线程的数量也不会减少(在JDK1.6版本及以后可以通过设置允许核心线程超时的方法allowCoreThreadTimeOut来改变这种情况)。
  • maximumPoolSize:用于设置线程池中允许的线程数量的最大值。
  • keepAliveTime:当线程池中的线程数量超过核心线程数并且处于空闲时,线程池将回收一部分线程让出系统资源,该参数可用于设置超过corePoolSize数量的线程在多长时间后被回收,与unit配合使用。
  • TimeUnit:用于设定keepAliveTime的时间单位。
  • workQueue:用于存放已提交至线程池但未被执行的任务。
  • ThreadFactory:用于创建线程的工厂,开发者可以通过自定义ThreadFactory来创建线程,比如,根据业务名为线程命名、设置线程优先级、设置线程是否为守护线程等、设置线程所属的线程组等。
  • RejectedExecutionHandler:当任务数量超过阻塞队列边界时,这个时候线程池就会拒绝新增的任务,该参数主要用于设置拒绝策略。

ThreadPoolExecutor的构造比较复杂,除了其对每一个构造参数都有一定的要求之外(比如,不能为null),个别构造参数之间也存在一定的约束关系。

  • TimeUnit、workQueue、ThreadFactory、RejectedExecutionHandler不能为null。
  • corePoolSize可以设置为0,但不能小于0,并且corePoolSize不能大于线程的最大数量(maximumPoolSize)。

2.4、执行任务方法详解

程池被成功构造后,其内部的运行线程并不会立即被创建,ThreadPoolExecutor的核心线程将会采用一种Lazy(懒)的方式来创建并且运行,当线程池被创建,并且首次调用执行任务方法时才会创建,并且运行线程。

▪ 线程池核心线程数量大于0,并且首次提交任务时,线程池会立即创建线程执行该任务,并且该任务不会被存入任务队列之中。
▪ 当线程池中的活跃(工作)线程大于等于核心线程数量并且任务队列未满时,任务队列中的任务不会立即执行,而是等待工作线程空闲时轮询任务队列以获取任务。
▪ 当任务队列已满且工作线程小于最大线程数量时,线程池会创建线程执行任务,但是线程数量不会超过最大线程数,下面将上一段代码的最大循环数修改为N(最大线程数+任务队列size),会发现同时有maximumPoolSize个线程在工作。
▪ 当任务队列已满且线程池中的工作线程达到最大线程数量,并且此刻没有空闲的工作线程时,会执行任务拒绝策略,任务将以何种方式被拒绝完全取决于构造ThreadExecutorPool时指定的拒绝策略。若将执行任务的循环最大次数更改为15,再次执行时会发现只有14个任务被执行,第15个任务被丢弃(这里指定的拒绝策略为丢弃)。
▪ 若线程池中的线程是空闲的且空闲时间达到指定的keepAliveTime时间,线程会被线程池回收(最多保留corePoolSize数量个线程),当然如果设置允许线程池中的核心线程超时,那么线程池中所有的工作线程都会被回收。

2.5、ThreadFactory详解

在ThreadExecutorPool的构造参数中提供了一个接口ThreadFactory,用于定义线程池中的线程(Thread),我们可以通过该接口指定线程的命名规则、优先级、是否为daemon守护线程等信息

public interface ThreadFactory {

    Thread newThread(Runnable r);
}

2.6、拒绝策略RejectedExecutionHandler

当线程池中没有空闲的工作线程,并且任务队列已满时,新的任务将被执行拒绝策略,在ThreadPoolExecutor中提供了4种形式的拒绝策略,当然它还允许开发者自定义拒绝策略。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
  • DiscardPolicy:丢弃策略,任务会被直接无视丢弃而等不到执行,因此该策略需要慎重使用。
  • AbortPolicy:中止策略,在线程池中使用该策略,在无法受理任务时会抛出拒绝执行异常RejectedExecutionException(运行时异常)。
  • DiscardOldestPolicy:丢弃任务队列中最老任务的策略(最早进入任务队列中的任务并不一定是最早(老)的,比如,优先级阻塞队列会根据排序规则来决定将哪个任务放在队头)。
  • CallerRunsPolicy:调用者线程执行策略,前面的三种拒绝策略要么会在执行execute方法时抛出异常,要么会将任务丢弃。该策略不会导致新任务的丢失,但是任务会在当前线程中被阻塞地执行,也就是说任务不会由线程池中的工作线程执行。

2.7、ThreadPoolExecutor的其他方法

public class ThreadPoolExecutor extends AbstractExecutorService {


}

image.png

三、ScheduledExecutorService详解

ScheduledExecutorService继承自ExecutorService,并且提供了任务被定时执行的特性,我们可以使用ScheduledExecutorService的实现ScheduledThreadPoolExecutor来完成某些特殊的任务执行,比如使某任务根据设定的周期来运行,或者在某个指定的时间来执行任务等。

3.1、

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

                        /**
                         *     该方法是一个one-shot方法(只执行一次),
                         *     任务(callable)会在单位(unit)时间(delay)后被执行,
                         *     并且立即返回ScheduledFuture,
                         *     在稍后的程序中可以通过Future获取异步任务的执行结果。
                         */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

         /**
         * 该方法同样是一个one-shot方法(只执行一次),
         * 任务(runnable)会在单位(unit)时间(delay)后被执行,虽然也会返回ScheduledFuture,
         * 但是并不会包含任何执行结果,因为Runnable接口的run方法本身就是无返回值类型的接口方法,
         * 不过可以通过该Future判断任务是否执行结束。
         */
     public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

     /**
         * 任务(command)会根据固定的速率(period,时间单位为unit)在时间(initialDelay,时间单位为unit)后不断地被执行。
         */
       public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    //该方法与前一个方法比较类似,只不过该方法将以固定延迟单位时间的方式执行任务。
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

}

案例一

    public static void main(String[] args) throws  Exception{
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor=new ScheduledThreadPoolExecutor(1);

        ScheduledFuture<String> schedule = scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("===");
            return "123";
        }, 5, TimeUnit.SECONDS);

        System.out.println(schedule.get());
        scheduledThreadPoolExecutor.shutdown();
    }

案例二

    public static void main(String[] args) throws  Exception{
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor=new ScheduledThreadPoolExecutor(1);

        ScheduledFuture<?> schedule = scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("===");
        }, 5, TimeUnit.SECONDS);

        System.out.println(schedule.get());
        scheduledThreadPoolExecutor.shutdown();
    }

案例三:scheduleAtFixedRate和scheduleWithFixedDelay比较类似

    public static void main(String[] args) throws  Exception{
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor=new ScheduledThreadPoolExecutor(1);

        /**相差2秒
         * Sat Aug 28 16:08:38 CST 2021
         * Sat Aug 28 16:08:40 CST 2021
         * Sat Aug 28 16:08:42 CST 2021
         * Sat Aug 28 16:08:44 CST 2021
         * Sat Aug 28 16:08:46 CST 2021
         * Sat Aug 28 16:08:48 CST 2021
         */
        ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new Date());
        }, 5, 2, TimeUnit.SECONDS);

         /** 相差4s
         * Sat Aug 28 16:07:33 CST 2021
         * Sat Aug 28 16:07:37 CST 2021
         * Sat Aug 28 16:07:41 CST 2021
         * Sat Aug 28 16:07:45 CST 2021
         * Sat Aug 28 16:07:49 CST 2021
         * Sat Aug 28 16:07:53 CST 2021
         */
        ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new Date());
        }, 5, 2, TimeUnit.SECONDS);
    }

四、关闭ExecutorService

如果ExecutorService在接下来的程序执行中将不再被使用,则需要将其关闭以释放工作线程所占用的系统资源,ExecutorService接口定义了几种不同形式的关闭方式

4.1、有序关闭(shutdown)

shutdown提供了一种有序关闭ExecutorService的方式,当该方法被执行后新的任务提交将会被拒绝,但是工作线程正在执行的任务以及线程池任务(阻塞)队列中已经被提交的任务还是会执行,当所有的提交任务都完成后线程池中的工作线程才会销毁进而达到ExecutorService最终被关闭的目的。

该方法是立即返回方法,它并不会阻塞等待所有的任务处理结束及ExecutorService最终的关闭,因此如果你想要确保线程池彻底被关闭之后才进行下一步的操作,那么这里可以配合另外一个等待方法awaitTermination使当前线程进入阻塞等待ExecutorService关闭结束后再进行下一步的动作。

4.2、立即关闭(shutdownNow)

shutdownNow方法首先会将线程池状态修改为shutdown状态,然后将未被执行的任务挂起并从任务队列中排干,其次会尝试中断正在进行任务处理的工作线程,最后返回未被执行的任务,当然,对一个执行了shutdownNow的线程池提交新的任务同样会被拒绝。

4.3、组合关闭(shutdown&shutdownNow)

五、Executors详解

要创建一个ExecutorService,尤其是ThreadPoolExecutor是比较复杂的,Java并发包中提供了类似于工厂方法的类,用于创建不同的ExecutorService,当然还包括拒绝策略、ThreadFactory等

5.1、FixedThreadPool

5.2、SingleThreadPool

5.3、CachedThreadPool

5.4、ScheduledThreadPool

5.5、WorkStealingPool