自定义线程池
ThreadPool
ThreadPool是核心线程池,线程池在初始化时传入参数,构建任务队列,线程池大小,队列容量,以及队列满时的拒绝策略等。其中Worker是工作线程,用来执行传入的任务,
public class ThreadPool {
/**
* 线程池核心线程数量
*/
private int coreSize;
/**
* 提交的任务队列
*/
private BlockQueue<Runnable> queue;
/**
* 拒接策略
*/
private RejectPolicy<Runnable> rejectPolicy;
/**
* 任务的超时时间
*/
private long timeOut;
/**
* 超时时间单位
*/
private TimeUnit timeUnit;
/**
* 工作线程集合
*/
private HashSet<Worker> workers = new HashSet();
public ThreadPool(int coreSize, int queueSize, long timeOut, TimeUnit timeUnit) {
this.coreSize = coreSize;
this.queue = new BlockQueue<>(queueSize);
this.timeOut = timeOut;
this.timeUnit = timeUnit;
}
public ThreadPool(int coreSize, int queueSize, long timeOut, TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.queue = new BlockQueue<>(queueSize);
this.rejectPolicy = rejectPolicy;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
}
/**
* 执行任务
*/
public void execute(Runnable task) {
if (workers.size() < coreSize) {
log.debug("线程池空间未满,直接创建线程...");
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
log.debug("线程池空间已满,提交任务进入队列等待...");
log.info("线程池已满,任务放入队列...");
// queue.put(task);
// queue.offer(task, 500, TimeUnit.MILLISECONDS);
queue.tryPut(rejectPolicy, task);
}
}
}
Worker
Worker
是一个工作线程,用来执行传入的任务,执行任务前需要判断任务是否为空,或者从队列中取出一个不为空的任务执行,如果没有任务,则线程结束。
/**
* 工作线程
*/
class Worker extends Thread {
/**
* 执行的任务
*/
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
/**
* 执行任务
* 1) 当 task 不为空,执行任务
* 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
*/
@Override
public void run() {
// while (null != task || (task = queue.take()) != null) {
while (null != task || (task = queue.poll(500, TimeUnit.MILLISECONDS)) != null) {
try {
log.debug("{}正在执行...", task);
task.run();
} catch (Exception e) {
log.debug("任务执行异常...");
e.printStackTrace();
} finally {
log.debug("{}执行结束...", task);
task = null;
}
}
synchronized (workers) {
log.debug("所有任务都被执行完毕,工作线程结束,workers是共享变量,需要保护");
workers.remove(this);
}
}
}
BlockQueue
BlockQueue是自定义的一个队列,队列中有相应的存和取的方法,用于存放和消费任务,因为对列会被多个线程共享,所以需要对其进行线程安全保护,所以额外加了锁保护,以及等待队列。
public class BlockQueue<T> {
/**
* 存放任务的队列
*/
private Deque<T> queue;
/**
* 保证线程安全的锁
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 队列已满的等待集合
*/
private Condition fullCondition = lock.newCondition();
/**
* 对列空闲的等待集合
*/
private Condition emptyCondition = lock.newCondition();
/**
* 队列的最大容量
*/
private int capcity;
public BlockQueue(int capcity) {
this.capcity = capcity;
this.queue = new ArrayDeque<>(capcity);
}
/**
* 带超时时间的加入任务,如果在规定时间内未完成队列加入,则丢弃任务
*/
public void offer(T t, long timeOut, TimeUnit timeUnit) {
lock.lock();
try {
log.debug("统一时间单位...");
long nanos = timeUnit.toNanos(timeOut);
while (queue.size() == capcity) {
log.debug("队列已满,等待任务被执行后再加入...");
nanos = fullCondition.awaitNanos(nanos);
if (nanos < 0) {
log.debug("超时结束等待,丢弃任务,awaitNanos 返回的是,入如果当前线程被虚假唤醒,则当前线程还应该等多久");
return;
}
}
log.debug("队列还有空间...");
queue.addLast(t);
log.debug("通知消费线程取任务...");
emptyCondition.signalAll();
} catch (InterruptedException e) {
log.debug("阻塞被打断...");
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* @throws
* @description 带拒绝策略的put
* @author SongHongWei
* @params
* @updateTime 2021/10/4 17:23
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capcity) {
log.info("队列已满,等待任务被执行后再加入...");
rejectPolicy.reject(this, task);
} else {
log.debug("队列还有空间...");
queue.addLast(task);
log.debug("通知消费线程取任务...");
emptyCondition.signalAll();
}
} finally {
lock.unlock();
}
}
/**
* 在对列里加入新任务
* 阻塞添加
*
* @param t
*/
public void put(T t) {
lock.lock();
try {
//不用if的原因是防止线程被虚假唤醒,即条件不满足时被唤醒
while (queue.size() == capcity) {
log.info("队列已满,等待任务被执行后再加入...");
fullCondition.await();
}
log.debug("队列还有空间...");
queue.addLast(t);
log.debug("通知消费线程取任务...");
emptyCondition.signalAll();
} catch (InterruptedException e) {
log.debug("阻塞被打断...");
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 获取任务,需要考虑线程安全
* 阻塞获取
*
* @return
*/
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
log.info("队列已空,等待任务加入...");
try {
emptyCondition.await();
} catch (InterruptedException e) {
log.debug("阻塞被打断...");
e.printStackTrace();
}
}
T first = queue.removeFirst();
log.debug("唤醒因队列满了导致在阻塞中的put线程");
fullCondition.signalAll();
return first;
} finally {
lock.unlock();
}
}
/**
* 带超时时间的任务获取
*
* @param timeOut
* @param timeUnit
* @return
*/
public T poll(long timeOut, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeOut);
while (queue.isEmpty()) {
log.info("队列已空,等待任务加入...");
try {
nanos = emptyCondition.awaitNanos(nanos);
if (nanos < 0) {
log.debug("超时结束等待,丢弃任务,awaitNanos 返回的是,入如果当前线程被虚假唤醒,则当前线程还应该等多久");
return null;
}
} catch (InterruptedException e) {
log.debug("阻塞被打断...");
e.printStackTrace();
}
}
T first = queue.removeFirst();
log.debug("唤醒因队列满了导致在阻塞中的put线程");
fullCondition.signalAll();
return first;
} finally {
lock.unlock();
}
}
}
RejectPolicy
RejectPolicy
是一个接口函数,用来指定当队列满的时候,采用什么样的方式任务,例如,继续等待直到队列有任务被消费了,或者带超时时间的等待,或者放弃任务,或者抛出异常等等。
/**
* @author fmj
* 抽象一个拒接策略,用来处理当线程池的线程都处于busy状态,还有任务提交进来时的处理方式
* @date 2021 2021/9/24 15:06
*/
@FunctionalInterface
public interface RejectPolicy<T> {
void reject(BlockQueue<T> queue, T task);
}
PoolClient
PoolClient
是用来调用线程池的,模拟调用场景
/**
* @author fmj
* 模拟调用自定义线程池
* @date 2021 2021/9/24 15:07
*/
@Slf4j
public class PoolClient {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(2, 3, 1000L, TimeUnit.MILLISECONDS);
ThreadPool rejectPool = new ThreadPool(2, 3, 1000L, TimeUnit.MILLISECONDS, (q, t) -> {
// log.info("1.队列满时一直等待,知道有线程执行完其他任务");
// q.put(t);
// log.info("2.队列满时执行带超时时间的等待");
// q.offer(t, 2000L, TimeUnit.SECONDS);
log.info("3.队列满时,放弃当前任务");
// log.info("4.队列满时,由调用者自己执行");
// t.run();
// log.info("5.队列满时,抛出异常");
// throw new RuntimeException("队列已满");
});
for (int i = 0; i < 10; i++) {
int j = i;
rejectPool.execute(() -> {
try {
Thread.sleep(1000);
log.info("{}", j);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}