1.概念
多线程是Java中不可避免的一个重要主体。从本章开始,我们将展开对多线程的学习。接下来的内容,是对“JDK中新增JUC包”之前的Java多线程内容的讲解,涉及到的内容包括,Object类中的wait(), notify()等接口;Thread类中的接口;synchronized关键字。
注:JUC包是指,Java.util.concurrent包,它是由Java大师Doug Lea完成并在JDK1.5版本添加到Java中的。
在进入后面章节的学习之前,先对了解一些多线程的相关概念。
线程状态图
说明:
线程共包括以下5种状态。
- 新建状态(New) : 线程对象被创建后,就进入了新建状态。例如,Thread thread = new Thread()。
- 就绪状态(Runnable) : 也被称为“可执行状态”。线程对象被创建后,其它线程调用了该对象的start()方法,从而来启动该线程。例如,thread.start()。处于就绪状态的线程,随时可能被CPU调度执行。
- 运行状态(Running) : 线程获取CPU权限进行执行。需要注意的是,线程只能从就绪状态进入到运行状态。
阻塞状态(Blocked) : 阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:
(01) 等待阻塞 — 通过调用线程的wait()方法,让线程等待某工作的完成。
(02) 同步阻塞 — 线程在获取synchronized同步锁失败(因为锁被其它线程所占用),它会进入同步阻塞状态。
(03) 其他阻塞 — 通过调用线程的sleep()或join()或发出了I/O请求时,线程会进入到阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。死亡状态(Dead) : 线程执行完了或者因异常退出了run()方法,该线程结束生命周期。
这5种状态涉及到的内容包括Object类, Thread和synchronized关键字。这些内容我们会在后面的章节中逐个进行学习。
Object类,定义了wait(), notify(), notifyAll()等休眠/唤醒函数。
Thread类,定义了一些列的线程操作函数。例如,sleep()休眠函数, interrupt()中断函数, getName()获取线程名称等。
synchronized,是关键字;它区分为synchronized代码块和synchronized方法。synchronized的作用是让线程获取对象的同步锁。
在后面详细介绍wait(),notify()等方法时,我们会分析为什么“wait(), notify()等方法要定义在Object类,而不是Thread类中”。
run( ): 启动线程由主线程执行。
start( ) :让线程进入可执行状态 由cpu决定执行顺序。
线程分类 :守护线程,非守护线程
守护线程 :是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因 此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。
2.基础知识
2.1 实现Runable接口
class ThreadRunable implements Runnable{
@Override
public void run() {
System.out.println("i am from runable");
}
}
2.2 继承Thread线程类
class ThreadExtend extends Thread{
@Override
public void run() {
System.out.println("i am from thread");
}
}
启动类
public class TestThread1 {
public static void main(String[] args) {
ThreadExtend t = new ThreadExtend();
t.start();
Thread t1 = new Thread(new ThreadRunable());
Thread t2 = new Thread(new ThreadRunable());
t1.run();
t2.run();
}
}
实现Runable的优点
- 避免继承的局限性,一个类只能继承一个父类
- 多线程共享一个接口的子类的对象,适合多个线程处理同一份资源。
2.3 生产者消费者
2.3.1 名词解释
- join:
让当前cpu执行的线程等待 直到调用join( ) 的那个线程执行完成后 继续执行等待的线程
不传入时间默认join(0) 传入0 当调用join() 的线程isAlive活着的时候 是让调用join的线程执行完成后 再执行其他线程
public class TestThreadJoin {
public static void main(String[] args) {
Thread t1 = new Thread(new ThreadRunable3());
t1.start();
try {
t1.join(2000); //强制让cpu执行2s
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("==============================================");
}
}
class ThreadRunable3 implements Runnable{
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
System.out.println("i am from runable " + Thread.currentThread().getName());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
i am from runable Thread-0
==============================================
i am from runable Thread-0
i am from runable Thread-0
i am from runable Thread-0
i am from runable Thread-0
==============================================
i am from runable Thread-0
i am from runable Thread-0
i am from runable Thread-0
i am from runable Thread-0
i am from runable Thread-0
- synchronized:
synchronized(this) 其中this 指的是锁住当前类的对象, 所以当是继承Thread时 实例化多个Thread对象并不能 锁住多个对象的方法
synchronize 被修饰在方法和代码块上 使用在代码块上更加能提高效率
结论:
- 当一个线程访问某个对象的synchronize关键字修饰代码块或者方法 ,其他线程访问该对象的 该synchronize关键字修饰代码块或者方法被阻塞
- 当一个线程访问某个对象的synchronize关键字修饰代码块或者方法 ,其他线程还可以访问该对象的其它非同步的代码块
- 当一个线程访问某个对象的synchronize关键字修饰代码块或者方法 ,其他线程访问该对象的其它同步的代码块被阻塞
- notify 基于线程调度算法,随机唤醒当前对象一个等待的线程。
- notifyAll 基于线程调度算法,随机唤醒当前对象所有等待的线程。
线程调度算法:
- 先进先出
- 最短耗时任务优先
- 时间片轮转
- 最大最小公平算法
- wait 的作用是让当前线程进入等待状态,同时,wait( )也会让当前线程释放它所持有的锁。”直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法”,当前线程被唤醒(进入”就绪状态”)
- sleep 让线程睡眠,不释放锁
2.3.2 实例
public class TestPC {
public static void main(String[] args) {
YaoDian yaoDian = new YaoDian();
Thread p1 = new Thread(new Producer(yaoDian));
// Thread p2 = new Thread(new Producer(yaoDian));
p1.start();
// p2.start();
Thread c1 = new Thread(new Customer(yaoDian));
// Thread c2 = new Thread(new Customer(yaoDian));
c1.start();
// c2.start();
}
}
class KouZhao {
KouZhao(String code){
this.code = code;
}
private String code;
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
}
class YaoDian{
//线程安全集合
public static volatile List<KouZhao> kzList = new ArrayList<>();
public synchronized void inKz(KouZhao kouZhao) throws Exception{
while (kzList.size() == 3){ //使用while 的意思是 当前线程被唤醒后继续判断
System.out.println("口罩库存已满:" + kzList.size());
this.wait();
}
Thread.sleep(100);
System.out.println("生产口罩:" + kouZhao.getCode());
kzList.add(kouZhao);
this.notifyAll();
}
public synchronized void outKz() throws Exception{
while (kzList.size() == 0){
System.out.println("口罩库存不足!");
this.wait();
}
Thread.sleep(500);
System.out.println("消耗口罩:" + kzList.get(0).getCode() + "剩下口罩数量" + (kzList.size() - 1));
kzList.remove(kzList.get(0));
this.notifyAll();
}
}
//消费者
class Producer implements Runnable{
private YaoDian yaoDian;
Producer(YaoDian yaoDian){
this.yaoDian = yaoDian;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
KouZhao kz = new KouZhao("BH" + i);
yaoDian.inKz(kz);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//生产者
class Customer implements Runnable{
private YaoDian yaoDian;
Customer(YaoDian yaoDian){
this.yaoDian = yaoDian;
}
@Override
public void run() {
try {
// while (true) {
for (int i = 0; i < 5; i++) {
yaoDian.outKz();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.原理
3.1 CAS compare and swap
结构图:
解释:先读取当前值 a 假设 a = 1,修改后再次读取a的值 ,假如两次一样 则可以修改,不一样不能修改。
应用:java concurrent 包下面的 原子类的操作。如: AtomicInteger
底层:调用汇编指令 lock cmpxchg
3.2 volatile
1 : 保持线程可见性。
public class TestVolatile {
public static void main(String[] args) {
Thread t = new Thread(new VolatileClass());
t.start();
//
while (true){
if(VolatileClass.a == 1){
System.out.println("读取到了a = 1");
}
}
}
}
class VolatileClass implements Runnable{
public static int a = 0;
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
a = 1;
}
}
解释: 线程会读取常量池的数据,修改的话会同步到常量池中,因为上面的例子中,子线程睡眠2秒导致主线程执行了 while的内存,所以不会再主动修改a的数值。
加上volatile 关键字后: 相当于每一次获取值是直接从常量池 直接重新拿生成的新的副本。
2:禁止指令重排 加上 lock
class ClassSingleton{
public static volatile ClassSingleton classSingleton = null;
private ClassSingleton(){}
private ClassSingleton getSingleton(){
if(null == classSingleton){
synchronized (ClassSingleton.class){
if(null == classSingleton){
classSingleton = new ClassSingleton();
}
}
}
return classSingleton;
}
}
在单例设计模式中,双重check加锁实现多线程的单例模式,在定义class时,在上volatile防止在指令重排时,对象new 一半的时候返回。
3.3 synchronized
在JVM 2.6 有关于对象的内存结构的详细分析,我们知道锁信息是存放在对象头信息中的。
synchronized :过程分析,
- 首先是加上偏向锁(就是在对象头的前三位中存放这个要加锁的线程id,如果下次有线程执行的时候,发现id相同直接执行,以增加效率),以避免CPU效率低。这个叫偏向锁。
假如是多线程,就是说其他线程也要和之前的线程来争抢这把锁,会通过CAS的方式来获取到这把锁。
假如现在头信息存储的线程id是1,线程1去获取锁的时候首选读取是1,然修改成1,然后再读一次对象头的线程id还是1,则算获取到锁。线程2也执行类型过程:首先读取线程id是1,然后准备修改为2,修改前再去读一次还是1,则修改成功,这个过程只有一个线程能获取到当前锁。 这个过程叫做自旋锁。
如果自旋次数超过10time 就进入重量级锁。竞争太激烈的情况。
—2021.07.03
就是jvm给synchronized这个方法做了一些优化,并不是最开始就使用指令把同步代码块锁住,当一段同步代码没有被线程访问的时候,这个时候是处于无锁状态(0),如果这个同步代码块一只被同一个线程访问,那么jvm会给这段代码块加上偏量锁(1),如果不断的有线程进来,那么其他线程开始竞争的时候使用的是cas方式去竞争,这个时候jvm又会把锁升级成自旋锁(00),如果10次都没获取到锁,则进入重量级锁(11),开启指令。
用户态:大部分操作程序叫 用户态,但是一些特殊操作需要调用内核。比如加锁。
内核态:指的是和硬件的内核操作。
3.4 AQS
AbstractQueuedSynchronizer(以下简写AQS)这个抽象类
sync : 上面说了 是 偏向锁 自旋锁 重量级锁的演变
ReentrantLock :jvm实现
- 自旋
- park
- CAS
加锁过程:判断是否有锁,没有 -> 判断是否需要排队,不需要就直接返回。有锁,入队,
TODO
1.使用park 实现锁
public class TestPark {
public static void main(String[] args) {
Thread t = new Thread(new ParkClass());
t.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("main");
LockSupport.unpark(t);
}
}
class ParkClass implements Runnable{
@Override
public void run() {
System.out.println("执行 1");
LockSupport.park();
System.out.println("执行2");
}
}
4.线程池
4.1概念
4.1.1 线程模型:
用户线程(ULT):用户程序实现,不依赖操作系统核心,应用提供创建,同步,调度,管理线程的函数来控制用户线程,不需要 用户态/内核态切换 速度快,内核对ULT 无感知,线程阻塞则进程阻塞。
内核线程(KLT):系统内核管理线程,内核保存线程状态和上下文信息,线程阻塞不会引起进程阻塞,多线程处理器并行运行,线程创建调度和管理 由内核完成,效率不ULT慢。
java 创建线程采用的是 KLT.
综上:线程是稀缺资源,创建或者销毁耗费资源,java创建线程是内核操作,需要频繁切换,线程池就是一个线程缓存,负责对线程的统一分配,调优和 调度。
阻塞队列:在任意时刻,只有一个线程可以对该队列进行入队或者出队操作。是线程安全的。
4.2基本原理:
线程五种状态:
- running
能接受新的任务 以及处理添加的任务
- shutdown
不接受新任务 可以处理添加的任务
- stop
不接新的任务,不处理已经添加的任务,并中断正在处理的任务
- tidying
所有的任务已经终止,ctl记录的任务数量为0 ctl负责记录线程池的运行状态以及线程数量
- terminated
线程池彻底终止
shutdown:不接受新任务 可以处理添加的任务
shutdown now:stop
执行task的时候 会调用execute 方法
int c = ctl.get();
// 1. 如果工作线程数小于核心线程数(corePoolSize),则创建一个工作线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 如果当前是running状态,并且任务队列能够添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 队列已经满了的情况下,则新启动一个工作线程来执行任务
else if (!addWorker(command, false))
reject(command);
而在addWorker方法中还存在有一些必要的判断逻辑,比如当前线程池是否是非running状态,队列是否为空等条件,当然最主要的逻辑还是判断当前工作线程数量是否大于maximumPoolSize以及启动工作线程执行任务。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
for (;;) {
int wc = workerCountOf(c);
// 1. 判断当前工作线程是否满足条件
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 2. 增加工作线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
// 3. 创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
if (workerAdded) {
// 4. 运行工作线程
t.start();
workerStarted = true;
}
return workerStarted;
}
总结:
- 提交一个任务,如果线程池中的工作线程数小于corePoolSize,则新建一个工作线程执行任务
- 如果线程池当前的工作线程已经等于了corePoolSize,则将新的任务放入到工作队列中正在执行
- 如果工作队列已经满了,并且工作线程数小于maximumPoolSize,则新建一个工作线程来执行任务
- 如果当前线程池中工作线程数已经达到了maximumPoolSize,而新的任务无法放入到任务队列中,则采用对应的策略进行相应的处理(默认是拒绝策略)
4.3线程池线程安全
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池用整个int 32 为去记录整个线程池的生命状态
高三位记录记录线程池生命状态,低29记录当前线程数
4.4java提供的四种常用线程池
4.4.1 FixedThreadPool()
- 固定大小的线程池,可以指定线程池的大小,该线程池corePoolSize和maximumPoolSize相等,阻塞队列使用的是LinkedBlockingQueue,大小为整数最大值。
- 该线程池中的线程数量始终不变,当有新任务提交时,线程池中有空闲线程则会立即执行,如果没有,则会暂存到阻塞队列。对于固定大小的线程池,不存在线程数量的变化。同时使用无界的LinkedBlockingQueue来存放执行的任务。
当任务提交十分频繁的时候,LinkedBlockingQueue迅速增大,存在着耗尽系统资源的问题。而且在线程池空闲时,即线程池中没有可运行任务时,它也不会释放工作线程,还会占用一定的系统资源,需要shutdown。。
public class FixPoolDemo {
private static Runnable getThread(final int i) {
return new Runnable() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
}
};
}
public static void main(String args[]) {
ExecutorService fixPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
fixPool.execute(getThread(i));
}
fixPool.shutdown();
}
}
4.4.2 SingleThreadExecutor()
定长线程池:
可控制线程最大并发数(同时执行的线程数)
超出的线程会在队列中等待
按照先入先出的顺序执行任务
public class SingPoolDemo {
private static Runnable getThread(final int i){
return new Runnable() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
}
};
}
public static void main(String args[]) throws InterruptedException {
ExecutorService singPool = Executors.newSingleThreadExecutor();
for (int i=0;i<10;i++){
singPool.execute(getThread(i));
}
singPool.shutdown();
}
}
4.4.3 ScheduledThreadPool()
定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。
scheduleAtFixedRate:是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。
schedultWithFixedDelay:是以固定的延时去执行任务,延时是指上一次执行成功之后和下一次开始执行的之前的时间。
public class ScheduledExecutorServiceDemo {
public static void main(String args[]) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(4000);
System.out.println(Thread.currentThread().getId() + "执行了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0, 2, TimeUnit.SECONDS);
}
}
4.4.4 CachedThreadPool()
- 缓存线程池,缓存的线程默认存活60秒。线程的核心池corePoolSize大小为0,核心池最大为Integer.MAX_VALUE,阻塞队列使用的是SynchronousQueue。是一个直接提交的阻塞队列他总会迫使线程池增加新的线程去执行新的任务。
在没有任务执行时,当线程的空闲时间超过keepAliveTime(60秒),则工作线程将会终止被回收,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。如果同时又大量任务被提交,而且任务执行的时间不是特别快,那么线程池便会新增出等量的线程池处理任务,这很可能会很快耗尽系统的资源。
public class CachePool {
private static Runnable getThread(final int i){
return new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
}catch (Exception e){
}
System.out.println(i);
}
};
}
public static void main(String args[]){
ExecutorService cachePool = Executors.newCachedThreadPool();
for (int i=1;i<=10;i++){
cachePool.execute(getThread(i));
}
}
}
4.5阿里规范
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
阿里巴巴建立手动创建线程池、
手动创建线程池有几个注意点
1.任务独立。如何任务依赖于其他任务,那么可能产生死锁。例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。
2.合理配置阻塞时间过长的任务。如果任务阻塞时间过长,那么即使不出现死锁,线程池的性能也会变得很糟糕。在Java并发包里可阻塞方法都同时定义了限时方式和不限时方式。例如
Thread.join,BlockingQueue.put,CountDownLatch.await等,如果任务超时,则标识任务失败,然后中止任务或者将任务放回队列以便随后执行,这样,无论任务的最终结果是否成功,这种办法都能够保证任务总能继续执行下去。
3.设置合理的线程池大小。只需要避免过大或者过小的情况即可,上文的公式线程池大小=NCPU *UCPU(1+W/C)。
4.选择合适的阻塞队列。newFixedThreadPool和newSingleThreadExecutor都使用了无界的阻塞队列,无界阻塞队列会有消耗很大的内存,如果使用了有界阻塞队列,它会规避内存占用过大的问题,但是当任务填满有界阻塞队列,新的任务该怎么办?在使用有界队列是,需要选择合适的拒绝策略,队列的大小和线程池的大小必须一起调节。对于非常大的或者无界的线程池,可以使用SynchronousQueue来避免任务排队,以直接将任务从生产者提交到工作者线程。
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。
下面我们就对ThreadPoolExecutor的使用方法进行一个详细的概述。
首先看下ThreadPoolExecutor的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造函数的参数含义如下:
corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
unit:keepAliveTime的单位
workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
threadFactory:线程工厂,用于创建线程,一般用默认即可;
handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;
接下来我们对其中比较重要参数做进一步的了解:
一、workQueue任务队列
上面我们已经介绍过了,它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列;
1、直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,没执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<3;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public ThreadTask() {
}
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
输出:
pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)
可以看到,当任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行了拒绝策略抛出异常。
使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;
2、有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,如下所示
static ExecutorService fixPool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
3、无界的任务队列:有界任务队列可以使用LinkedBlockingQueue实现,如下所示
static ExecutorService fixPool0 = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
4、优先任务队列:优先任务队列通过PriorityBlockingQueue实现,下面我们通过一个例子演示下
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//优先任务队列
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<20;i++) {
pool.execute(new ThreadTask(i));
}
}
}
public class ThreadTask implements Runnable,Comparable<ThreadTask>{
private int priority;
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public ThreadTask() {
}
public ThreadTask(int priority) {
this.priority = priority;
}
//当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
public int compareTo(ThreadTask o) {
return this.priority>o.priority?-1:1;
}
public void run() {
try {
//让线程阻塞,使后续任务进入缓存队列
Thread.sleep(1000);
System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我们来看下执行的结果情况
priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1
大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。
通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
二、拒绝策略
一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池”超载”的情况。ThreadPoolExecutor自带的拒绝策略如下:
1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;
2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;
4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;
以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,我们看下示例代码:
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//自定义拒绝策略
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+"执行了拒绝策略");
}
});
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public void run() {
try {
//让线程阻塞,使后续任务进入缓存队列
Thread.sleep(1000);
System.out.println("ThreadName:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
输出结果:
com.hhxx.test.ThreadTask@33909752执行了拒绝策略
com.hhxx.test.ThreadTask@55f96302执行了拒绝策略
com.hhxx.test.ThreadTask@3d4eac69执行了拒绝策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略;
三、ThreadFactory自定义线程创建
线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//自定义线程工厂
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("线程"+r.hashCode()+"创建");
//线程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public void run() {
//输出执行线程的名称
System.out.println("ThreadName:"+Thread.currentThread().getName());
}
}
我们看下输出结果
线程118352462创建
线程1550089733创建
线程865113938创建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
线程1442407170创建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170
可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。
四、ThreadPoolExecutor扩展
ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,
1、beforeExecute:线程池中任务运行前执行
2、afterExecute:线程池中任务运行完毕后执行
3、terminated:线程池退出后执行
通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args ) throws InterruptedException
{
//实现自定义接口
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("线程"+r.hashCode()+"创建");
//线程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy()) {
protected void beforeExecute(Thread t,Runnable r) {
System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
}
protected void afterExecute(Runnable r,Throwable t) {
System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
}
protected void terminated() {
System.out.println("线程池退出");
}
};
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask("Task"+i));
}
pool.shutdown();
}
}
public class ThreadTask implements Runnable{
private String taskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public ThreadTask(String name) {
this.setTaskName(name);
}
public void run() {
//输出执行线程的名称
System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
}
}
我看下输出结果
线程118352462创建
线程1550089733创建
准备执行:Task0
准备执行:Task1
TaskNameTask0—-ThreadName:threadPool118352462
线程865113938创建
执行完毕:Task0
TaskNameTask1—-ThreadName:threadPool1550089733
执行完毕:Task1
准备执行:Task3
TaskNameTask3—-ThreadName:threadPool1550089733
执行完毕:Task3
准备执行:Task2
准备执行:Task4
TaskNameTask4—-ThreadName:threadPool1550089733
执行完毕:Task4
准备执行:Task5
TaskNameTask5—-ThreadName:threadPool1550089733
执行完毕:Task5
准备执行:Task6
TaskNameTask6—-ThreadName:threadPool1550089733
执行完毕:Task6
准备执行:Task8
TaskNameTask8—-ThreadName:threadPool1550089733
执行完毕:Task8
准备执行:Task9
TaskNameTask9—-ThreadName:threadPool1550089733
准备执行:Task7
执行完毕:Task9
TaskNameTask2—-ThreadName:threadPool118352462
TaskNameTask7—-ThreadName:threadPool865113938
执行完毕:Task7
执行完毕:Task2
线程池退出
可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池, 当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
五、线程池线程数量
线程吃线程数量的设置没有一个明确的指标,根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可
/**
* Nthreads=CPU数量
* Ucpu=目标CPU的使用率,0<=Ucpu<=1
* W/C=任务等待时间与任务计算时间的比率
*/
Nthreads = Ncpu*Ucpu*(1+W/C)