自定义线程池

自定义线程池类图
image.pngimage.png

ThreadPool

ThreadPool是核心线程池,线程池在初始化时传入参数,构建任务队列,线程池大小,队列容量,以及队列满时的拒绝策略等。其中Worker是工作线程,用来执行传入的任务,

  1. public class ThreadPool {
  2. /**
  3. * 线程池核心线程数量
  4. */
  5. private int coreSize;
  6. /**
  7. * 提交的任务队列
  8. */
  9. private BlockQueue<Runnable> queue;
  10. /**
  11. * 拒接策略
  12. */
  13. private RejectPolicy<Runnable> rejectPolicy;
  14. /**
  15. * 任务的超时时间
  16. */
  17. private long timeOut;
  18. /**
  19. * 超时时间单位
  20. */
  21. private TimeUnit timeUnit;
  22. /**
  23. * 工作线程集合
  24. */
  25. private HashSet<Worker> workers = new HashSet();
  26. public ThreadPool(int coreSize, int queueSize, long timeOut, TimeUnit timeUnit) {
  27. this.coreSize = coreSize;
  28. this.queue = new BlockQueue<>(queueSize);
  29. this.timeOut = timeOut;
  30. this.timeUnit = timeUnit;
  31. }
  32. public ThreadPool(int coreSize, int queueSize, long timeOut, TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy) {
  33. this.coreSize = coreSize;
  34. this.queue = new BlockQueue<>(queueSize);
  35. this.rejectPolicy = rejectPolicy;
  36. this.timeOut = timeOut;
  37. this.timeUnit = timeUnit;
  38. }
  39. /**
  40. * 执行任务
  41. */
  42. public void execute(Runnable task) {
  43. if (workers.size() < coreSize) {
  44. log.debug("线程池空间未满,直接创建线程...");
  45. Worker worker = new Worker(task);
  46. workers.add(worker);
  47. worker.start();
  48. } else {
  49. log.debug("线程池空间已满,提交任务进入队列等待...");
  50. log.info("线程池已满,任务放入队列...");
  51. // queue.put(task);
  52. // queue.offer(task, 500, TimeUnit.MILLISECONDS);
  53. queue.tryPut(rejectPolicy, task);
  54. }
  55. }
  56. }

Worker

Worker是一个工作线程,用来执行传入的任务,执行任务前需要判断任务是否为空,或者从队列中取出一个不为空的任务执行,如果没有任务,则线程结束。

  1. /**
  2. * 工作线程
  3. */
  4. class Worker extends Thread {
  5. /**
  6. * 执行的任务
  7. */
  8. private Runnable task;
  9. public Worker(Runnable task) {
  10. this.task = task;
  11. }
  12. /**
  13. * 执行任务
  14. * 1) 当 task 不为空,执行任务
  15. * 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
  16. */
  17. @Override
  18. public void run() {
  19. // while (null != task || (task = queue.take()) != null) {
  20. while (null != task || (task = queue.poll(500, TimeUnit.MILLISECONDS)) != null) {
  21. try {
  22. log.debug("{}正在执行...", task);
  23. task.run();
  24. } catch (Exception e) {
  25. log.debug("任务执行异常...");
  26. e.printStackTrace();
  27. } finally {
  28. log.debug("{}执行结束...", task);
  29. task = null;
  30. }
  31. }
  32. synchronized (workers) {
  33. log.debug("所有任务都被执行完毕,工作线程结束,workers是共享变量,需要保护");
  34. workers.remove(this);
  35. }
  36. }
  37. }

BlockQueue

