线程池

在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念。

线程池是一种多线程处理形式,在系统启动时即创建大量空闲的线程,处理过程中程序将任务添加到队列,线程池就会启动一条线程来执行这个任务。执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

Java1.5引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池。而不需要关心该任务是如何执行、被哪个线程执行以及什么时候执行。

线程池的优势

合理利用线程池能够带来三个好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。

  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Java线程池工作流程

基本组成:
1、线程池管理器 用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;

2、核心线程池 核心线程,也就是正在处理任务的线程。如果设置了线程复用,一个线程在执行完一个任务之后不能直接退出,需要重新去队列任务中获取新的任务来执行,如果任务队列中没有任务,且keepAliveTime没有被设置,那么这个工作线程将一直阻塞下去指导有新的任务可执行,这样就达到了线程复用的目的;

3、工作队列(任务队列) 用于存放没有处理的任务。当正在工作的线程也就是核心线程数达到最大核心线程数时,将任务加入到工作队列进行等待,实现缓冲机制。这样一来,当一些核心线程工作完成且可以复用的时候,就可以从工作队列中获取任务去执行。

线程池工作流程如下图所示:

线程池 - 图1

  1. 线程池判断核心线程池(基本线程池)是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。

  2. 线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。

  3. 线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

Java线程池的使用分析

Java提供了线程池的支持-java.util.concurrent.Executor,Executor是一个接口:

  1. /**
  2. * @since 1.5
  3. * @author Doug Lea
  4. */
  5. public interface Executor {
  6. //
  7. void execute(Runnable command);
  8. }

基础架构图如下:
image.png

  • Executor是一个顶级接口,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。
  • ExecutorService,继承并扩展了Executor添加了一些用于生命周期管理的方法(同时还提供一些用于任务提交的便利方法)
  • AbstractExecutorService分支就是普通的线程池分支,ScheduledExecutorService是用来创建定时任务的。
  • ThreadPoolExecutor是最常用的线程池实现,继承并实现了AbstractExecutorService抽象类
  • Executors 线程池工厂类。

作为线程池,Executor基于”生产者-消费者”模式,提交任务的一方相当于生产者,执行任务的则相当于消费者。
ThreadPoolExecutor是Executor接口的一个重要的实现类,是线程池的具体实现,用来管理执行任务的线程。

ExecutorService的主要方法

ExecutorService作为Executor的扩展,增加了部分关于生命周期的方法。

execute(Runnable)

接收一个Runnable实例(任务),并且异步的执行。
该方法无法获知任务执行结果

  1. void execute(Runnable command);

submit(Runnable)

submit(Runnable)和execute(Runnable)区别是前者可以返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕(调用future.get返回null表示执行完毕)

  1. Future<?> submit(Runnable task);

**

submit(Runnable task, T result)

虽然submit传入Runnable不能直接返回内容,但是可以通过submit(Runnable task, T result)传入一个载体,通过这个载体获取返回值。这个其实不能算返回值了,是交给线程处理一下。

  1. <T> Future<T> submit(Runnable task, T result);

submit(Callable)

submit(Callable)和submit(Runnable)类似,也会返回一个Future对象
但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值

  1. <T> Future<T> submit(Callable<T> task);

Submit 和 execute 方法区别 1、有返回值和无返回值 2、Task 不一样 submit创建的是futuretask,execute则是task本身 3、submit内部依旧调用execute

invokeAny(…)

返回第一个执行完毕的任务的Future对象,其余未执行完毕任务被取消

  1. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  2. throws InterruptedException, ExecutionException;
  3. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  4. long timeout, TimeUnit unit)
  5. throws InterruptedException, ExecutionException, TimeoutException;

invokeAll(…)

调用存在于参数集合中的所有 Callable 对象,并且返回一個包含 Future 对象的集合,你可以通过这個返回的集合来管理每个 Callable 的执行结果。

任务有可能因为异常而导致运行结束,所以它可能并不是真的成功运行了。但是我们没有办法通过 Future 对象来了解到这個差异。

  1. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  2. throws InterruptedException;
  3. // 超时后没有任务完成则返回null。
  4. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  5. long timeout, TimeUnit unit)
  6. throws InterruptedException;

