类图

手撕线程池 - 图1

Executor

基础接口
将任务提交和任务执行进行解耦
基于生产者消费者模型, 提交任务的执行是生产者, 执行任务的线程是消费者

ExecutorService

扩展了Executor, 添加了生命周期的管理方法和用于任务提交的便利方法
状态: 运行-关闭-终止

JUC 线程池

Java线程池,是典型的池化思想的产物

线程池作用

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

在JVM中默认一个线程需要使用256k~1M(取决于32位还是64位操作系统)的内存。
CPU调度列入其中,这边不将线程调度列入是因为睡眠中的线程不会被调度(OS控制),如果不是睡眠中的线程那么是一定需要被调度的。

不用线程池

如果一个线程的时间非常长,就没必要用线程池了(不是不能作长时间操作,而是不宜。),况且我们还不能控制线程池中线程的开始、挂起、和中止。

Executor线程池构造4种线程池

Executors提供的工厂方法
Java通过Executors(jdk1.5并发包 java.util.current)提供四种静态方法创建线程池

Executors工厂类中提供的 四种线程池 其实也只是ThreadPoolExecutor的核心构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor线程池的运行过程。

手动创建线程池效果更好

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
手撕线程池 - 图2
1.△newSingleThreadExecutor 单个线程

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
单例类线程池只有一个线程,无边界队列,适合cpu密集的运算

  1. // 源码
  2. public static ExecutorService newSingleThreadExecutor() {
  3. return new FinalizableDelegatedExecutorService
  4. (new ThreadPoolExecutor(1, 1,
  5. 0L, TimeUnit.MILLISECONDS,
  6. new LinkedBlockingQueue<Runnable>()));
  7. }

2.△ newCachedThreadPool 可缓存线程

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

  1. // 源码
  2. public static ExecutorService newCachedThreadPool() {
  3. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  4. 60L, TimeUnit.SECONDS,
  5. new SynchronousQueue<Runnable>());
  6. }

3.△ newFixedThreadPool 固定线程

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

4.△newScheduledThreadPool 定时线程

创建一个定长线程池,支持定时及周期性任务执行。

  1. <dependency>
  2. <groupId>com.google.guava</groupId>
  3. <artifactId>guava</artifactId>
  4. <version>23.0</version>
  5. </dependency>
ThreadFactory namedFactory = new ThreadFactoryBuilder()
    .setNameFormat("chat-pool-%d").build();

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5, namedFactory);

HelloThread helloThread = new HelloThread();
ScheduledFuture<?> scheduledFuture1 = executor.scheduleAtFixedRate(helloThread, 1, 1, TimeUnit.SECONDS);
// 调用
 try {
    TimeUnit.SECONDS.sleep(1);
    scheduledFuture1.cancel(true);
} catch (Exception e) {
    e.printStackTrace();
}
executor.shutdown();

核心线程池 ThreadPoolExecutor

核心接口 : ExecutorService

执行策略

手撕线程池 - 图3
当新的线程请求进来时,会先判断核心线程数
===核心线程数**corePoolSize如果未满则直接新建线程并执行,执行完将其放回线程池;
===核心线程数如果已满
======再检查
任务队列workQueue,如果没满就将当前线程请求加入缓冲队列 ,等待空闲线程分配;
======队列已满, 再检查线程池当前存在的线程数是否已达到规定的
最大线程maximumPoolSize,如果没有达到就创建线程执行;
============如果达到就执行对应的
拒绝策略**。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

根据源码可以发现整个线程池大致分为 3 个部分,

  • 是创建 worker 线程execute(Runnable command)
  • 添加任务到 workQueue; addWorker(Runnable firstTask, boolean core)
  • worker 线程执行具体任务

线程数配置

1.线程池的默认值

  • corePoolSize=1
    核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,
    当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
  • queueCapacity=Integer.MAX_VALUE
  • maxPoolSize=Integer.MAX_VALUE
    线程池最大线程数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime=60s 空闲线程允许的最大的存活时间
  • allowCoreThreadTimeout=false
  • unit: 存活时间的单位
  • workQueue: 阻塞任务队列
  • threadFactory: 线程工厂用来创建线程,一般设置线程名称
  • rejectedExcutionHandler=AbortPolicy() 拒绝策略,针对当队列满了时新来任务的处理方式
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

2. 项目中如何配置线程数量?

