1. package com.lms.jdk8.juc;
    2. import lombok.extern.slf4j.Slf4j;
    3. import java.util.ArrayDeque;
    4. import java.util.Deque;
    5. import java.util.HashSet;
    6. import java.util.concurrent.TimeUnit;
    7. import java.util.concurrent.locks.Condition;
    8. import java.util.concurrent.locks.ReentrantLock;
    9. /**
    10. * @Author: 李孟帅
    11. * @Date: 2021-12-16 10:33
    12. * @Description:
    13. */
    14. @Slf4j
    15. public class ThreadPool {
    16. // 任务队列
    17. private BlockQueue<Runnable> taskQueue;
    18. // 线程集合
    19. private final HashSet<Worker> workers = new HashSet<>();
    20. // 核心线程数
    21. private int coreSize;
    22. // 获取任务的超时时间
    23. private long timeout;
    24. private TimeUnit timeUnit;
    25. // 拒绝策略
    26. private RejectPolicy<Runnable> rejectPolicy;
    27. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, BlockQueue<Runnable> taskQueue, RejectPolicy<Runnable> rejectPolicy) {
    28. this.coreSize = coreSize;
    29. this.timeout = timeout;
    30. this.timeUnit = timeUnit;
    31. this.taskQueue = taskQueue;
    32. this.rejectPolicy = rejectPolicy;
    33. }
    34. // 执行任务
    35. public void execute(Runnable task) {
    36. synchronized (workers) {
    37. // 任务数没有超过coreSize时,直接交给worker执行,否则加入队列暂存
    38. if (workers.size() < coreSize) {
    39. Worker worker = new Worker(task);
    40. log.info("创建worker {},task:{}", worker, task);
    41. workers.add(worker);
    42. worker.start();
    43. } else {
    44. // 交给拒绝策略执行
    45. taskQueue.tryPut(rejectPolicy, task);
    46. }
    47. }
    48. }
    49. class Worker extends Thread {
    50. private Runnable task;
    51. public Worker(Runnable task) {
    52. this.task = task;
    53. }
    54. @Override
    55. public void run() {
    56. // 执行任务
    57. // task不为null,执行任务
    58. // task为null,在从任务队列中获取任务并执行
    59. while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
    60. try {
    61. log.info("正在执行...{}", task);
    62. task.run();
    63. } catch (Exception e) {
    64. e.printStackTrace();
    65. } finally {
    66. task = null;
    67. }
    68. }
    69. synchronized (workers) {
    70. log.debug("worker 被移除{}", this);
    71. workers.remove(this);
    72. }
    73. }
    74. }
    75. }
    76. @Slf4j
    77. class BlockQueue<T> {
    78. // 存放任务队列
    79. private Deque<T> queue = new ArrayDeque<>();
    80. // 容量
    81. private int capacity;
    82. // 锁
    83. private ReentrantLock lock = new ReentrantLock();
    84. // 生产者条件变量
    85. private Condition fullCWait = lock.newCondition();
    86. // 消费者条件变量
    87. private Condition emptyWait = lock.newCondition();
    88. public BlockQueue(int capacity) {
    89. this.capacity = capacity;
    90. }
    91. // 带超时阻塞获取
    92. public T poll(long timeout, TimeUnit unit) {
    93. lock.lock();
    94. try {
    95. long nanos = unit.toNanos(timeout);
    96. while (queue.isEmpty()) {
    97. try {
    98. if (nanos <= 0) {
    99. return null;
    100. }
    101. // 返回值是剩余时间
    102. nanos = emptyWait.awaitNanos(nanos);
    103. } catch (InterruptedException e) {
    104. e.printStackTrace();
    105. }
    106. }
    107. T t = queue.removeFirst();
    108. fullCWait.signal();
    109. return t;
    110. } finally {
    111. lock.unlock();
    112. }
    113. }
    114. // 阻塞获取
    115. public T take() {
    116. lock.lock();
    117. try {
    118. while (queue.isEmpty()) {
    119. try {
    120. emptyWait.await();
    121. } catch (InterruptedException e) {
    122. e.printStackTrace();
    123. }
    124. }
    125. T t = queue.removeFirst();
    126. fullCWait.signal();
    127. return t;
    128. } finally {
    129. lock.unlock();
    130. }
    131. }
    132. // 阻塞添加
    133. public void put(T element) {
    134. lock.lock();
    135. try {
    136. while (queue.size() == capacity) {
    137. try {
    138. log.info("等待加入任务队列 {}...", element);
    139. fullCWait.await();
    140. } catch (InterruptedException e) {
    141. e.printStackTrace();
    142. }
    143. }
    144. log.info("加入任务队列 {}", element);
    145. queue.addLast(element);
    146. emptyWait.signal();
    147. } finally {
    148. lock.unlock();
    149. }
    150. }
    151. // 带超时时间阻塞添加
    152. public boolean offer(T element, long timeout, TimeUnit unit) {
    153. lock.lock();
    154. try {
    155. long nanos = unit.toNanos(timeout);
    156. while (queue.size() == capacity) {
    157. try {
    158. if (nanos <= 0) {
    159. log.info("等待加入任务队列超时 {}...", element);
    160. return false;
    161. }
    162. nanos = fullCWait.awaitNanos(nanos);
    163. } catch (InterruptedException e) {
    164. e.printStackTrace();
    165. }
    166. }
    167. log.info("加入任务队列 {}", element);
    168. queue.addLast(element);
    169. emptyWait.signal();
    170. return true;
    171. } finally {
    172. lock.unlock();
    173. }
    174. }
    175. // 队列长度
    176. public int size() {
    177. lock.lock();
    178. try {
    179. return queue.size();
    180. } finally {
    181. lock.unlock();
    182. }
    183. }
    184. // 尝试加入队列,失败时 拒绝策略
    185. public void tryPut(RejectPolicy<T> rejectPolicy, T element) {
    186. lock.lock();
    187. try {
    188. // 队列是否已满
    189. if (queue.size() == capacity) {
    190. rejectPolicy.reject(this, element);
    191. } else { //有空闲 加入队列
    192. log.info("加入任务队列 {}", element);
    193. queue.addLast(element);
    194. emptyWait.signal();
    195. }
    196. } finally {
    197. lock.unlock();
    198. }
    199. }
    200. }
    201. @FunctionalInterface
    202. interface RejectPolicy<T> {
    203. void reject(BlockQueue<T> queue, T element);
    204. }
    205. @Slf4j
    206. class Test {
    207. public static void main(String[] args) {
    208. ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, new BlockQueue<>(1), (queue, element) -> {
    209. // 死等
    210. // queue.put(element);
    211. // 超时等待
    212. // queue.offer(element,500,TimeUnit.MILLISECONDS);
    213. // 让调用者自己放弃
    214. // log.info("放弃{}", element);
    215. // 让调用者抛出异常
    216. // throw new RuntimeException("任务执行失败:"+element);
    217. // 让调用者自己执行任务
    218. element.run();
    219. });
    220. for (int i = 0; i < 5; i++) {
    221. int finalI = i;
    222. threadPool.execute(() -> {
    223. try {
    224. Thread.sleep(1000);
    225. log.info("run :{}", finalI);
    226. } catch (InterruptedException e) {
    227. e.printStackTrace();
    228. }
    229. });
    230. }
    231. }
    232. }