线程池——治理线程的法宝

1、线程池的自我介绍

线程池的重要性:面试和工作当中都会出现
什么是池
软件中的“池”,可以理解为计划经济。
第一个好处:复用线程,
第二个好处:控制资源总量。

如果不使用线程池,每个任务都新开一个线程处理

  • 一个线程
  • for循环
  • 任务数量上升到1000个

Java中的线程对应操作系统中的线程,这样的开销太大,我们希望有固定数量的线程,来执行这1000个线程,这样就避免了反复创建并销毁线程所带来的开销问题。

当存在一个任务,来一个任务就处理一个异常。

  1. public class EveryTaskOneThread {
  2. public static void main(String[] args) {
  3. Thread thread = new Thread(new Task());
  4. thread.start();
  5. }
  6. static class Task implements Runnable {
  7. @Override
  8. public void run() {
  9. System.out.println("执行了任务");
  10. }
  11. }
  12. }

多个任务采用循环方式创建线程

public class ForLoop {

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            Thread thread = new Thread(new Task());
            thread.start();
        }
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            System.out.println("执行了任务");
        }
    }
}

1.1 为什么需要使用线程池?

问题1:反复创建开销大
问题2:过多的线程会占用过多内存

1.2 线程池好处

  • 加快响应速度
  • 合理利用CPU和内存
  • 统一管理

1.3 线程池适合应用场合

服务器接收到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率。实际上,在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理。

2、创建和停止线程池

2.1 创建线程池

2.1.1 线程池构造函数的参数

image.png
首先我们要理清线程池各个类之间的关系。
image.png

ExecutorService继承自Executor,ThreadPoolExecutor实现了ExcutorService。
Executors并不是线程池的实现,复数形式的Executor其实本质上是一个工具类,能够返回特定功能的线程池,其实内部都是创建的ThreadPoolExecutor对象,参数不同而已,并且方法的返回值是ExecutorService,这个也是没问题的,用接口类接收实现类的对象,是被允许的。

2.1.2 参数中的corePoolSize和maxPoolSize

corePoolSize指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中没有执行任何线程,线程池会等待有任务到来时,再创建新线程去执行任务。

线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这个最大的量maxPoolSize。即使没有任务也不会减少到核心线程数以下。

image.png
添加线程规则

1、如果线程小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。

2、如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列。

3、如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务。

4、如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝该任务。

image.png

是否需要增加线程的判断顺序是:corePoolSize -> workQueue -> maxPoolSize

举个例子:线程池:核心池大小为5,最大池大小为10,队列为100
因为线程中的请求最多会创建五个然后任务将被添加到队列中,直到达到100,当队列满时,将会创建最新的线程maxPoolSize,最多到10个线程,如果再来任务就拒绝

2.1.3 增减线程的特点

  1. 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
  2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
  3. 通过设置maximumPoolSize为很高的值,例如Integer。MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
  4. 是只有在队列填满时才能创建多余corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize;

    2.1.4 keepAliveTime和线程工厂

keepAliveTime:如果线程池当前的线程多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止。

ThreadFactory 用来创建线程,新的线程是由ThreadFactory创建,默认使用Executors.defaultThreadFactory,创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那就可以改变线程名、线程组、优先级、是否是守护线程等。
通常我们使用默认的ThreadFactory就可以了,以下是默认线程工厂源码:

 /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

2.1.5 工作队列

有三种最常见的队列类型:
1)直接交接:SynchronousQueue,没有队列进行任务缓存,队列不保存任务,需要把maximumPoolSize设置大一点。
2) 无界队列:LinkedBlockingQueue,队列是无界的,maximumPoolSize设置多大都没用,因为队列永远填不满,所以线程数永远不会超过corePoolSize。
3) 有界的队列:ArrayBlockingQueue,maximumPoolSize有意义了,当线程数达到corePoolSize并且队列满了,就回去创建新线程,直到maximumPoolSize个线程。