shutdown()

通知关闭线程池,ExecutorService 并不会马上关闭,而是不再接收新的任务,一但所有的线程执行完成当前任务,ExecutorServie 才会真的关闭。所有在调用 shutdown() 方法之前提交到 ExecutorService 的任务都会执行。

  1. void shutdown();

shutdownNow()

立即关闭线程池,这个方法会尝试马上关闭所有正在执行的任务,并且跳过所有已经提交但是还没有运行的任务。
但是对于正在执行的任务,是否能够成功关闭它是无法保证的,有可能他们真的被关闭掉了,也有可能它会一直执行到任务结束

  1. List<Runnable> shutdownNow();

isShutdown()

判断是否已经在shutdown

  1. boolean isShutdown();

isTerminated()

判断是否已经关闭线程池

  1. boolean isTerminated();


awaitTermination(….)

当使用awaitTermination时,主线程会处于一种等待的状态,等待线程池中所有的线程都运行完毕或超过等待时长后才继续运行。
而当主线程等待超时之后,即便回到主线程,子线程的操作依旧会继续,awaitTermination 是不会中断线程的。

  1. boolean awaitTermination(long timeout, TimeUnit unit)
  2. throws InterruptedException;

合理的操作应该是通过shutdown()方法和awaitTermination(…)方法去关闭线程池和保证线程池与主线程的同步。

ThreadPoolExecutor主要属性

corePoolSize int 核心线程池大小
maximumPoolSize int 最大线程池大小
keepAliveTime long 线程最大空闲时间
unit TimeUnit 时间单位
workQueue BlockingQueue 线程等待队列
threadFactory ThreadFactory 线程创建工厂
handler RejectedExecutionHandler 拒绝策略

JDK四种拒绝策略(饱和策略)

拒绝策略指的是当线程池已满的情况下,对新入任务应该采取的应对措施

  1. public interface RejectedExecutionHandler {
  2. void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
  3. }

JDK默认携带的策略有如下四种,全都实现了RejectedExecutionHandler接口,定义在ThreadPoolExecutor类中:
image.png

1、AbortPolicy (默认)

抛出 RejectedExecutionException异常

2、CallerRunsPolicy

调用当前线程池所在的线程去执行

3、DiscardPolicy

直接丢弃当前任务

4、DiscardOldestPolicy

将最旧的任务丢弃,将当前任务添加到队列。

线程池五种状态

线程池有五种状态,维护在ThreadPoolExecutor类中

  1. /**
  2. *RUNNING: Accept new tasks and process queued tasks
  3. * SHUTDOWN: Don't accept new tasks, but process queued tasks
  4. * STOP: Don't accept new tasks, don't process queued tasks,
  5. * and interrupt in-progress tasks
  6. * TIDYING: All tasks have terminated, workerCount is zero,
  7. * the thread transitioning to state TIDYING
  8. * will run the terminated() hook method
  9. * TERMINATED: terminated() has completed
  10. */
  11. // runState is stored in the high-order bits
  12. private static final int RUNNING = -1 << COUNT_BITS;
  13. private static final int SHUTDOWN = 0 << COUNT_BITS;
  14. private static final int STOP = 1 << COUNT_BITS;
  15. private static final int TIDYING = 2 << COUNT_BITS;
  16. private static final int TERMINATED = 3 << COUNT_BITS;

RUNNING(-1)

线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。

SHUTDOWN(0)

线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务

STOP(1)

线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务

TIDYING(2)

当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。
当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

TERMINATED(3)

线程池彻底终止,就变成TERMINATED状态

五种状态间的转换如下:
线程池 - 图4

线程池五种类型

