线程池,其实就是一个容纳多个线程的容器,其中的线程可以反复使用,省去了频繁创建线程对象的操作,无需反复创建线程而消耗过多资源
**

image.png

线程池主要参数

  • corePoolSize:线程池的核心大小,也可以理解为最小的线程池大小。
  • maximumPoolSize:最大线程池大小。
  • keepAliveTime:空余线程存活时间,指的是超过corePoolSize的空余线程达到多长时间才进行销毁。
  • unit:销毁时间单位。
  • workQueue:存储等待执行线程的工作队列。
  • threadFactory:创建线程的工厂,一般用默认即可。
  • handler:拒绝策略,当工作队列、线程池全已满时如何拒绝新任务,默认抛出异常。

1.webp

  1. 线程池流程

如何配置线程池

CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。

IO密集型任务
可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。

混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。
因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。


ThreadPoolExecutor

论创建那种线程池 必须要调用ThreadPoolExecutor

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

ThreadPoolExecutor(int corePoolSize, 
int maximumPoolSize, 
long keepAliveTime, 
TimeUnit unit, 
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

方法

shutdown(),平滑的关闭线程池.(如果还有未执行完的任务,就等待它们执行完)

shutdownNow(),简单粗暴的关闭线程池.(没有执行完的任务也直接关闭)

setCorePoolSize(),设置/更改核心池的大小.

setMaximumPoolSize(),设置/更改线程池中最大线程的数量限制.

getActiveCount()  活跃的线程数

getCorePoolSize()  核心的线程数

getPoolSize()   线程池的大小

getQueue().size()  队列大小

execute 和 submit

execute():Executor中声明的方法,向线程池提交一个任务,交由线程池去执行,没有返回值

submit() :ExecutorService中声明的方法。向线程池提交一个任务,交由线程池去执行,可以接受 ( Callable.call() )回调函数的返回值,适用于需要处理返回着或者异常的业务场景,实际上还是调用的execute()方法,只不过它利用了 Future 来获取任务执行结果

schedule

schedule: 延时执行任务

scheduleAtFixedRate : 以固定频率来执行一个任务,按照上一次任务的发起时间计算下一次任务的开始时间

scheduleWithFixedDelay : 以上一次任务的结束时间计算下一次任务的开始时间,在你不能预测调度任务的执行时长时是很有用

线程池的关闭

shutdown() :不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

shutdownNow() :立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

任务队列 BlockingQueue

BlockingQueue workQueue 取值

ArrayBlockingQueue :有界的数组队列,先进先出,此队列创建时必须指定大小

SynchronousQueue : 同步的阻塞队列,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务

LinkedBlockingQueue :基于链表的先进先出队列,可支持有界/无界的队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE

PriorityBlockingQueue :优先队列,可以针对任务排序

自定义线程池

package com.study;

import java.util.Locale;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorServiceHelper {

    /**
     * 获取活跃的cpu数量
     */
    private static int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();

    private final static BlockingQueue<Runnable> queue;

    private final static long KEEP_ALIVE_TIME = 3L;

    private final static TimeUnit KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS;
    private static ThreadFactory mThreadFactory;

    static {
        /**
         * 基于链表的队列,如果没有指定大小,则默认值是 Integer.MAX_VALUE
         */
        queue = new LinkedBlockingQueue<Runnable>();
        //默认的工厂方法将新创建的线程命名为:pool-[虚拟机中线程池编号]-thread-[线程编号]
        //mThreadFactory= Executors.defaultThreadFactory();
        mThreadFactory = new NamedThreadFactory();
        //System.out.println("NUMBER_OF_CORES:"+NUMBER_OF_CORES);
    }

    public static void execute(Runnable runnable) {
        if (runnable == null) {
            return;
        }
        /**
         * 1.当线程池小于 corePoolSize 时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
         * 2.当线程池达到 corePoolSize 时,新提交任务将被放入 workQueue 中,等待线程池中任务调度执行
         * 3.当 workQueue 已满,且 maximumPoolSize > corePoolSize时,新提交任务会创建新线程执行任务
         * 4.当提交任务数超过 maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理
         * 5.当线程池中超过 corePoolSize 线程,空闲时间达到 keepAliveTime 时,关闭空闲线程
         * 6.当设置 allowCoreThreadTimeOut(true) 时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将关闭
         **/
        /**
         maximumPoolSize 推荐取值
         如果是 CPU 密集型任务,就需要尽量压榨CPU,参考值可以设为 NUMBER_OF_CORES + 1 或 NUMBER_OF_CORES + 2
         如果是 IO 密集型任务,参考值可以设置为 NUMBER_OF_CORES * 2
         */
        ExecutorService executorService = new ThreadPoolExecutor(NUMBER_OF_CORES,
                NUMBER_OF_CORES * 2, KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_UNIT,
                queue, mThreadFactory);
        executorService.execute(runnable);
    }


    private static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumberAtomicInteger = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, String.format(Locale.CHINA, "%s%d", "NamedThreadFactory", threadNumberAtomicInteger.getAndIncrement()));
            /* thread.setDaemon(true);//是否是守护线程
            thread.setPriority(Thread.NORM_PRIORITY);//设置优先级 1~10 有3个常量 默认 Thread.MIN_PRIORITY*/
            return thread;
        }
    }

}

测试下

class TestThread {

    public static void main(String[] args) {
        ExecutorServiceHelper.execute(new Runnable() {
            @Override
            public void run() {
                //do something
                System.out.println("1");
            }
        });

    }
}