package com.lms.jdk8.juc;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Author: 李孟帅
* @Date: 2021-12-16 10:33
* @Description:
*/
@Slf4j
public class ThreadPool {
// 任务队列
private BlockQueue<Runnable> taskQueue;
// 线程集合
private final HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit timeUnit;
// 拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, BlockQueue<Runnable> taskQueue, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = taskQueue;
this.rejectPolicy = rejectPolicy;
}
// 执行任务
public void execute(Runnable task) {
synchronized (workers) {
// 任务数没有超过coreSize时,直接交给worker执行,否则加入队列暂存
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.info("创建worker {},task:{}", worker, task);
workers.add(worker);
worker.start();
} else {
// 交给拒绝策略执行
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// task不为null,执行任务
// task为null,在从任务队列中获取任务并执行
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.info("正在执行...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker 被移除{}", this);
workers.remove(this);
}
}
}
}
/**
 * A bounded FIFO queue guarded by a single {@link ReentrantLock} with separate
 * conditions for "not full" and "not empty".
 *
 * <p>None of the blocking methods declares {@link InterruptedException}. If the calling
 * thread is interrupted while waiting, the wait continues and the thread's interrupt
 * status is set again before the method returns.
 *
 * @param <T> element type
 */
@Slf4j
class BlockQueue<T> {
    // Backing storage; accessed only while holding the lock.
    private final Deque<T> queue = new ArrayDeque<>();
    // Maximum number of queued elements.
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    // Producers wait here while the queue is full.
    private final Condition notFull = lock.newCondition();
    // Consumers wait here while the queue is empty.
    private final Condition notEmpty = lock.newCondition();

    /**
     * @param capacity maximum number of queued elements
     * @throws IllegalArgumentException if {@code capacity} is negative
     */
    public BlockQueue(int capacity) {
        if (capacity < 0) {
            throw new IllegalArgumentException("capacity must not be negative: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given time for an
     * element to become available.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the head element, or {@code null} if the wait timed out
     */
    public T poll(long timeout, TimeUnit unit) {
        boolean interrupted = false;
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + nanos;
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    return null;
                }
                try {
                    // awaitNanos returns an estimate of the time still remaining.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Keep waiting, but only for what is left of the original timeout.
                    interrupted = true;
                    nanos = deadline - System.nanoTime();
                }
            }
            T head = queue.removeFirst();
            notFull.signal();
            return head;
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the head of the queue, waiting indefinitely for one to arrive.
     *
     * @return the head element
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                // Preserves the interrupt status itself; no exception to swallow.
                notEmpty.awaitUninterruptibly();
            }
            T head = queue.removeFirst();
            notFull.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the element, waiting indefinitely for free space.
     *
     * @param element the element to add
     */
    public void put(T element) {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                log.info("等待加入任务队列 {}...", element);
                notFull.awaitUninterruptibly();
            }
            log.info("加入任务队列 {}", element);
            queue.addLast(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the element, waiting up to the given time for free space.
     *
     * @param element the element to add
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the element was added, {@code false} if the wait timed out
     */
    public boolean offer(T element, long timeout, TimeUnit unit) {
        boolean interrupted = false;
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + nanos;
            while (queue.size() >= capacity) {
                if (nanos <= 0) {
                    log.info("等待加入任务队列超时 {}...", element);
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Keep waiting, but only for what is left of the original timeout.
                    interrupted = true;
                    nanos = deadline - System.nanoTime();
                }
            }
            log.info("加入任务队列 {}", element);
            queue.addLast(element);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the number of elements currently queued
     */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the element if there is room; otherwise hands it to the reject policy.
     *
     * <p>The policy is invoked after the queue lock has been released, so it may block,
     * call back into this queue, or run the element inline without stalling other
     * producers and consumers.
     *
     * @param rejectPolicy policy to consult when the queue is full
     * @param element      the element to add
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T element) {
        lock.lock();
        try {
            if (queue.size() < capacity) {
                log.info("加入任务队列 {}", element);
                queue.addLast(element);
                notEmpty.signal();
                return;
            }
        } finally {
            lock.unlock();
        }
        // Queue was full: run the caller-supplied policy without holding the lock.
        rejectPolicy.reject(this, element);
    }
}
/**
 * Strategy for an element that a {@link BlockQueue} has no room for.
 *
 * <p>{@link BlockQueue#tryPut} calls this instead of enqueueing when the queue is at
 * capacity. The implementation decides what happens next, for example waiting via
 * {@link BlockQueue#put} or {@link BlockQueue#offer}, dropping the element, throwing,
 * or processing the element in the calling thread.
 *
 * @param <T> element type of the queue
 */
@FunctionalInterface
interface RejectPolicy<T> {
/**
 * Handles an element that was not enqueued.
 *
 * @param queue   the queue that had no room for the element
 * @param element the element that was not enqueued
 */
void reject(BlockQueue<T> queue, T element);
}
/**
 * Demo driver: submits five one-second tasks to a pool of two workers backed by a queue
 * of capacity one, using a reject policy that runs overflow tasks in the submitting thread.
 */
@Slf4j
class Test {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, new BlockQueue<>(1), (queue, element) -> {
            // Alternative policies to experiment with:
            //   wait indefinitely for space:  queue.put(element);
            //   wait with a timeout:          queue.offer(element, 500, TimeUnit.MILLISECONDS);
            //   drop the task:                log.info("放弃{}", element);
            //   fail the submission:          throw new RuntimeException("任务执行失败:" + element);
            // Active policy: the submitting thread runs the task itself.
            element.run();
        });
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                    log.info("run :{}", finalI);
                } catch (InterruptedException e) {
                    // Restore the flag so whoever owns this thread can see the interrupt.
                    Thread.currentThread().interrupt();
                    log.warn("任务被中断 :{}", finalI, e);
                }
            });
        }
    }
}