1. public class TestPool {
    2. public static void main(String[] args) {
    3. ThreadPool threadPool = new ThreadPool(1,
    4. 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
    5. // 1. 死等
    6. // queue.put(task);
    7. // 2) 带超时等待
    8. // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
    9. // 3) 让调用者放弃任务执行
    10. // log.debug("放弃{}", task);
    11. // 4) 让调用者抛出异常
    12. // throw new RuntimeException("任务执行失败 " + task);
    13. // 5) 让调用者自己执行任务
    14. task.run();
    15. });
    16. for (int i = 0; i < 4; i++) {
    17. int j = i;
    18. threadPool.execute(() -> {
    19. try {
    20. Thread.sleep(1000L);
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. log.debug("{}", j);
    25. });
    26. }
    27. }
    28. }
    29. @FunctionalInterface // 拒绝策略
    30. interface RejectPolicy<T> {
    31. void reject(BlockingQueue<T> queue, T task);
    32. }
    33. @Slf4j(topic = "c.ThreadPool")
    34. class ThreadPool {
    35. // 任务队列
    36. private BlockingQueue<Runnable> taskQueue;
    37. // 线程集合
    38. private HashSet<Worker> workers = new HashSet<>();
    39. // 核心线程数
    40. private int coreSize;
    41. // 获取任务时的超时时间
    42. private long timeout;
    43. private TimeUnit timeUnit;
    44. private RejectPolicy<Runnable> rejectPolicy;
    45. // 执行任务
    46. public void execute(Runnable task) {
    47. // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
    48. // 如果任务数超过 coreSize 时,加入任务队列暂存
    49. synchronized (workers) {
    50. if(workers.size() < coreSize) {
    51. Worker worker = new Worker(task);
    52. log.debug("新增 worker{}, {}", worker, task);
    53. workers.add(worker);
    54. worker.start();
    55. } else {
    56. // taskQueue.put(task);
    57. // 1) 死等
    58. // 2) 带超时等待
    59. // 3) 让调用者放弃任务执行
    60. // 4) 让调用者抛出异常
    61. // 5) 让调用者自己执行任务
    62. taskQueue.tryPut(rejectPolicy, task);
    63. }
    64. }
    65. }
    66. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
    67. this.coreSize = coreSize;
    68. this.timeout = timeout;
    69. this.timeUnit = timeUnit;
    70. this.taskQueue = new BlockingQueue<>(queueCapcity);
    71. this.rejectPolicy = rejectPolicy;
    72. }
    73. class Worker extends Thread{
    74. private Runnable task;
    75. public Worker(Runnable task) {
    76. this.task = task;
    77. }
    78. @Override
    79. public void run() {
    80. // 执行任务
    81. // 1) 当 task 不为空,执行任务
    82. // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
    83. // while(task != null || (task = taskQueue.take()) != null) {
    84. while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
    85. try {
    86. log.debug("正在执行...{}", task);
    87. task.run();
    88. } catch (Exception e) {
    89. e.printStackTrace();
    90. } finally {
    91. task = null;
    92. }
    93. }
    94. synchronized (workers) {
    95. log.debug("worker 被移除{}", this);
    96. workers.remove(this);
    97. }
    98. }
    99. }
    100. }
    101. @Slf4j(topic = "c.BlockingQueue")
    102. class BlockingQueue<T> {
    103. // 1. 任务队列
    104. private Deque<T> queue = new ArrayDeque<>();
    105. // 2. 锁
    106. private ReentrantLock lock = new ReentrantLock();
    107. // 3. 生产者条件变量
    108. private Condition fullWaitSet = lock.newCondition();
    109. // 4. 消费者条件变量
    110. private Condition emptyWaitSet = lock.newCondition();
    111. // 5. 容量
    112. private int capcity;
    113. public BlockingQueue(int capcity) {
    114. this.capcity = capcity;
    115. }
    116. // 带超时阻塞获取
    117. public T poll(long timeout, TimeUnit unit) {
    118. lock.lock();
    119. try {
    120. // 将 timeout 统一转换为 纳秒
    121. long nanos = unit.toNanos(timeout);
    122. while (queue.isEmpty()) {
    123. try {
    124. // 返回值是剩余时间
    125. if (nanos <= 0) {
    126. return null;
    127. }
    128. nanos = emptyWaitSet.awaitNanos(nanos);
    129. } catch (InterruptedException e) {
    130. e.printStackTrace();
    131. }
    132. }
    133. T t = queue.removeFirst();
    134. fullWaitSet.signal();
    135. return t;
    136. } finally {
    137. lock.unlock();
    138. }
    139. }
    140. // 阻塞获取
    141. public T take() {
    142. lock.lock();
    143. try {
    144. while (queue.isEmpty()) {
    145. try {
    146. emptyWaitSet.await();
    147. } catch (InterruptedException e) {
    148. e.printStackTrace();
    149. }
    150. }
    151. T t = queue.removeFirst();
    152. fullWaitSet.signal();
    153. return t;
    154. } finally {
    155. lock.unlock();
    156. }
    157. }
    158. // 阻塞添加
    159. public void put(T task) {
    160. lock.lock();
    161. try {
    162. while (queue.size() == capcity) {
    163. try {
    164. log.debug("等待加入任务队列 {} ...", task);
    165. fullWaitSet.await();
    166. } catch (InterruptedException e) {
    167. e.printStackTrace();
    168. }
    169. }
    170. log.debug("加入任务队列 {}", task);
    171. queue.addLast(task);
    172. emptyWaitSet.signal();
    173. } finally {
    174. lock.unlock();
    175. }
    176. }
    177. // 带超时时间阻塞添加
    178. public boolean offer(T task, long timeout, TimeUnit timeUnit) {
    179. lock.lock();
    180. try {
    181. long nanos = timeUnit.toNanos(timeout);
    182. while (queue.size() == capcity) {
    183. try {
    184. if(nanos <= 0) {
    185. return false;
    186. }
    187. log.debug("等待加入任务队列 {} ...", task);
    188. nanos = fullWaitSet.awaitNanos(nanos);
    189. } catch (InterruptedException e) {
    190. e.printStackTrace();
    191. }
    192. }
    193. log.debug("加入任务队列 {}", task);
    194. queue.addLast(task);
    195. emptyWaitSet.signal();
    196. return true;
    197. } finally {
    198. lock.unlock();
    199. }
    200. }
    201. public int size() {
    202. lock.lock();
    203. try {
    204. return queue.size();
    205. } finally {
    206. lock.unlock();
    207. }
    208. }
    209. public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
    210. lock.lock();
    211. try {
    212. // 判断队列是否满
    213. if(queue.size() == capcity) {
    214. rejectPolicy.reject(this, task);
    215. } else { // 有空闲
    216. log.debug("加入任务队列 {}", task);
    217. queue.addLast(task);
    218. emptyWaitSet.signal();
    219. }
    220. } finally {
    221. lock.unlock();
    222. }
    223. }
    224. }