ThreadPoolExecutor的构造方法:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }
  9. public ThreadPoolExecutor(int corePoolSize,
  10. int maximumPoolSize,
  11. long keepAliveTime,
  12. TimeUnit unit,
  13. BlockingQueue<Runnable> workQueue,
  14. ThreadFactory threadFactory) {
  15. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  16. threadFactory, defaultHandler);
  17. }
  18. public ThreadPoolExecutor(int corePoolSize,
  19. int maximumPoolSize,
  20. long keepAliveTime,
  21. TimeUnit unit,
  22. BlockingQueue<Runnable> workQueue,
  23. RejectedExecutionHandler handler) {
  24. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  25. Executors.defaultThreadFactory(), handler);
  26. }
  27. /**
  28. * @param corePoolSize 核心线程池最大线程数
  29. * @param maximumPoolSize 线程池最大线程数
  30. * @param keepAliveTime 线程最大等待存活时长
  31. * @param unit 存活时长的时间单位
  32. * @param workQueue 工作队列(任务队列)
  33. * @param threadFactory 线程工厂(用于创建线程)
  34. * @param handler 拒绝策略
  35. public ThreadPoolExecutor(int corePoolSize,
  36. int maximumPoolSize,
  37. long keepAliveTime,
  38. TimeUnit unit,
  39. BlockingQueue<Runnable> workQueue,
  40. ThreadFactory threadFactory,
  41. RejectedExecutionHandler handler) {
  42. // 合法校验
  43. if (corePoolSize < 0 ||
  44. maximumPoolSize <= 0 ||
  45. maximumPoolSize < corePoolSize ||
  46. keepAliveTime < 0)
  47. throw new IllegalArgumentException();
  48. if (workQueue == null || threadFactory == null || handler == null)
  49. throw new NullPointerException();
  50. // AccessControlContext用于根据其封装的上下文做出系统资源访问决策
  51. this.acc = System.getSecurityManager() == null ?
  52. null :
  53. AccessController.getContext();
  54. // 设置基本属性
  55. this.corePoolSize = corePoolSize;
  56. this.maximumPoolSize = maximumPoolSize;
  57. this.workQueue = workQueue;
  58. this.keepAliveTime = unit.toNanos(keepAliveTime);
  59. this.threadFactory = threadFactory;
  60. this.handler = handler;
  61. }

正常来说,我们会通过Executors 线程池工厂类创建线程池,而Executors则提供了创建五种不同类型线程池的方法:

(1) FixedThreadPool—-定长线程池

**

  1. - **corePoolSize == maximumPoolSize
  2. - **workQueue = LinkedBlockingQueue**

**
创建一个线程数量固定的线程池,可控制线程最大并发数,超出的任务会在队列中等待

适用于为了满足资源管理的需求,而需要适当限制当前线程数量的情景,适用于负载比较重的服务器。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
ExecutorService executorService = Executors.newFixedThreadPool(2);

(2) SingleThreadExecutor—-单线程线程池

  - **corePoolSize == maximumPoolSize =1
  - **workQueue = LinkedBlockingQueue**


只有一个线程的线程池,常用于需要让任务顺序执行**,并且在任意时间,只能有一个任务被执行,而不能有多个任务同时执行的场景。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

(3) CachedThreadPool—-可缓存线程池

  - **corePoolSize =0 **
  - **maximumPoolSize =****Integer.MAX_VALU**
  - **workQueue = ****SynchronousQueue**


线程数自增长,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

适用于执行很多短期异步任务的小程序,或者是负载较轻的服务器

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

(4) ScheduledThreadPool—-周期性线程池

  • corePoolSize =** corePoolSize**
  • maximumPoolSize =**Integer.MAX_VALU**
  • workQueue = **DelayedWorkQueue**

定长线程池,支持定时及周期性任务执行

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

同时ScheduledExecutorService还提供几个关于周期执行的方法:

// 延时任务
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);

// 循环任务,按照上一次任务的发起时间计算下一次任务的开始时间
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);

// 循环任务,以上一次任务的结束时间计算下一次任务的开始时间
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

(5) WorkStealingPool—-抢占式线程池(JDK1.8新增)

创建一个拥有**多个任务队列**的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。

内部会构建ForkJoinPool线程池,利用Work-Stealing(工作窃取)算法,并行地处理任务,不保证处理顺序。它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。

并行的线程池,不会保证任务顺序执行,抢占式的工作。

能够合理的使用CPU进行对任务操作(并行操作),适合使用在很耗时的操作

// parallelism 为并行级别,并行级别决定了同一时刻最多有多少个线程在执行
// 默认为CPU核数
public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

ForkJoinPool线程池

ForkJoinPool是自java7开始,jvm提供的一个用于并行执行的任务框架。

使用分治法进行工作,其主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果

线程池 - 图5

而工作窃取则是ForkJoinPool线程池的一种实现,工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃取队列。

ForkJoinPool里有三个重要的角色:

  • ForkJoinWorkerThread(下文简称worker):包装Thread;
  • WorkQueue:任务队列,双向;
  • ForkJoinTask:worker执行的对象,实现了Future。两种类型,一种叫submission,另一种就叫task。

ForkJoinPool使用数组保存所有WorkQueue。
image.png
每个worker有属于自己的WorkQueue,但不是每个WorkQueue都有对应的worker。

  • 没有worker的WorkQueue:保存的是submission,来自外部提交,在WorkQueue[]的下标是偶数;
  • 属于worker的WorkQueue:保存的是task,在WorkQueue[]的下标是奇数。

**
WorkQueue是一个双端队列,同时支持LIFO(last-in-first-out)的push和pop操作,和FIFO(first-in-first-out)的poll操作,分别操作top端和base端。worker操作自己的WorkQueue是LIFO操作(可选FIFO),除此之外,worker会尝试steal其他WorkQueue里的任务,这个时候执行的是FIFO操作。

线程池 - 图7

使用案例

(1) FixedThreadPool—-定长线程池

public static void fixedThreadPoolExample(){

    // 开启只有两个线程的线程池
    ExecutorService executorService = Executors.newFixedThreadPool(2);

    // 构造5个任务并将任务提交到线程池中
    for(int i=1;i<=5;i++){

        final int finalI = i;
        executorService.submit(new Runnable(){
            @Override
            public void run() {
                // 输出执行当前任务的线程名
                System.out.println("执行当前任务"+ finalI +"的是:"+Thread.currentThread().getName());
            }
        });
    }

    // 通知关闭线程池,等待所有任务被执行完
    executorService.shutdown();
    try {
        // 主线程等待线程池中的任务执行
        executorService.awaitTermination(1, TimeUnit.DAYS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

输出如下,从始至终只有两个线程在执行任务
image.png

(2) SingleThreadExecutor—-单线程线程池

public static void singleThreadPoolExample(){

    // 创建只有一个线程的线程池
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    // 构造5个任务并将任务提交到线程池中
    for(int i=1;i<=5;i++){

        final int finalI = i;
        executorService.submit(new Runnable(){
            @Override
            public void run() {
                // 输出执行当前任务的线程名
                System.out.println("执行当前任务"+ finalI +"的是:"+Thread.currentThread().getName());
            }
        });
    }

    // 通知关闭线程池,等待所有任务被执行完
    executorService.shutdown();
    try {
        // 主线程等待线程池中的任务执行
        executorService.awaitTermination(1, TimeUnit.DAYS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

image.png

(3) CachedThreadPool—-可缓存线程池

public static void cacheThreadPoolExample(){

    // 创建没有任何一个线程的线程池
    ExecutorService executorService = Executors.newCachedThreadPool();

    // 构造5个任务并将任务提交到线程池中
    for(int i=1;i<=5;i++){

        final int finalI = i;
        executorService.submit(new Runnable(){
            @Override
            public void run() {
                // 输出执行当前任务的线程名
                System.out.println("执行当前任务"+ finalI +"的是:"+Thread.currentThread().getName());
            }
        });

        try {
            // 创建完2任务和4任务之后进入休眠,过一会儿再新增任务
            if(i/2==0){
                Thread.sleep(i*10);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 通知关闭线程池,等待所有任务被执行完
    executorService.shutdown();
    try {
        // 主线程等待线程池中的任务执行
        executorService.awaitTermination(1, TimeUnit.DAYS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

2和4任务都是用的1号线程执行,因为经过休眠期之后1号线程又空闲了
image.png

(4) ScheduledThreadPool—-周期性线程池

延时任务示例:

// 创建只有两个线程的周期线程池
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

System.out.println("任务执行之前的时间:"+new Date());
executorService.schedule(new Runnable(){
    @Override
    public void run() {
        // 输出执行当前任务的线程名
        System.out.println("任务执行时间:"+new Date()+",执行当前任务的是:"+Thread.currentThread().getName());
    }
},3,TimeUnit.SECONDS); // 延迟三秒后执行任务

// 通知关闭线程池,等待所有任务被执行完
executorService.shutdown();
try {
    // 主线程等待线程池中的任务执行
    executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

image.png
循环任务(scheduleAtFixedRate)示例:

 // 创建只有两个线程的周期线程池
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

System.out.println("任务尚未执行,当前时间:"+new Date());

// 创建任务并丢进线程池
executorService.scheduleAtFixedRate(new Runnable(){
    @Override
    public void run() {
        // 输出执行当前任务的线程名
        System.out.println("任务执行时间:"+new Date()+",执行当前任务的是:"+Thread.currentThread().getName());
    }
},1,3,TimeUnit.SECONDS); // 延迟1秒后开始执行任务,每三秒执行一次

// 主线程休眠3分钟,防止过快关闭线程池,还没开始执行任务就关闭了
try {
    Thread.sleep(30000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

// 通知关闭线程池,等待所有任务被执行完
executorService.shutdown();
try {
    // 主线程等待线程池中的任务执行
    executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

任务提交延时1秒后,开始按照每三秒一次(以任务开始时间为起点计算)的周期去执行任务
image.png
循环任务(scheduleWithFixedDelay)示例:

// 创建只有两个线程的周期线程池
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

System.out.println("任务尚未执行,当前时间:"+new Date());

executorService.scheduleWithFixedDelay(new Runnable(){
    @Override
    public void run() {
        // 输出执行当前任务的线程名
        System.out.println("单次任务开始执行时间:"+new Date()+",执行当前任务的是:"+Thread.currentThread().getName());
        // 假设任务执行中间耗时1秒
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("单次任务执行结束时间:"+new Date()+",执行当前任务的是:"+Thread.currentThread().getName());
    }
},1,3,TimeUnit.SECONDS); // 延迟1秒后开始执行任务,每三秒执行一次

// 主线程休眠30秒,防止过快关闭线程池,还没开始执行任务就关闭了
try {
    Thread.sleep(30000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

// 通知关闭线程池,等待所有任务被执行完
executorService.shutdown();
try {
    // 主线程等待线程池中的任务执行
    executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
    e.printStackTrace();
}
}

任务提交延时1秒后,开始按照每单次任务结束后三秒执行一次(以任务结束时间为起点计算)的周期去执行任务
image.png

(5) WorkStealingPool—-抢占式线程池(JDK1.8新增)

 // 创建只允许两个线程同时运行的抢占式线程池
ExecutorService executorService = Executors.newWorkStealingPool(2);

// 构造5个任务并将任务提交到线程池中
for(int i=1;i<=5;i++){

    final int finalI = i;
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            // 假设任务耗时1秒
            try {
                Thread.sleep(finalI * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("执行当前任务" + finalI + "的是:" + Thread.currentThread().getName() + ",耗时 " + (System.currentTimeMillis() - start) + "毫秒");
        }
    });

}

// 主线程休眠等待
try {
    Thread.sleep(60000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

我们debug之后可以看到创建的ForkJoinPool中有的队列数组中有三个队列
image.png
其中队列0没有属于什么线程,里面装着的是还没有分配的任务
image.png
队列1属于线程ForkJoinPool-1-worker-0,目前执行的是i == 2 的任务
image.png
队列3属于线程ForkJoinPool-1-worker-1,目前执行的是i == 1 的任务
image.png
到最后执行完成输出的结果如下,线程执行完本队列线程后,又从队列0”窃取”了任务过来执行

image.png