我们先看 Java 开发手册上说的:
image.png
我们可以看一下源码:
image.png
这里的 ThreadPoolExecutor 的构造函数如下:
/*
Creates a new {@code ThreadPoolExecutor} with the given initial
parameters and default thread factory and rejected execution handler.
It may be more convenient to use one of the {@link Executors} factory
methods instead of this general purpose constructor.

@param corePoolSize the number of threads to keep in the pool, even
if they are idle, unless {@code allowCoreThreadTimeOut} is set
@param maximumPoolSize the maximum number of threads to allow in the
pool
@param keepAliveTime when the number of threads is greater than
the core, this is the maximum time that excess idle threads
will wait for new tasks before terminating.
@param unit the time unit for the {@code keepAliveTime} argument
@param workQueue the queue to use for holding tasks before they are
executed. This queue will hold only the {@code Runnable}
tasks submitted by the {@code execute} method.
@throws IllegalArgumentException if one of the following holds:

{@code corePoolSize < 0}

{@code keepAliveTime < 0}

{@code maximumPoolSize <= 0}

{@code maximumPoolSize < corePoolSize}
@throws NullPointerException if {@code workQueue} is null
/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

参数说明:

image.png

类图结构:

image.png
Executors的创建线程池的方法,创建出来的线程池都实现了ExecutorService接口。常用方法有以下几个:

  • newFiexedThreadPool(int Threads):创建固定数目线程的线程池。
  • newCachedThreadPool():创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
  • newSingleThreadExecutor():创建一个单线程化的Executor。
  • newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

类看起来功能还是比较强大的,又用到了工厂模式、又有比较强的扩展性,重要的是用起来还比较方便,如:
ExecutorService executor = Executors.newFixedThreadPool(nThreads) ;
即可创建一个固定大小的线程池。

执行原理

线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。
当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,您可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构建时设置,但也可以使用setCorePoolSize和setMaximumPoolSize进行动态更改。
这段话详细了描述了线程池对任务的处理流程,这里用个图总结一下
image.png

使用 Executors 创建四种类型的线程池

newCachedThreadPool是Executors工厂类的一个静态函数,用来创建一个可以无限扩大的线程池。
而Executors工厂类一共可以创建四种类型的线程池,通过Executors.newXXX即可创建。下面就分别都介绍一下。

1. FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
}
image.jpeg

  • 它是一种固定大小的线程池;
  • corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
  • keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
  • 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
  • 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
  • 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。

    2. CachedThreadPool

    public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue());
    }
    image.jpeg

  • 它是一个可以无限扩大的线程池;

  • 它比较适合处理执行时间比较小的任务;
  • corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
  • keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
  • 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

    3. SingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
    }
    image.jpeg

  • 它只会创建一条工作线程处理任务;

  • 采用的阻塞队列为LinkedBlockingQueue;

    4. ScheduledThreadPool

    它用来处理延时任务或定时任务。
    image.jpeg

  • 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:

  1. scheduledAtFixedRate
  2. scheduledWithFixedDelay
  • SchduledFutureTask接收的参数:
  1. time:任务开始的时间
  2. sequenceNumber:任务的序号
  3. period:任务执行的时间间隔
  • 它采用DelayQueue存储等待的任务
  • DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
  • DelayQueue也是一个无界队列;
  • 工作线程的执行过程:
  • 工作线程会从DelayQueue取已经到期的任务去执行;
  • 执行结束后重新设置任务的到期时间,再次放回DelayQueue

    Executors存在什么问题

    在阿里巴巴Java开发手册中提到,使用Executors创建线程池可能会导致OOM(OutOfMemory ,内存溢出),但是并没有说明为什么,那么接下来我们就来看一下到底为什么不允许使用Executors?
    我们先来一个简单的例子,模拟一下使用Executors导致OOM的情况。
    /*
    @author Hollis
    */
    public class ExecutorsDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void main(String[] args) {
    for (int i = 0; i < Integer.MAX_VALUE; i++) {
    executor.execute(new SubThread());
    }
    }
    }

class SubThread implements Runnable {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
}
}
通过指定JVM参数:-Xmx8m -Xms8m 运行以上代码,会抛出OOM:
Exception in thread “main” java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
以上代码指出,ExecutorsDemo.java的第16行,就是代码中的executor.execute(new SubThread());。

Executors为什么存在缺陷

通过上面的例子,我们知道了Executors创建的线程池存在OOM的风险,那么到底是什么原因导致的呢?我们需要深入Executors的源码来分析一下。
其实,在上面的报错信息中,我们是可以看出蛛丝马迹的,在以上的代码中其实已经说了,真正的导致OOM的其实是LinkedBlockingQueue.offer方法。
Exception in thread “main” java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
如果读者翻看代码的话,也可以发现,其实底层确实是通过LinkedBlockingQueue实现的:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
如果读者对Java中的阻塞队列有所了解的话,看到这里或许就能够明白原因了。
Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。
这里的问题就出在:不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,如果我们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。
而newFixedThreadPool中创建LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来说,是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题。
上面提到的问题主要体现在newFixedThreadPool和newSingleThreadExecutor两个工厂方法上,并不是说newCachedThreadPool和newScheduledThreadPool这两个方法就安全了,这两种方式创建的最大线程数可能是Integer.MAX_VALUE,而创建这么多线程,必然就有可能导致OOM。

创建线程池的正确姿势

避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。
private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
这种情况下,一旦提交的线程数超过当前可用线程数时,就会抛出java.util.concurrent.RejectedExecutionException,这是因为当前线程池使用的队列是有边界队列,队列已经满了便无法继续处理新的请求。但是异常(Exception)总比发生错误(Error)要好。
除了自己定义ThreadPoolExecutor外。还有其他方法。这个时候第一时间就应该想到开源类库,如apache和guava等。
作者推荐使用guava提供的ThreadFactoryBuilder来创建线程池。
public class ExecutorsDemo {

  1. private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()<br /> .setNameFormat("demo-pool-%d").build();
  2. private static ExecutorService pool = new ThreadPoolExecutor(5, 200,<br /> 0L, TimeUnit.MILLISECONDS,<br /> new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
  3. public static void main(String[] args) {
  4. for (int i = 0; i < Integer.MAX_VALUE; i++) {<br /> pool.execute(new SubThread());<br /> }<br /> }<br />}<br />通过上述方式创建线程时,不仅可以避免OOM的问题,还可以自定义线程名称,更加方便的出错的时候溯源。