JDK在Executors中提供了四种线程池。它们底层都是使用ThreadPoolExecutor实现的,只是传入构造函数的参数不一样而已。这四种线程池有一些共同点。

  1. 都提供两种获取方式,不同点在于是否自定义线程构造工厂,一般而言使用默认的就足够了。
  2. 都是用默认的拒绝策略,即抛出异常RejectedExectionException。

    newFixedThreadPool

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
  3. new LinkedBlockingQueue<Runnable>());
  4. }
  5. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  6. return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
  7. new LinkedBlockingQueue<Runnable>(),threadFactory);
  8. }

newFixedThreadPool是一个固定长度的线程池。初始时线程数量为0,每提交一个任务就创建一个线程,直到达到线程池的最大数,这时线程池的规模不再变化(如果某个线程发生了预期之外的Exception而结束,那么线程池会补充一个新的线程。感觉是在执行新任务时才会补充,因为底层是ThreadPoolExecutor。)。由于队列长度固定,因此核心线程数和最大线程数相等,最大存活时间也为0。

newCachedThreadPool

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
  3. new SynchronousQueue<Runnable>());
  4. }
  5. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  6. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
  7. new SynchronousQueue<Runnable>(),threadFactory);
  8. }

newCachedThreadPool是一个可缓存的线程池。如果线程池的当前规模超过了处理需求,则回收空闲的线程;当需求增加时,则添加新的线程,线程池的规模不存在任何限制。实际上有限制,最大线程池数量为Integer.MAX_VALUE。但一般在线程数量达到限制之前,内存已经耗尽。因此注意,当无法估计同事执行任务的规模时,最好不要使用它,不然很有可能耗尽虚拟机资源。空闲线程的回收时间为60秒。

newSingleThreadExecutor

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>()));
  5. }
  6. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  7. return new FinalizableDelegatedExecutorService
  8. (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
  9. new LinkedBlockingQueue<Runnable>(),threadFactory));
  10. }

newSingleThreadExector是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来代替。单线程的Executor提供了大量的内部同步机制,从而确保任务执行的任务内存写入操作对于后续任务来说都是可见的。这意味着,即使这个线程会不时被另一个替代,但对象总可以安全地封装在“任务线程”中。
为什么newSingleThreadExecutor是Executor结尾,而其它三个线程都是Pool结尾?既然有了固定长度线程池,那为何还要有newSingleThreadExecutor呢?Executors.newFixedThreadPool(1)和newSingleThreadExecutor有什么区别?为什么newSingleThreadExecutor在ThreadPoolExecutor外面要包装一层FinalizableDelegatedExecutorService?答案在这里

newScheduleThreadPool

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  4. public static ScheduledExecutorService newScheduledThreadPool(
  5. int corePoolSize, ThreadFactory threadFactory) {
  6. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  7. }

可以看到newScheduleThreadPool两个方法的画风和其他的不太一样。
那ScheduledExecutorService和ScheduledThreadPoolExecutor又是啥?

  1. public interface ScheduledExecutorService extends ExecutorService {
  2. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
  3. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
  4. TimeUnit unit);
  5. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
  6. long period, TimeUnit unit);
  7. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
  8. long delay, TimeUnit unit);
  9. }
  10. public class ScheduledThreadPoolExecutor
  11. extends ThreadPoolExecutor
  12. implements ScheduledExecutorService {
  13. ......
  14. }

ScheduledExecutorService是一个继承了ExecutorService的接口,扩展了4个方法。
ScheduledThreadPoolExecutor则是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService。
那么这就清楚了,newScheduleThreadPool底层还是ThreadPoolExecutor实现的,只是扩展了一些方法。这些方法就与newScheduleThreadPool的使用相关了。

newScheduleThreadPool创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务。

四种扩展方法的使用

注意下面的等待几秒只是为了说明任务延迟执行和阻塞,可能由于各种原因,并不一定恰好等待这么多秒。