2.2 内存溢出的情况

线程池应该手动创建还是自动创建。
设置运行内存,-Xmx8m -Xms8m

  • 手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽风险。
  • 让我们来看看自动创建线程池(也就是直接调用JDK封装好的构造函数)可能带来哪些问题。

newFixedThreadPool
由于传进去的LinkedBlockingQueue是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,容易造成占用大量的内存,可能会导致OOM。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

newSingleThreadExecutor
可以看出,这里和刚刚的newFiexdThreadPool的原理基本一样,只不过把线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool
可缓存的线程池,特点:无界线程池,具有自动回收多余线程的功能。使用SynchronousQueue,可缓存的,一定时间会把线程回收回去,默认60s。
这种线程池存在一定的弊端,这里的弊端在于,第二个参数maximumPoolSize被设置为Integer.MAX_VALUE,这可能会创建非常多的线程,甚至导致OOM。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

image.png

newScheduledThreadPool
支持定时及周期性任务执行的线程池。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

2.3 如何正确创建线程池

正确创建线程池的方法,根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,我们想给线程取什么名字等等。

线程池里的线程数量设定为多少比较合适?

  • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。
  • 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于cpu核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:
  • 线程数=CPU核心数 * (1+平均等待时间/平均工作时间)

    3、常见线程池的特点对比

1)固定数量的线程池:FixedThreadPool

image.png
2)可缓存的线程池:CachedThreadPool

image.png
3)单一线程的线程池:SingleThreadPool

单线程的线程池:它只会用唯一的工作线程来执行任务,它的原理和FixedThreadPool是一样的,但是此时的线程数量被设置为了1.

4)定时任务类型的线程池:ScheduledThreadPool

4种线程池参数对比
image.png

阻塞队列分析

Q:FixedThreadPool和SingleThreadExecutor的Queue是LinkedBlockingQueue?
A:因为线程数已经固定了,只能通过无界队列来处理更多的任务。

Q:CachedThreadPool使用的Queue是SynchronousQueue?
A:不需要队列存储任务,直接交给新线程去执行。

Q:ScheduledThreadPool来说,它使用的是延迟队列DelayedWorkQueue
A:根据时间先后做延迟,符合它的应用场景。

WorkStealingPool是JDK1.8加入的。
这个线程池和之前的几种线程池都有很大的不同,不能保证执行顺序,其他线程会窃取子任务,如果使用递归,会有子任务产生,适用于这种情况,不过很少使用。

4、停止线程池的正确方法

1、shutdown 执行完正在执行的任务,并且执行完队列里的任务。就会停止,并且,执行完shutdown就不允许在向线程池中提交任务,会将其他任务拒绝

2、isShutDown 是不是已经停止了,不是完全停止,是不是进入停止状态了。

3、isTerminated 返回是否线程池完全终止了。

4、awaitTermination 方法作用弱,等待一段时间判断线程池是否执行完毕,执行完毕返回true,没有执行完毕返回false.

5、shutdownNow 立刻把线程池关闭掉。用中断信号interrupt(优雅停止)。正在队列中等待的任务,会直接返回,这个方法有返回值,会返回一个列表。

5、线程池拒绝策略

5.1 拒绝时机

1、当Executor关闭时,提出拒绝新任务会被拒绝。
2、以及当Executor对最大线程和工作队列容量使用有边界并且已经饱和。

image.png

5.2 四种拒绝策略

  • AbortPlicy: 直接抛出异常丢弃任务
  • DiscardPolicy: 丢弃任务,默默丢弃,不会得到通知
  • DiscardOldestPolicy: 丢弃最老的任务。
  • CallerRunsPolicy: 让提交任务的线程去执行,这种策略比较聪明,两点好处,第一,任务无损失,第二(重要)降低提交速度,这是一种负反馈。这段时间主线程也会去执行完一些任务。

    6、添加钩子方法

    每个任务执行之前后,执行beforeExecute方法、afterExecute方法,来进行日志、统计等操作。

