自定义线程池
ThreadPool
ThreadPool是核心线程池,线程池在初始化时传入参数,构建任务队列,线程池大小,队列容量,以及队列满时的拒绝策略等。其中Worker是工作线程,用来执行传入的任务,
public class ThreadPool {/*** 线程池核心线程数量*/private int coreSize;/*** 提交的任务队列*/private BlockQueue<Runnable> queue;/*** 拒接策略*/private RejectPolicy<Runnable> rejectPolicy;/*** 任务的超时时间*/private long timeOut;/*** 超时时间单位*/private TimeUnit timeUnit;/*** 工作线程集合*/private HashSet<Worker> workers = new HashSet();public ThreadPool(int coreSize, int queueSize, long timeOut, TimeUnit timeUnit) {this.coreSize = coreSize;this.queue = new BlockQueue<>(queueSize);this.timeOut = timeOut;this.timeUnit = timeUnit;}public ThreadPool(int coreSize, int queueSize, long timeOut, TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.queue = new BlockQueue<>(queueSize);this.rejectPolicy = rejectPolicy;this.timeOut = timeOut;this.timeUnit = timeUnit;}/*** 执行任务*/public void execute(Runnable task) {if (workers.size() < coreSize) {log.debug("线程池空间未满,直接创建线程...");Worker worker = new Worker(task);workers.add(worker);worker.start();} else {log.debug("线程池空间已满,提交任务进入队列等待...");log.info("线程池已满,任务放入队列...");// queue.put(task);// queue.offer(task, 500, TimeUnit.MILLISECONDS);queue.tryPut(rejectPolicy, task);}}}
Worker
Worker是一个工作线程,用来执行传入的任务,执行任务前需要判断任务是否为空,或者从队列中取出一个不为空的任务执行,如果没有任务,则线程结束。
/*** 工作线程*/class Worker extends Thread {/*** 执行的任务*/private Runnable task;public Worker(Runnable task) {this.task = task;}/*** 执行任务* 1) 当 task 不为空,执行任务* 2) 当 task 执行完毕,再接着从任务队列获取任务并执行*/@Overridepublic void run() {// while (null != task || (task = queue.take()) != null) {while (null != task || (task = queue.poll(500, TimeUnit.MILLISECONDS)) != null) {try {log.debug("{}正在执行...", task);task.run();} catch (Exception e) {log.debug("任务执行异常...");e.printStackTrace();} finally {log.debug("{}执行结束...", task);task = null;}}synchronized (workers) {log.debug("所有任务都被执行完毕,工作线程结束,workers是共享变量,需要保护");workers.remove(this);}}}
BlockQueue
BlockQueue是自定义的一个队列,队列中有相应的存和取的方法,用于存放和消费任务,因为对列会被多个线程共享,所以需要对其进行线程安全保护,所以额外加了锁保护,以及等待队列。
public class BlockQueue<T> {/*** 存放任务的队列*/private Deque<T> queue;/*** 保证线程安全的锁*/private ReentrantLock lock = new ReentrantLock();/*** 队列已满的等待集合*/private Condition fullCondition = lock.newCondition();/*** 对列空闲的等待集合*/private Condition emptyCondition = lock.newCondition();/*** 队列的最大容量*/private int capcity;public BlockQueue(int capcity) {this.capcity = capcity;this.queue = new ArrayDeque<>(capcity);}/*** 带超时时间的加入任务,如果在规定时间内未完成队列加入,则丢弃任务*/public void offer(T t, long timeOut, TimeUnit timeUnit) {lock.lock();try {log.debug("统一时间单位...");long nanos = timeUnit.toNanos(timeOut);while (queue.size() == capcity) {log.debug("队列已满,等待任务被执行后再加入...");nanos = fullCondition.awaitNanos(nanos);if (nanos < 0) {log.debug("超时结束等待,丢弃任务,awaitNanos 返回的是,入如果当前线程被虚假唤醒,则当前线程还应该等多久");return;}}log.debug("队列还有空间...");queue.addLast(t);log.debug("通知消费线程取任务...");emptyCondition.signalAll();} catch (InterruptedException e) {log.debug("阻塞被打断...");e.printStackTrace();} finally {lock.unlock();}}/*** @throws* @description 带拒绝策略的put* @author SongHongWei* @params* @updateTime 2021/10/4 17:23*/public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {if (queue.size() == capcity) {log.info("队列已满,等待任务被执行后再加入...");rejectPolicy.reject(this, task);} else {log.debug("队列还有空间...");queue.addLast(task);log.debug("通知消费线程取任务...");emptyCondition.signalAll();}} finally {lock.unlock();}}/*** 在对列里加入新任务* 阻塞添加** @param t*/public void put(T t) {lock.lock();try {//不用if的原因是防止线程被虚假唤醒,即条件不满足时被唤醒while (queue.size() == capcity) {log.info("队列已满,等待任务被执行后再加入...");fullCondition.await();}log.debug("队列还有空间...");queue.addLast(t);log.debug("通知消费线程取任务...");emptyCondition.signalAll();} catch (InterruptedException e) {log.debug("阻塞被打断...");e.printStackTrace();} finally {lock.unlock();}}/*** 获取任务,需要考虑线程安全* 阻塞获取** @return*/public T take() {lock.lock();try {while (queue.isEmpty()) {log.info("队列已空,等待任务加入...");try {emptyCondition.await();} catch (InterruptedException e) {log.debug("阻塞被打断...");e.printStackTrace();}}T first = queue.removeFirst();log.debug("唤醒因队列满了导致在阻塞中的put线程");fullCondition.signalAll();return first;} finally {lock.unlock();}}/*** 带超时时间的任务获取** @param timeOut* @param timeUnit* @return*/public T poll(long timeOut, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeOut);while (queue.isEmpty()) {log.info("队列已空,等待任务加入...");try {nanos = emptyCondition.awaitNanos(nanos);if (nanos < 0) {log.debug("超时结束等待,丢弃任务,awaitNanos 返回的是,入如果当前线程被虚假唤醒,则当前线程还应该等多久");return null;}} catch (InterruptedException e) {log.debug("阻塞被打断...");e.printStackTrace();}}T first = queue.removeFirst();log.debug("唤醒因队列满了导致在阻塞中的put线程");fullCondition.signalAll();return first;} finally {lock.unlock();}}}
RejectPolicy
RejectPolicy是一个接口函数,用来指定当队列满的时候,采用什么样的方式任务,例如,继续等待直到队列有任务被消费了,或者带超时时间的等待,或者放弃任务,或者抛出异常等等。
/*** @author fmj* 抽象一个拒接策略,用来处理当线程池的线程都处于busy状态,还有任务提交进来时的处理方式* @date 2021 2021/9/24 15:06*/@FunctionalInterfacepublic interface RejectPolicy<T> {void reject(BlockQueue<T> queue, T task);}
PoolClient
PoolClient是用来调用线程池的,模拟调用场景
/*** @author fmj* 模拟调用自定义线程池* @date 2021 2021/9/24 15:07*/@Slf4jpublic class PoolClient {public static void main(String[] args) {ThreadPool pool = new ThreadPool(2, 3, 1000L, TimeUnit.MILLISECONDS);ThreadPool rejectPool = new ThreadPool(2, 3, 1000L, TimeUnit.MILLISECONDS, (q, t) -> {// log.info("1.队列满时一直等待,知道有线程执行完其他任务");// q.put(t);// log.info("2.队列满时执行带超时时间的等待");// q.offer(t, 2000L, TimeUnit.SECONDS);log.info("3.队列满时,放弃当前任务");// log.info("4.队列满时,由调用者自己执行");// t.run();// log.info("5.队列满时,抛出异常");// throw new RuntimeException("队列已满");});for (int i = 0; i < 10; i++) {int j = i;rejectPool.execute(() -> {try {Thread.sleep(1000);log.info("{}", j);} catch (InterruptedException e) {e.printStackTrace();}});}}}

