【代码参考】juc-demo.zip
概念
- 线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模型,Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有一个对应的线程。Java线程有多种生命状态。
- 进程是操作系统执行的最小单位。一个进程可以包含多个线程。
- 线程池顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控.
线程状态
- 新建状态
- 就绪状态
- 运行状态
- 阻塞(等待)状态
- 超时阻塞状态
- 销毁(终结)状态
线程创建方式
- 继承Thread类,重写run()方法
- 实现Runnable接口,重写run()方法
package com.itmck.thread;
/**
*
* 创建线程
* 1)继承Thread类重写run()方法
* 2)实现Runnable接口重写run()方法
*
*/
public class ThreadTest {
public static void main(String[] args) throws Exception {
//内部类的方式创建线程
Thread thread = new Thread(() -> System.out.println("通过匿名内部类创建一个线程"));
thread.start();
MyThread myThread = new MyThread();
myThread.start();
Thread thread1 = new Thread(new MyRunnable());
thread1.start();
}
}
class MyThread extends Thread{
@Override
public void run() {
System.out.println("通过继承Thread类创建线程");
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println("通过实现Runnable接口创建线程");
}
}
线程池
在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:
如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。
那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?
这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。
何时使用线程池
- 单个任务处理时间比较短
-
线程池优势是什么
重用存在的线程,减少线程创建,消亡的开销,提高性能
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池的底层工作原理
线程池内部是通过队列+线程实现的,当我们利用线程池执行任务时:
- 如果此时线程池中的线程数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
- 如果此时线程池中的线程数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。
- 如果此时线程池中的线程数量大于等于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
- 如果此时线程池中的线程数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
- 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数
详解Executor
Executor接口是线程池框架中最基础的部分,位于package java.util.concurrent 包下定义了一个用于执行Runnable的execute方 法。 下图为它的继承与实现
Executor线程池的核心顶层接口。定义了execute()方法 参数为Runnable类型的任务
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
//在完成已提交的任务后封闭办事,不再接管新任务
void shutdown();
//停止所有正在履行的任务并封闭办事
List<Runnable> shutdownNow();
//测试是否该ExecutorService已被关闭。
boolean isShutdown();
//测试是否所有任务都履行完毕了
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit);
//可用来提交Callable,并返回代表此任务的Future对象
<T> Future<T> submit(Callable<T> task);
//可用来提交Runnable任务,并返回代表此任务的Future对象
<T> Future<T> submit(Runnable task, T result);
//可用来提交Runnable任务,并返回代表此任务的Future对象
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) ;
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit);
}
线程池的5种状态
RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。TIDYING
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。TERMINATED
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -TERMINATED。
进入TERMINATED的条件如下:
- 线程池不是RUNNING状态.
- 线程池状态不是TIDYING状态或TERMINATED状态.
- 如果线程池状态是SHUTDOWN并且workerQueue为空.
- workerCount为0.
- 设置TIDYING状态成功.
线程池的具体实现
- ThreadPoolExecutor 默认线程池
- ScheduledThreadPoolExecutor 定时线程池
ThreadPoolExecutor
线程池的创建
public ThreadPoolExecutor(
int corePoolSize,//核心线程大小
int maximumPoolSize,//最大线程数
long keepAliveTime,//存活时间
TimeUnit unit,
BlockingQueue<Runnable> workQueue,//工作队列
ThreadFactory threadFactory,//用于创建线程任务的工厂
RejectedExecutionHandler handler //丢弃策略4种
)
线程池的提交
public void execute() //提交任务无返回值
public Future<?> submit() //任务执行完成后有返回值
线程池的核心参数解释
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
keepAliveTime
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
unit
keepAliveTime的单位;
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
- 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
- 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
- 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
- 4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory
它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
- 1、AbortPolicy:直接抛出异常,默认策略;
- 2、CallerRunsPolicy:用调用者所在的线程来执行任务;
- 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- 4、DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
Executors类
Executors类主要用于提供线程池相关的操作.提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
常用静态方法:
如下所示,Executors提供的静态方法,其中重点关注如下.
newCachedThreadPool()
- 缓存型池子,先查看池中有没有以前建立的线程,如果有就 reuse.如果没有就建一个新的线程加入池中.
- 缓存型池子通常用于执行一些生存期很短的异步型任务, 因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。
能reuse的线程,必须是timeout IDLE内的池中线程,缺省 timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。
newFixedThreadPool(int)
newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
- 其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
- 和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
- 从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:
- fixed池线程数固定,并且是0秒IDLE(无IDLE)
- cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE
newScheduledThreadPool(int)
- 调度型线程池
- 这个池子里的线程可以按schedule依次delay执行,或周期执行
SingleThreadExecutor()
- 单例线程,任意时间池中只能有一个线程
- 用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)
使用案例
package com.itmck.executor;
import java.util.concurrent.*;
/**
* Executor 线程池的核心顶层父类
*
*/
public class ExecutorsTest {
public static void main(String[] args) {
//通过指定参数创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,
20,
5, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()//丢弃策略
);
threadPoolExecutor.execute(() -> System.out.println("ThreadPoolExecutor创建线程池"));
threadPoolExecutor.shutdown();
/**
*
* 使用Executors来创建线程池更加方便。另外不用过多考虑线程池大小之类的参数。
* Executors类主要用于提供线程池相关的操作.提供了一系列工厂方法用于创建线程池,
* 返回的线程池都实现了ExecutorService接口。
*
*/
//单例线程,任意时间池中只能有一个线程
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> System.out.println("Executors创建线程池"));
executorService.shutdown();
//创建固定数目的活动线程池,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
ExecutorService executorService1 = Executors.newFixedThreadPool(10);//10线程数量
executorService1.execute(() -> System.out.println("Executors创建线程池"));
executorService1.shutdown();
//调度型线程池.这个池子里的线程可以按schedule依次delay执行,或周期执行
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(() -> System.out.println("延迟5秒执行"), 5, TimeUnit.SECONDS);
scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("从1秒后开始,每5秒钟执行一次"), 1, 5, TimeUnit.SECONDS);
// scheduledExecutorService.shutdown();//关闭
}
}
/**
* TimeUnit.DAYS //天
* TimeUnit.HOURS //小时
* TimeUnit.MINUTES //分钟
* TimeUnit.SECONDS //秒
* TimeUnit.MILLISECONDS //毫秒
*/