实例增加线程池暂停功能:

/**
 * 演示每个任务执行前后放钩子函数,在执行任务前后做日志或者做统计可以通过这种方式做到这一点。
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    /**
     * 是不是已经暂停
     */
    private boolean isPaused;
    /**
     * 为暂停标记添加锁
     */
    private final ReentrantLock lock = new ReentrantLock();
    /**
     * 配合lock使用的Condition
     */
    private Condition unpaused = lock.newCondition();


    // 以下四个构造方法都是自动生成的
    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }


    /**
     * 任务执行之前的方法,这个方法就是线程池的钩子方法
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            // 判断是否暂停
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
    }

    /**
     * 暂停线程池
     */
    public void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 恢复线程
     */
    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            // 唤醒全部的
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool =
                new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = new Runnable() {

            @Override
            public void run() {
                System.out.println("我被执行");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        // 放到线程池中执行
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }

        Thread.sleep(1500);
        // 暂停线程池
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");

        Thread.sleep(1500);
        // 恢复线程池
        pauseableThreadPool.resume();
        System.out.println("线程池被恢复了");
    }
}

7、线程池实现原理

7.1 线程池组成部分

  • 线程池管理器
  • 工作队列
  • 任务队列
  • 任务接口(Task)

任务队列要选择线程安全的BlockingQueue

image.png

7.2 Executor家族梳理

image.png

Executors是工具类。可去快速创建线程池。
image.png
image.png
创建的是ThreadPoolExecutor,返回的是ExecutorService,因为ExecutorService是ThreadPoolExecutor的父级接口。所以说ExecutorService也可以成为线程池。

7.3 线程池实现任务复用的原理

相同的线程执行不同的任务。下面是线程复用的核心方法。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    int c = ctl.get();
    // 如果小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 增加工作线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

随后进入addWorker方法。之后找到Worker类

w = new Worker(firstTask);

进入Worker类查看runWorker代码。在方法中把Runnable对象拿到,然后调用他的run方法,就可以实现相同的线程把,不同的人物run方法反复执行,而getTask就是从阻塞队列里取出来的,while循环就是整个worker不会停止,在循环中执行,执行完一个任务再执行下一个任务,并且相同的线程可以执行不同的任务。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 拿到Task

    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循环中执行,去拿下一个
        // 只要拿到的任务不为空,去执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

8、线程池状态和注意点

8.1 线程池状态

这五种状态在ThreadExecutorPool中都有定义。

  • RUNNING:接收新任务并处理排队任务
  • SHUTDOWN:不接受新任务,但是处理排队任务
  • STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务。
  • TIDYING:中文是整洁,理解了中文就容易理解这个状态了:

所有任务都已终止,workerCount为零时,线程会转化到TIDYING状态,并将运行terminate()钩子方法。

  • TERMINATED:terminate()运行完成

8.2 execute()方法分析

主要分三步处理:必须传入command,取到ctl获取到线程状态和线程数。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    int c = ctl.get();
    // 如果小于核心线程,则添加worker
    if (workerCountOf(c) < corePoolSize) {
        // 添加任务,true的话就是判断在增加线程时判断是否少于核心的数量
        // 如果传入false就表示增加线程时,去判断是否少于最大线程数
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 检查线程池状态看是否被终止,如果运行中就向队列中放任务
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次做检查,如果不是运行状态,则删掉任务并拒绝
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 线程如果是0,可能抛异常停止了,也需要创建一个新的线程,防止没有线程执行
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 执行到这一步说明,队列也放不进去了,线程数也已经达到核心线程数了
    // 增加到最大线程数如果增加到最大线程数,如果增加失败了,那么就执行拒绝的逻辑。
    else if (!addWorker(command, false))
        reject(command);
}

8.3 注意点

避免任务堆积
避免线程数过度增加。
排查线程泄露,可能因为任务结束不了。