一、自定义线程池
1.图解:

- 阻塞队列中维护了由主线程(或者其他线程)所产生的的任务
- 主线程类似于生产者,产生任务并放入阻塞队列中
- 线程池类似于消费者,得到阻塞队列中已有的任务并执行
2.具体实现
package panw.ThreadPool;import javafx.concurrent.Worker;import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;import java.util.Deque;import java.util.HashSet;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;@Slf4jpublic class ThreadPoolTest {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1, 1, TimeUnit.SECONDS, 1,(queue,task)->{// queue.put(task); // 死等// queue.offer(task,50,TimeUnit.MILLISECONDS); //超时// log.debug("放弃{}",task); //放弃,后面任务继续会执行// throw new RuntimeException("任务执行失败"+task); //抛出异常,后面任务不执行// task.run(); //自己执行});for (int i = 0; i < 15; i++) {int j=i;threadPool.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("{}",j);});}}}@Slf4jclass ThreadPool{private BlockingQueue<Runnable> queue ;private HashSet<Worker> workers =new HashSet<>();private int coreSize;private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int capacity,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;queue=new BlockingQueue<>(capacity);this.rejectPolicy=rejectPolicy;}public void execute(Runnable task){synchronized (workers){if (workers.size()<coreSize){Worker worker = new Worker(task);log.debug("新增 worker{}, {}", worker, task);workers.add(worker);worker.start();}else {// queue.put(task);queue.tryPut(rejectPolicy,task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {while (task!=null||(task=queue.poll(timeout,timeUnit))!=null){try {log.debug("正在执行...{}", task);task.run();}catch (Exception e){}finally {task=null;}}synchronized (workers){log.debug("worker 被移除{}", this);workers.remove(this);}}}}@Slf4jclass BlockingQueue<T>{private Deque<T> queue ;private int capacity;private ReentrantLock lock;private Condition fullWait;private Condition emptyWait;public BlockingQueue(int capacity) {this.queue=new ArrayDeque<>();this.capacity=capacity;this.lock= new ReentrantLock();this.fullWait=lock.newCondition();this.emptyWait= lock.newCondition();}//阻塞添加public void put(T t){lock.lock();try {while (queue.size()==capacity){log.debug("等待加入任务队列 {}", t);fullWait.await();}log.debug("加入任务队列 {}", t);queue.addLast(t);emptyWait.signal();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}//带超时添加public boolean offer(T t,long timeout,TimeUnit timeUnit){lock.lock();long nanos = timeUnit.toNanos(timeout);try {while (queue.size()==capacity){if (nanos<=0){log.debug("超时了...删除{}",t);return false;}log.debug("等待加入任务队列 {} ...", t);nanos = fullWait.awaitNanos(nanos);}log.debug("加入任务队列 {}", t);queue.addLast(t);emptyWait.signal();return true;} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}//阻塞获取public T take(){lock.lock();try {while (queue.isEmpty()){emptyWait.await();}T t = queue.removeFirst();fullWait.signal();return t;} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}//带超时获取public T poll(long timeout, TimeUnit timeUnit){lock.lock();long nanos = timeUnit.toNanos(timeout);try {while (queue.isEmpty()){if (nanos<=0){return null;}nanos = emptyWait.awaitNanos(nanos);}T t = queue.removeFirst();fullWait.signal();return t;} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}//容量public int size(){lock.lock();try {return queue.size();}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {//判断队列是否已满if (queue.size()==capacity){rejectPolicy.reject(this,task);}else{log.debug("加入阻塞队列{}",task);queue.addLast(task);emptyWait.signal();}}finally {lock.unlock();}}}@FunctionalInterfaceinterface RejectPolicy<T>{void reject(BlockingQueue<T> queue,T task);}
二、ThreadPoolExecutor
1.继承关系
2.线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
从源码可以看出:
| 状态名称 | 高3位的值 | 描述 |
|---|---|---|
| RUNNING | 111 | 接收新任务,同时处理任务队列中的任务 |
| SHUTDOWN | 000 | 不接受新任务,但是处理任务队列中的任务 |
| STOP | 001 | 中断正在执行的任务,同时抛弃阻塞队列中的任务 |
| TIDYING | 010 | 任务执行完毕,活动线程为0时,即将进入终结阶段 |
| TERMINATED | 011 | 终结状态 |
// 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 并不是所有平台的int都是32位。// 去掉前三位保存线程状态的位数,剩下的用于保存线程数量// 高3位为0,剩余位数全为1private static final int COUNT_BITS = Integer.SIZE - 3;// 2^COUNT_BITS次方,表示可以保存的最大线程数// CAPACITY 的高3位为 0private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示
- 使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值
线程池内部属性也是包含了上述自定义线程池的。
// 工作线程,内部封装了Threadprivate final class Workerextends AbstractQueuedSynchronizerimplements Runnable {...}// 阻塞队列,用于存放来不及被核心线程执行的任务private final BlockingQueue<Runnable> workQueue;// 锁private final ReentrantLock mainLock = new ReentrantLock();// 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程)private final HashSet<Worker> workers = new HashSet<Worker>();
3.构造方法及参数
直接来看看最多参数的构造:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
参数解释
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- maximumPoolSize - corePoolSize = 救急线程数
- keepAliveTime:救急线程空闲时的最大生存时间
- unit:时间单位
- workQueue:阻塞队列(存放任务)
- 有界阻塞队列 ArrayBlockingQueue
- 无界阻塞队列 LinkedBlockingQueue
- 最多只有一个同步元素的 SynchronousQueue
- 优先队列 PriorityBlockingQueue
- threadFactory:线程工厂(给线程取名字)不重要
-
工作流程:
当一个任务传给线程池以后,可能有以下几种可能
AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
DiscardPolicy:放弃本次任务
public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
CallerRunsPolicy:让调用者运行任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
简单使用
```java package panw.ThreadPool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;
@Slf4j public class ThreadPoolExecutorTest { public static void main(String[] args) { // new ThreadPoolExecutor(1,4,1, TimeUnit.SECONDS,); AtomicInteger atomic = new AtomicInteger(0);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), r -> new Thread(r,"MyThread-"+atomic.getAndIncrement()), new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 10; i++) {threadPoolExecutor.execute(()->{log.debug(String.valueOf(Thread.currentThread()));try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}});}}
}
<a name="Pd20a"></a>### 4.常见四种线程池<a name="Y0vYw"></a>#### FixedThreadPool()定长线程池:- 可控制线程最大并发数(同时执行的线程数)- 超出的线程会在队列中等待```java//nThreads => 最大线程数即maximumPoolSizeExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);//构造public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}
CachedThreadPool
可缓存线程池:
- 线程数无限制
- 有空闲线程则复用空闲线程,若无空闲线程则新建线程
一定程序减少频繁创建/销毁线程,减少系统开销
//创建ExecutorService cachedThreadPool = Executors.newCachedThreadPool();//构造public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
SingleThreadExecutor()
单线程化的线程池:
有且仅有一个工作线程执行任务
所有任务按照指定顺序执行,即遵循队列的入队出队规则
//创建ExecutorService singleThreadPool = Executors.newSingleThreadPool();//构造public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
几个注意
SingleThread和自己创建一个线程来运行多个任务的区别
- 当线程正在执行的任务发生错误时,如果是自己创建的线程,该任务和剩余的任务就无法再继续运行下去。而SingleThread会创建一个新线程,继续执行任务队列中剩余的任务。
SingleThread和newFixedThreadPool(1)的区别
// 强转为ThreadPoolExecutorThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);// 改变核心线程数threadPool.setCorePoolSize(2);
- newFixedThreadPool(1)传值为1,可以将FixedThreadPool强转为ThreadPoolExecutor,然后通过setCorePoolSize改变核心线程数
- 而SingleThread无法修改核心线程数,因为返回的是
FinalizableDelegatedExecutorServicepublic static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
ScheduledThreadPool()
定时线程池:
支持定时及周期性任务执行 ```java //nThreads => 最大线程数即maximumPoolSize ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
//构造 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
//ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
<a name="oGkND"></a>##### 定时功能```javaScheduledExecutorService pool = Executors.newScheduledThreadPool(2);//定时,以固定频率,上一个任务开始的时间计时,一个period后,检测上一个任务是否执行完毕,// 如果上一个任务执行完毕,则当前任务立即执行,// 如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。pool.scheduleAtFixedRate(()->{log.debug(LocalDateTime.now().toString());},1,5, TimeUnit.SECONDS);//1表示首次执行任务的延迟时间,5表示每次执行任务的间隔时间,TimeUnit.SECONDS执行的时间间隔数值单位//定时,以固定的延时 delay(延时)指的是一次执行终止和下一次执行开始之间的延迟。pool.scheduleWithFixedDelay(()->{log.debug(LocalDateTime.now().toString());},1,5,TimeUnit.SECONDS);