第一种方法

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(8);
Runnable task = () -> {
    int x = 1;
    int y = 2;
    int result = x + y;
    System.out.println("执行计算任务,结果是 " + result);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
ScheduledFuture future = scheduledThreadPool.schedule(task, 5, TimeUnit.SECONDS);
try {
    Object obj = future.get();
    if (obj == null) {
        System.out.println("obj is null.");
    } else {
        System.out.println("obj is not null.");
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
scheduledThreadPool.shutdown();
执行结果
(等待5秒)
执行计算任务,结果是 3
(等待5秒)
obj is null.
Process finished with exit code 0

因为Runnable接口没有返回值,所以从future中只能get一个null。

第二种方法

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(8);
Callable<Integer> task = () -> {
    int x = 1;
    int y = 2;
    int result = x + y;
    System.out.println("执行计算任务,结果是 " + result);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return result;
};
ScheduledFuture<Integer> future = scheduledThreadPool.schedule(task, 5, TimeUnit.SECONDS);
try {
    Integer result = future.get();
    System.out.println("Future中get的值:" + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
scheduledThreadPool.shutdown();

// 执行结果
(等待5秒)
执行计算任务,结果是 3
(等待5秒)
Future中get的值:3
Process finished with exit code 0

Callable是有返回值的,可以通过get返回获取返回值。在任务执行完成之前,get会阻塞。

接下来两个方法的使用稍微复杂一点

第三种方法

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period, TimeUnit unit);

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(8);
Runnable task = () -> {
    int x = 1;
    int y = 2;
    int result = x + y;
    System.out.println("执行计算任务,结果是 " + result);
};
ScheduledFuture future = scheduledThreadPool.scheduleAtFixedRate(task, 10, 5, TimeUnit.SECONDS);
System.out.println("123");
try {
    Object obj = future.get();
    if (obj == null) {
        System.out.println("obj is null.");
    } else {
        System.out.println("obj is not null.");
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
System.out.println("1234567");
scheduledThreadPool.shutdown();

// 执行结果
123
(等待10秒)
执行计算任务,结果是 3
(等待5秒)
执行计算任务,结果是 3
(等待5秒)
执行计算任务,结果是 3
(等待5秒)
执行计算任务,结果是 3
(等待5秒)
......

// 修改一下代码
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(8);
Runnable task = () -> {
    int x = 1;
    int y = 2;
    int result = x + y;
    System.out.println("执行计算任务,结果是 " + result);
};
ScheduledFuture future = scheduledThreadPool.scheduleAtFixedRate(task, 10, 5, TimeUnit.SECONDS);
System.out.println("123");
System.out.println("1234567");
// 执行结果
123
1234567
执行计算任务,结果是 3
(等待5秒)
执行计算任务,结果是 3
(等待5秒)
执行计算任务,结果是 3
(等待5秒)
......

// 最后执行shutDown方法
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(8);
Runnable task = () -> {
    int x = 1;
    int y = 2;
    int result = x + y;
    System.out.println("执行计算任务,结果是 " + result);
};
ScheduledFuture future = scheduledThreadPool.scheduleAtFixedRate(task, 10, 5, 
                                                                 TimeUnit.SECONDS);
System.out.println("123");
System.out.println("1234567");
scheduledThreadPool.shutdown();
//执行结果
123
1234567

Process finished with exit code 0

从第一个例子可以看出,任务仍然会被get方法阻塞。而由于这是一个循环任务,所以会一直阻塞下去。
从第二,三个例子可以看出,如果线程池没有被shutDown也会一直执行下去,shutDown之后则会停止。

该方法的注释
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after {@code initialDelay} then {@code initialDelay+period}, then {@code initialDelay + 2 period}, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.
渣渣翻译:
创建并执行一个周期性的动作。该动作会在给出的初始延迟后变为可执行,并且接下来会在给出的周期循环执行。这就是说,这个执行会在initialDelay(第二个入参)时间后开始,然后会在initialDelay + period(第三个入参)时间后再次执行,在initialDelay + 2
period时间后第三次执行,以此类推。如果任何任务执行时碰到一个异常,接下里的执行会被禁止。否则,任务只会被取消动作(task有一个cancel方法)或执行者的终止动作(比如executor的shutDown)而终止。如果该任务的某次执行时间比周期间隔更长,那么后续的执行动作会被推迟,并不会同时执行。

好啦,通过注释,能部分解释上面的三个例子。同样可以看到这三个例子没有覆盖所有的情况,但注释已经说得很清楚了。

第四种方法

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                     long delay, TimeUnit unit);

参考scheduleAtFixedRate的三个样例,执行结果一样。但这两个方法在某些细节上有所不同。

该方法的注释
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
渣渣翻译:
创建并执行一个周期性的动作。该动作会在给出的初始延迟后变成可执行,并且接下来的执行动作会在给出的延迟之后完成。这个延迟则指的是前一次执行的结束和后一次执行的开始之前的这段时间。如果在任何任务执行时碰到一个异常,则接下里的执行动作会被禁止。否则,任务只会被取消动作(task有一个cancel方法)或执行者的终止动作(比如executor的shutDown)而终止。

从注释中可以看出scheduleWithFixedDelay的两次任务执行间隔是固定的,会从前一次任务执行结束之后才开始计算。而scheduleAtFixedRate则会在固定的时间点(initialDelay + n * period)开始下一次任务。如果达到时间点时,前面某个任务(不一定是前一个,还可能是前面N个)还没有执行完成,则该任务会推迟。在前面任务执行完成后会立即开始。

那么出现一个问题,在scheduleAtFixedRate中,顺序执行任务1,2,3,4,5……,period是3秒。现在任务1由于某种阻塞,执行了10秒钟。那么任务1结束后,立即执行的是任务2还是任务5呢?如果任务2和任务5完全一样,那就没有区别。但如果task代码块中某个变量本来就会随着时间的改变而改变。比如说某个变量在任务1中值为1,在任务2中值为2……任务5中值为5,那么搞清楚这个问题就有必要了。暂时存个疑,这种变量可不可能存在以及这个问题有没有意义,等之后思考和看了源码再来解答。