简单线程池

  1. 固定线程数,没有动态调整线程数(maximunPoolSize)
  2. 使用java的LinkedBlockingQueue,默认资源耗尽为止,无拒绝策略

线程池类:MyThreadPoolCopy

  1. public class MyThreadPoolCopy {
  2. BlockingQueue<Task> blockingQueue;
  3. List<MyThreadCopy> myThreadCopyList;
  4. public MyThreadPoolCopy(int poolSize, int queueSize) {
  5. blockingQueue = new LinkedBlockingDeque<>(queueSize);
  6. myThreadCopyList = new ArrayList<>(poolSize);
  7. for (int i = 0; i < poolSize; i++) {
  8. MyThreadCopy myThreadCopy = new MyThreadCopy();
  9. myThreadCopy.start();
  10. myThreadCopyList.add(myThreadCopy);
  11. }
  12. }
  13. public void submit(Task task) throws InterruptedException {
  14. blockingQueue.put(task);
  15. }
  16. class MyThreadCopy extends Thread {
  17. @Override
  18. public void run() {
  19. while (true) {
  20. try {
  21. Task take = blockingQueue.take();
  22. take.executeJob();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. }
  29. }

任务类:Task

  1. public class Task {
  2. String value;
  3. public Task(String value) {
  4. this.value = value;
  5. }
  6. public void executeJob() {
  7. System.out.println("value:" + value + ", Thread:" + Thread.currentThread().getName());
  8. }
  9. }

调用方式

  1. public static void main(String[] args) throws InterruptedException {
  2. MyThreadPoolCopy copy = new MyThreadPoolCopy(2, 10);
  3. Task task1 = new Task("AAA");
  4. Task task2 = new Task("BBB");
  5. Task task3 = new Task("CCC");
  6. Task task4 = new Task("DDD");
  7. Task task5 = new Task("EEE");
  8. Task task6 = new Task("FFF");
  9. copy.submit(task1);
  10. copy.submit(task2);
  11. copy.submit(task3);
  12. copy.submit(task4);
  13. copy.submit(task5);
  14. copy.submit(task6);
  15. }

优化

  1. 自己实现任务队列
  2. 自己实现拒绝策略
  3. 支持线程数的动态调整
  4. 对外提供安全的线程关闭接口

https://zhuanlan.zhihu.com/p/69294791
https://www.cnblogs.com/niuyourou/p/12494982.html

  1. 需要有个监控线程池,不断循环维护activeCount和queueSize平衡,新增如下方法

    1. while (!this.isShutDown && !Thread.currentThread().isInterrupted()) {
    2. try {
    3. timeUnit.sleep(10);
    4. } catch (InterruptedException e) {
    5. this.isShutDown = true;
    6. break;
    7. }
    8. synchronized (this) {
    9. if (this.isShutDown) {
    10. //DCL检查
    11. break;
    12. }
    13. //任务超过当前线程数两倍,线程池马力全开
    14. System.out.println(taskQueue.getSize() + " ===============================");
    15. if (taskQueue.getSize() >= activeCount * 2 && activeCount < maxSize) {
    16. int beforeActive = activeCount;
    17. for (int i = 0; i < maxSize; i++) {
    18. addThread();
    19. }
    20. System.out.println("add : active ->" + beforeActive + "---->" + activeCount);
    21. continue;
    22. }
    23. if (taskQueue.getSize() > 0 && activeCount < coreSize) {
    24. for (int i = 0; i < coreSize; i++) {
    25. addThread();
    26. }
    27. System.out.println();
    28. continue;
    29. }
    30. if (taskQueue.getSize() == 0 && activeCount > coreSize) {
    31. for (int i = coreSize; i < activeCount(); i++) {
    32. removeThread();
    33. System.out.println("remove : active-> " + activeCount());
    34. }
    35. }
    36. }
    37. }
  2. 对外优雅关闭需要设置shutdown标志位以及interupter()中断,如上面的MyThreadCopy可以改成如下写法 ```java … private volatile boolean isRunning = true;

@Override public void run() { while (isRunning && !this.isInterrupted()) { try { Runnable runnable = queue.getTask(); runnable.run(); //即executeJob() } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + “ 接收到interrupted中断信号”); e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + “ 终止执行!”); }

//关闭线程 public void shutDown() { this.isRunning = false; this.interrupt(); } ```

扩展

  1. 看ThreadPoolExecutor源码
  2. 引申出AQS源码