一、自定义线程池
1.图解:
- 阻塞队列中维护了由主线程(或者其他线程)所产生的的任务
- 主线程类似于生产者,产生任务并放入阻塞队列中
- 线程池类似于消费者,得到阻塞队列中已有的任务并执行
2.具体实现
package panw.ThreadPool;
import javafx.concurrent.Worker;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1, 1, TimeUnit.SECONDS, 1,
(queue,task)->{
// queue.put(task); // 死等
// queue.offer(task,50,TimeUnit.MILLISECONDS); //超时
// log.debug("放弃{}",task); //放弃,后面任务继续会执行
// throw new RuntimeException("任务执行失败"+task); //抛出异常,后面任务不执行
// task.run(); //自己执行
}
);
for (int i = 0; i < 15; i++) {
int j=i;
threadPool.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("{}",j);
});
}
}
}
@Slf4j
class ThreadPool{
private BlockingQueue<Runnable> queue ;
private HashSet<Worker> workers =new HashSet<>();
private int coreSize;
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int capacity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
queue=new BlockingQueue<>(capacity);
this.rejectPolicy=rejectPolicy;
}
public void execute(Runnable task){
synchronized (workers){
if (workers.size()<coreSize){
Worker worker = new Worker(task);
log.debug("新增 worker{}, {}", worker, task);
workers.add(worker);
worker.start();
}else {
// queue.put(task);
queue.tryPut(rejectPolicy,task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (task!=null||(task=queue.poll(timeout,timeUnit))!=null){
try {
log.debug("正在执行...{}", task);
task.run();
}catch (Exception e){
}finally {
task=null;
}
}
synchronized (workers){
log.debug("worker 被移除{}", this);
workers.remove(this);
}
}
}
}
@Slf4j
class BlockingQueue<T>{
private Deque<T> queue ;
private int capacity;
private ReentrantLock lock;
private Condition fullWait;
private Condition emptyWait;
public BlockingQueue(int capacity) {
this.queue=new ArrayDeque<>();
this.capacity=capacity;
this.lock= new ReentrantLock();
this.fullWait=lock.newCondition();
this.emptyWait= lock.newCondition();
}
//阻塞添加
public void put(T t){
lock.lock();
try {
while (queue.size()==capacity){
log.debug("等待加入任务队列 {}", t);
fullWait.await();
}
log.debug("加入任务队列 {}", t);
queue.addLast(t);
emptyWait.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
//带超时添加
public boolean offer(T t,long timeout,TimeUnit timeUnit){
lock.lock();
long nanos = timeUnit.toNanos(timeout);
try {
while (queue.size()==capacity){
if (nanos<=0){
log.debug("超时了...删除{}",t);
return false;
}
log.debug("等待加入任务队列 {} ...", t);
nanos = fullWait.awaitNanos(nanos);
}
log.debug("加入任务队列 {}", t);
queue.addLast(t);
emptyWait.signal();
return true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
//阻塞获取
public T take(){
lock.lock();
try {
while (queue.isEmpty()){
emptyWait.await();
}
T t = queue.removeFirst();
fullWait.signal();
return t;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
//带超时获取
public T poll(long timeout, TimeUnit timeUnit){
lock.lock();
long nanos = timeUnit.toNanos(timeout);
try {
while (queue.isEmpty()){
if (nanos<=0){
return null;
}
nanos = emptyWait.awaitNanos(nanos);
}
T t = queue.removeFirst();
fullWait.signal();
return t;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
//容量
public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判断队列是否已满
if (queue.size()==capacity){
rejectPolicy.reject(this,task);
}else{
log.debug("加入阻塞队列{}",task);
queue.addLast(task);
emptyWait.signal();
}
}finally {
lock.unlock();
}
}
}
@FunctionalInterface
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}
二、ThreadPoolExecutor
1.继承关系
2.线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
从源码可以看出:
状态名称 | 高3位的值 | 描述 |
---|---|---|
RUNNING | 111 | 接收新任务,同时处理任务队列中的任务 |
SHUTDOWN | 000 | 不接受新任务,但是处理任务队列中的任务 |
STOP | 001 | 中断正在执行的任务,同时抛弃阻塞队列中的任务 |
TIDYING | 010 | 任务执行完毕,活动线程为0时,即将进入终结阶段 |
TERMINATED | 011 | 终结状态 |
// 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 并不是所有平台的int都是32位。
// 去掉前三位保存线程状态的位数,剩下的用于保存线程数量
// 高3位为0,剩余位数全为1
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2^COUNT_BITS次方,表示可以保存的最大线程数
// CAPACITY 的高3位为 0
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示
- 使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值
线程池内部属性也是包含了上述自定义线程池的。
// 工作线程,内部封装了Thread
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
...
}
// 阻塞队列,用于存放来不及被核心线程执行的任务
private final BlockingQueue<Runnable> workQueue;
// 锁
private final ReentrantLock mainLock = new ReentrantLock();
// 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程)
private final HashSet<Worker> workers = new HashSet<Worker>();
3.构造方法及参数
直接来看看最多参数的构造:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数解释
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- maximumPoolSize - corePoolSize = 救急线程数
- keepAliveTime:救急线程空闲时的最大生存时间
- unit:时间单位
- workQueue:阻塞队列(存放任务)
- 有界阻塞队列 ArrayBlockingQueue
- 无界阻塞队列 LinkedBlockingQueue
- 最多只有一个同步元素的 SynchronousQueue
- 优先队列 PriorityBlockingQueue
- threadFactory:线程工厂(给线程取名字)不重要
-
工作流程:
当一个任务传给线程池以后,可能有以下几种可能
AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy:放弃本次任务
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
CallerRunsPolicy:让调用者运行任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
简单使用
```java package panw.ThreadPool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;
@Slf4j public class ThreadPoolExecutorTest { public static void main(String[] args) { // new ThreadPoolExecutor(1,4,1, TimeUnit.SECONDS,); AtomicInteger atomic = new AtomicInteger(0);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), r -> new Thread(r,"MyThread-"+atomic.getAndIncrement()), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(()->{
log.debug(String.valueOf(Thread.currentThread()));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
}
<a name="Pd20a"></a>
### 4.常见四种线程池
<a name="Y0vYw"></a>
#### FixedThreadPool()
定长线程池:
- 可控制线程最大并发数(同时执行的线程数)
- 超出的线程会在队列中等待
```java
//nThreads => 最大线程数即maximumPoolSize
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
//构造
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
CachedThreadPool
可缓存线程池:
- 线程数无限制
- 有空闲线程则复用空闲线程,若无空闲线程则新建线程
一定程序减少频繁创建/销毁线程,减少系统开销
//创建
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//构造
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
SingleThreadExecutor()
单线程化的线程池:
有且仅有一个工作线程执行任务
所有任务按照指定顺序执行,即遵循队列的入队出队规则
//创建
ExecutorService singleThreadPool = Executors.newSingleThreadPool();
//构造
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
几个注意
SingleThread和自己创建一个线程来运行多个任务的区别
- 当线程正在执行的任务发生错误时,如果是自己创建的线程,该任务和剩余的任务就无法再继续运行下去。而SingleThread会创建一个新线程,继续执行任务队列中剩余的任务。
SingleThread和newFixedThreadPool(1)的区别
// 强转为ThreadPoolExecutor
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
// 改变核心线程数
threadPool.setCorePoolSize(2);
- newFixedThreadPool(1)传值为1,可以将FixedThreadPool强转为ThreadPoolExecutor,然后通过setCorePoolSize改变核心线程数
- 而SingleThread无法修改核心线程数,因为返回的是
FinalizableDelegatedExecutorService
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ScheduledThreadPool()
定时线程池:
支持定时及周期性任务执行 ```java //nThreads => 最大线程数即maximumPoolSize ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
//构造 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
//ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
<a name="oGkND"></a>
##### 定时功能
```java
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
//定时,以固定频率,上一个任务开始的时间计时,一个period后,检测上一个任务是否执行完毕,
// 如果上一个任务执行完毕,则当前任务立即执行,
// 如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
pool.scheduleAtFixedRate(()->{log.debug(LocalDateTime.now().toString());},1,5, TimeUnit.SECONDS);
//1表示首次执行任务的延迟时间,5表示每次执行任务的间隔时间,TimeUnit.SECONDS执行的时间间隔数值单位
//定时,以固定的延时 delay(延时)指的是一次执行终止和下一次执行开始之间的延迟。
pool.scheduleWithFixedDelay(()->{log.debug(LocalDateTime.now().toString());},1,5,TimeUnit.SECONDS);