额外知识: CPU密集型(CPU-bound) CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。 IO密集型(I/O bound) IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。 计算密集型程序适合C语言多线程,I/O密集型适合脚本语言开发的多线程。

如果是CPU密集型应用,则线程池大小设置为CPU核数+1
如果是IO密集型应用,则线程池大小设置为CPU核数*2+1
手撕线程池 - 图4
更多请参考: https://www.jianshu.com/p/f30ee2346f9f


3. 阻塞队列

手撕线程池 - 图5
1、[有界] ArrayBlockingQueue:由数组结构组成的阻塞队列
表现稳定,添加和删除使用同一个锁,通常性能不如后者
2、[有界] LinkedBlockingQueue:由链表组成的有界(但大小默认值为Integer.MAX_Value)
添加和删除两把锁是分开的,所以竞争会小一些
3、{无界} PriorityBlockingQueue:支持优先级排序的无界阻塞队列

上面三个队列他们也是存在共性的
put take 操作都是阻塞的
offer poll 操作不是阻塞的,offer 队列满了会返回false不会阻塞,poll 队列为空时会返回null不会阻塞
补充一点,并不是在所有场景下,非阻塞都是好的,阻塞代表着不占用CPU,在有些场景也是需要阻塞的,put take 存在必有其存在的必然性

4、{无界} DelayQueue:使用优先级队列实现的延迟无界阻塞队列
5、SynchronizedQueue:不存储元素的阻塞队列,也即单个元素的队列
6、{无界} LinkedTransferQueue:由链表结构组成的无界阻塞队列
7、LinkedBlockingDeque:由链表结构组成的双向阻塞队列

现在也来说一说无界队列的共同点

  • put 操作永远都不会阻塞,空间限制来源于系统资源的限制
  • 底层都使用 CAS 无锁编程

4. 拒绝策略解析

当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。

jdk默认提供了四种拒绝策略:
CallerRunsPolicy - 则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy 默认 - 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy - 直接丢弃
DiscardOldestPolicy - 当触发拒绝策略,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入

线程池方法

1.execute() 与 submit() 的区别?

execute() 参数 Runnable ;submit() 参数 (Runnable) 或 (Runnable 和 结果 T) 或 (Callable)
execute() 没有返回值;而 submit() 有返回值
submit() 的返回值 Future 调用get方法时,可以捕获处理异常

设计一个线程池

1)线程池管理类[单例设计模式]
主要用于实现创建线程和添加线程处理的新任务,执行任务以及如何回收已经执行完任务的线程。
增加任务的方法(addTask)、批量增加任务的方法(batchAddTask)、得到实例的方法(getInstanse)以及执行任务的方法(execute)

//增加任务的方法
public void addTask(Task task){
    lock.lock();
    taskQueue.add(task);
    taskQueue.notifyAll();
    lock.unlock();
}

//执行任务的方法
public void execute(){
    for(int i=0; i<workThreads.length; i++){
    WorkThread wt = workThreads[i];
    if(wt.isWaiting()){
    new Thread(wt).start();
    break;
    }
}

2)工作线程类
线程池中的线程,也实现了Runnable接口,它主要用于处理任务队列中的任务。
在run()方法里面它首先判断任务队列里面是否有任务,如果没有就等待新任务加入,如果有任务就从任务队列中取出任务并执行,在线程池中可以设定工作线程的个数

while(isRunning){
    lock.lock();
    while(taskQueue.isEmpty()){
        try{
            taskQueue.wait();
        }catch(Exception e){
        }
    }
    task = (Task)taskQueue.remove(0);
    try{
        new Thread(task).start();
    }catch(Exception e){
        e.printStackTrace();
    }
    lock.unlock();
    }
}

3)任务类
定义任务的各种属性,以及要执行的操作

public class Task implements Runnable{
    public void run(){
    }
}

4)任务队列 Map、Set以及List
按先来先服务的顺序用于存放新加入的任务,以便让工作线程来执行
在文中通过Collections.synchronizedList将其转换为一个线程安全的类。任务队列定义如下所示:

private List<Task> taskQueue = Collections.synchronizedList(new LinkedList<Task>());

ForkJoinPool

这边推荐大家使用 newWorkStealingPool,也就是ForkJoinPool。采取了工作窃取的模式。 后续会跟大家一起聊聊 ForkJoinPool。

https://www.toutiao.com/i6735249778763891204/

线程池监控

运行时状态实时查看
用户基于JDK原生线程池ThreadPoolExecutor提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数,