来自《Java并发编程的艺术》

    一个简易的线程池的实现。三个类:
    ThreadPool线程池接口
    DefaultThreadPool线程池接口实现
    Worker工作线程

    线程池的工作逻辑:
    简易线程池 - 图1

    全部代码

    1. package ConcurrencyArt;
    2. import java.util.ArrayList;
    3. import java.util.Collections;
    4. import java.util.LinkedList;
    5. import java.util.List;
    6. import java.util.concurrent.atomic.AtomicLong;
    7. interface ThreadPool<Job extends Runnable> {
    8. void execute(Job job);//执行一个Job,这个Job需要实现Runnable
    9. void shutdown();//关闭线程池
    10. void addWorkers(int m);//增加工作者线程
    11. void removeWorkers(int m);//减少工作者线程
    12. int getJobSize();//得到正在等待执行的任务数量
    13. }
    14. public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    15. private static final int MAX_WORKER_NUMBERS = 10;//线程池最大限制数
    16. private static final int DEFAULT_WORKER_NUMBERS = 5;//线程池默认的数量
    17. private static final int MIN_WORKER_NUMBERS = 1;//线程池最小的数量
    18. private final LinkedList<Job> jobs = new LinkedList<>();//工作列表
    19. private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());//工作者列表
    20. private int workerNum = DEFAULT_WORKER_NUMBERS;//工作者线程的数量
    21. private AtomicLong threadNum = new AtomicLong();//线程编号生成
    22. public DefaultThreadPool() {
    23. initializeWorkers(DEFAULT_WORKER_NUMBERS);
    24. }
    25. public DefaultThreadPool(int num) {
    26. workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
    27. initializeWorkers(workerNum);
    28. }
    29. //初始化线程工作者
    30. private void initializeWorkers(int num) {
    31. for (int i = 0; i < num; i++) {
    32. Worker worker = new Worker();
    33. workers.add(worker);
    34. Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
    35. thread.start();
    36. }
    37. }
    38. @Override
    39. public void execute(Job job) {
    40. if (job != null) {
    41. synchronized (jobs) {
    42. jobs.addLast(job);
    43. jobs.notify();//在worker的run方法中的while循环中,调用了jobs.wait();
    44. }
    45. }
    46. }
    47. @Override
    48. public void shutdown() {
    49. for (Worker worker : workers) {
    50. worker.shutdown();
    51. }
    52. }
    53. @Override
    54. public void addWorkers(int m) {
    55. synchronized (jobs) {
    56. if (m + this.workerNum > MAX_WORKER_NUMBERS) {
    57. m = MAX_WORKER_NUMBERS - this.workerNum;
    58. }
    59. initializeWorkers(m);
    60. this.workerNum += m;
    61. }
    62. }
    63. @Override
    64. public void removeWorkers(int m) {
    65. synchronized (jobs) {
    66. if (m >= this.workerNum) {
    67. throw new IllegalArgumentException("beyond workNum");
    68. }
    69. int count = 0;
    70. while (count < m) {
    71. Worker worker = workers.get(count);
    72. if (workers.remove(worker)) {
    73. worker.shutdown();
    74. count++;
    75. }
    76. }
    77. this.workerNum -= count;
    78. }
    79. }
    80. @Override
    81. public int getJobSize() {
    82. return jobs.size();
    83. }
    84. //工作者,负责消费任务。
    85. class Worker implements Runnable {
    86. private volatile boolean running = true;//在这里通过volatile boolean而不是interrupt方法来安全地中断线程
    87. @Override
    88. public void run() {
    89. while (running) {
    90. Job job = null;
    91. synchronized (jobs) {
    92. while (jobs.isEmpty()) {//当工作队列为空时,所有的工作者线程均等待在工作队列上。
    93. try {
    94. jobs.wait();
    95. } catch (InterruptedException e) {
    96. Thread.currentThread().interrupt();
    97. return;//感知到外部对工作者线程的中断操作,返回。
    98. }
    99. }
    100. job = jobs.removeFirst();
    101. }
    102. if (job != null) {
    103. try {
    104. //job,作为一个Runnable的实现类,在这里并未将其变成线程并start(),
    105. // 而是将其的run方法放在工作者线程中执行,这样真正的多线程是指工作者线程,
    106. //而提交进来的Job只是作为一个应该异步执行的任务。
    107. job.run();
    108. } catch (Exception ex) {
    109. //忽略job执行中的exception
    110. }
    111. }
    112. }
    113. }
    114. public void shutdown() {
    115. running = false;
    116. }
    117. }
    118. }