BlockQueue是自定义的一个队列,队列中有相应的存和取的方法,用于存放和消费任务,因为对列会被多个线程共享,所以需要对其进行线程安全保护,所以额外加了锁保护,以及等待队列。

  1. public class BlockQueue<T> {
  2. /**
  3. * 存放任务的队列
  4. */
  5. private Deque<T> queue;
  6. /**
  7. * 保证线程安全的锁
  8. */
  9. private ReentrantLock lock = new ReentrantLock();
  10. /**
  11. * 队列已满的等待集合
  12. */
  13. private Condition fullCondition = lock.newCondition();
  14. /**
  15. * 对列空闲的等待集合
  16. */
  17. private Condition emptyCondition = lock.newCondition();
  18. /**
  19. * 队列的最大容量
  20. */
  21. private int capcity;
  22. public BlockQueue(int capcity) {
  23. this.capcity = capcity;
  24. this.queue = new ArrayDeque<>(capcity);
  25. }
  26. /**
  27. * 带超时时间的加入任务,如果在规定时间内未完成队列加入,则丢弃任务
  28. */
  29. public void offer(T t, long timeOut, TimeUnit timeUnit) {
  30. lock.lock();
  31. try {
  32. log.debug("统一时间单位...");
  33. long nanos = timeUnit.toNanos(timeOut);
  34. while (queue.size() == capcity) {
  35. log.debug("队列已满,等待任务被执行后再加入...");
  36. nanos = fullCondition.awaitNanos(nanos);
  37. if (nanos < 0) {
  38. log.debug("超时结束等待,丢弃任务,awaitNanos 返回的是,入如果当前线程被虚假唤醒,则当前线程还应该等多久");
  39. return;
  40. }
  41. }
  42. log.debug("队列还有空间...");
  43. queue.addLast(t);
  44. log.debug("通知消费线程取任务...");
  45. emptyCondition.signalAll();
  46. } catch (InterruptedException e) {
  47. log.debug("阻塞被打断...");
  48. e.printStackTrace();
  49. } finally {
  50. lock.unlock();
  51. }
  52. }
  53. /**
  54. * @throws
  55. * @description 带拒绝策略的put
  56. * @author SongHongWei
  57. * @params
  58. * @updateTime 2021/10/4 17:23
  59. */
  60. public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
  61. lock.lock();
  62. try {
  63. if (queue.size() == capcity) {
  64. log.info("队列已满,等待任务被执行后再加入...");
  65. rejectPolicy.reject(this, task);
  66. } else {
  67. log.debug("队列还有空间...");
  68. queue.addLast(task);
  69. log.debug("通知消费线程取任务...");
  70. emptyCondition.signalAll();
  71. }
  72. } finally {
  73. lock.unlock();
  74. }
  75. }
  76. /**
  77. * 在对列里加入新任务
  78. * 阻塞添加
  79. *
  80. * @param t
  81. */
  82. public void put(T t) {
  83. lock.lock();
  84. try {
  85. //不用if的原因是防止线程被虚假唤醒,即条件不满足时被唤醒
  86. while (queue.size() == capcity) {
  87. log.info("队列已满,等待任务被执行后再加入...");
  88. fullCondition.await();
  89. }
  90. log.debug("队列还有空间...");
  91. queue.addLast(t);
  92. log.debug("通知消费线程取任务...");
  93. emptyCondition.signalAll();
  94. } catch (InterruptedException e) {
  95. log.debug("阻塞被打断...");
  96. e.printStackTrace();
  97. } finally {
  98. lock.unlock();
  99. }
  100. }
  101. /**
  102. * 获取任务,需要考虑线程安全
  103. * 阻塞获取
  104. *
  105. * @return
  106. */
  107. public T take() {
  108. lock.lock();
  109. try {
  110. while (queue.isEmpty()) {
  111. log.info("队列已空,等待任务加入...");
  112. try {
  113. emptyCondition.await();
  114. } catch (InterruptedException e) {
  115. log.debug("阻塞被打断...");
  116. e.printStackTrace();
  117. }
  118. }
  119. T first = queue.removeFirst();
  120. log.debug("唤醒因队列满了导致在阻塞中的put线程");
  121. fullCondition.signalAll();
  122. return first;
  123. } finally {
  124. lock.unlock();
  125. }
  126. }
  127. /**
  128. * 带超时时间的任务获取
  129. *
  130. * @param timeOut
  131. * @param timeUnit
  132. * @return
  133. */
  134. public T poll(long timeOut, TimeUnit timeUnit) {
  135. lock.lock();
  136. try {
  137. long nanos = timeUnit.toNanos(timeOut);
  138. while (queue.isEmpty()) {
  139. log.info("队列已空,等待任务加入...");
  140. try {
  141. nanos = emptyCondition.awaitNanos(nanos);
  142. if (nanos < 0) {
  143. log.debug("超时结束等待,丢弃任务,awaitNanos 返回的是,入如果当前线程被虚假唤醒,则当前线程还应该等多久");
  144. return null;
  145. }
  146. } catch (InterruptedException e) {
  147. log.debug("阻塞被打断...");
  148. e.printStackTrace();
  149. }
  150. }
  151. T first = queue.removeFirst();
  152. log.debug("唤醒因队列满了导致在阻塞中的put线程");
  153. fullCondition.signalAll();
  154. return first;
  155. } finally {
  156. lock.unlock();
  157. }
  158. }
  159. }

RejectPolicy

RejectPolicy是一个接口函数,用来指定当队列满的时候,采用什么样的方式任务,例如,继续等待直到队列有任务被消费了,或者带超时时间的等待,或者放弃任务,或者抛出异常等等。

  1. /**
  2. * @author fmj
  3. * 抽象一个拒接策略,用来处理当线程池的线程都处于busy状态,还有任务提交进来时的处理方式
  4. * @date 2021 2021/9/24 15:06
  5. */
  6. @FunctionalInterface
  7. public interface RejectPolicy<T> {
  8. void reject(BlockQueue<T> queue, T task);
  9. }

PoolClient

PoolClient是用来调用线程池的,模拟调用场景

  1. /**
  2. * @author fmj
  3. * 模拟调用自定义线程池
  4. * @date 2021 2021/9/24 15:07
  5. */
  6. @Slf4j
  7. public class PoolClient {
  8. public static void main(String[] args) {
  9. ThreadPool pool = new ThreadPool(2, 3, 1000L, TimeUnit.MILLISECONDS);
  10. ThreadPool rejectPool = new ThreadPool(2, 3, 1000L, TimeUnit.MILLISECONDS, (q, t) -> {
  11. // log.info("1.队列满时一直等待,知道有线程执行完其他任务");
  12. // q.put(t);
  13. // log.info("2.队列满时执行带超时时间的等待");
  14. // q.offer(t, 2000L, TimeUnit.SECONDS);
  15. log.info("3.队列满时,放弃当前任务");
  16. // log.info("4.队列满时,由调用者自己执行");
  17. // t.run();
  18. // log.info("5.队列满时,抛出异常");
  19. // throw new RuntimeException("队列已满");
  20. });
  21. for (int i = 0; i < 10; i++) {
  22. int j = i;
  23. rejectPool.execute(() -> {
  24. try {
  25. Thread.sleep(1000);
  26. log.info("{}", j);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. });
  31. }
  32. }
  33. }