当有任务出现,如果每个任务均需创建一个新线程去处理任务,会过度浪费系统资源。线程池的做法则是:将线程存储起来,重复利用线程去处理任务。

image.png
整体是生产者-消费者结构,左侧Thread Pool存放可重用线程,中间的Blocking Queue用来平衡速度差异。Thread Pool消费tasks,main生产tasks.
如果tasks过多,线程处理不过来,Blocking Queue用来存储tasks,如果tasks过少,线程池中的消费者线程则需要在Blocking Queue中等待.

阻塞队列实现:

  1. class BlockingQueue<T>{
  2. // 1.任务队列
  3. private Deque<T> queue = new ArrayDeque<>();
  4. // 2.
  5. private ReentrantLock lock = new ReentrantLock();
  6. // 3.生产者条件变量
  7. private Condition fuulWaitSet = lock.newCondition();
  8. // 4.消费者条件变量
  9. private Condition emptyWaitSet = lock.newCondition();
  10. // 5.容量
  11. private int capcity;
  12. public BlockingQueue(int capcity) {
  13. this.capcity = capcity;
  14. }
  15. //带超时的阻塞获取
  16. public T poll(long timeout, TimeUnit unit){
  17. lock.lock();
  18. try {
  19. // timeout 统一转换为纳秒
  20. long nanos = unit.toNanos(timeout);
  21. while(queue.isEmpty()){
  22. try {
  23. if(nanos<=0){
  24. return null;
  25. }
  26. emptyWaitSet.awaitNanos(nanos);//能够自动解决虚假唤醒问题,返回的是剩余时间
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. //获取队列头元素并返回
  32. T t =queue.removeFirst();
  33. fuulWaitSet.signal();
  34. return t;
  35. }finally {
  36. lock.unlock();
  37. }
  38. }
  39. // 阻塞获取
  40. public T take(){
  41. lock.lock();
  42. try {
  43. while(queue.isEmpty()){
  44. try {
  45. emptyWaitSet.await();
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. //获取队列头元素并返回
  51. T t =queue.removeFirst();
  52. fuulWaitSet.signal();
  53. return t;
  54. }finally {
  55. lock.unlock();
  56. }
  57. }
  58. //阻塞添加
  59. public void put(T element){
  60. lock.lock();
  61. try{
  62. while(queue.size()==capcity){
  63. try {
  64. fuulWaitSet.await();
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. queue.addLast(element);
  70. emptyWaitSet.signal();
  71. }finally {
  72. lock.unlock();
  73. }
  74. }
  75. //获取大小,当然这里可以不加锁,完全只读
  76. public int size(){
  77. lock.lock();
  78. try {
  79. return queue.size();
  80. }finally {
  81. lock.unlock();
  82. }
  83. }
  84. }

阻塞队列的实现:ReentrantLock锁+两种情况的wait-notify实现(队列空、队列满)

线程池实现(无超时时间):

  1. class ThreadPool{
  2. //线程池中需要用到阻塞队列(任务队列)
  3. private BlockingQueue<Runnable> taskQueue;
  4. // 线程集合,如果是Thread对象所包含的信息有限,所以将线程类包装成Worker
  5. private HashSet<Worker> workers = new HashSet<>();
  6. // 核心线程数
  7. private int coreSize;
  8. // 获取任务的超时时间,如果线程限定时间内获取不到任务,释放线程
  9. private long timeout;
  10. private TimeUnit timeUnit;
  11. // 执行任务
  12. public void execute(Runnable task){
  13. //当任务数没有超过 coreSize时,直接交给 worker对象执行
  14. //如果任务数超过 coreSize时,加入任务队列暂存。
  15. synchronized (workers){
  16. //线程数小于任务
  17. if (workers.size() < coreSize){
  18. Worker worker = new Worker(task);
  19. workers.add(worker);
  20. worker.start();
  21. }else {
  22. taskQueue.put(task);
  23. }
  24. }
  25. }
  26. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity) {
  27. this.coreSize = coreSize;
  28. this.timeout = timeout;
  29. this.timeUnit = timeUnit;
  30. this.taskQueue = new BlockingQueue<>(queueCapacity);
  31. }
  32. class Worker extends Thread{
  33. private Runnable task;
  34. public Worker(Runnable task){
  35. this.task = task;
  36. }
  37. @Override
  38. public void run() {
  39. //执行任务
  40. // 1 task不为空,执行任务
  41. // 2 task执行完毕,再接着从任务队列获取任务并执行
  42. while(task!=null||(task = taskQueue.take())!=null)){
  43. try {
  44. task.run();
  45. }catch (Exception e){
  46. }
  47. finally {
  48. task = null;
  49. }
  50. }
  51. synchronized (workers){
  52. workers.remove(this);
  53. }
  54. }
  55. }
  56. }

本段代码逻辑稍微复杂些,ThreadPool中的 coreSize 表示线程池能承受的最大线程数。

这段代码实现的逻辑是:线程池中每个线程处理一个任务,传进task任务后,如果线程池当前线程数还没到满,则新创建线程并执行该任务,如果线程池已经满,则没有线程可以处理该任务,则将该任务存入任务队列中。

  1. public void execute(Runnable task){
  2. //当任务数没有超过 coreSize时,直接交给 worker对象执行
  3. //如果任务数超过 coreSize时,加入任务队列暂存。
  4. synchronized (workers){
  5. //线程数小于任务
  6. if (workers.size() < coreSize){
  7. Worker worker = new Worker(task);
  8. workers.add(worker);
  9. worker.start();
  10. }else {
  11. taskQueue.put(task);
  12. }
  13. }
  14. }

Worker implements Thread,所以Worker类是线程类,所以其要重写run方法,run方法要根据构造方法中传入的task来判断此任务是否可以执行。
注意run方法处于等待任务状态,如果新传进来的任务已经执行完毕,则其会去阻塞队列中查找任务执行,如果阻塞队列中已经没有任务可以执行,那么就会陷入【WAITING】状态,take()的实现细节:如果队列为空,则一直等待下去。
这种思路的弊端是:线程如果没有任务可执行,会陷入空等,下边的销毁线程代码实际没有起到作用
注意taskQueue.task()方法的线程安全问题,已经在BlockingQueue类中的解决。

  1. @Override
  2. public void run() {
  3. //执行任务
  4. // 1 task不为空,执行任务
  5. // 2 task执行完毕,再接着从任务队列获取任务并执行
  6. while(task!=null||(task = taskQueue.take())!=null)){
  7. try {
  8. task.run();
  9. }catch (Exception e){
  10. }
  11. finally {
  12. task = null;
  13. }
  14. }
  15. synchronized (workers){
  16. workers.remove(this);
  17. }
  18. }


线程池实现(有超时时间):

  1. class Worker extends Thread{
  2. private Runnable task;
  3. public Worker(Runnable task){
  4. this.task = task;
  5. }
  6. @Override
  7. public void run() {
  8. //执行任务
  9. // 1 task不为空,执行任务
  10. // 2 task执行完毕,再接着从任务队列获取任务并执行
  11. while(task!=null||(task = taskQueue.poll(timeout,timeUnit))!=null)){
  12. try {
  13. task.run();
  14. }catch (Exception e){
  15. }
  16. finally {
  17. task = null;
  18. }
  19. }
  20. synchronized (workers){
  21. workers.remove(this);
  22. }
  23. }
  24. }

Worker类中run方法实现细节改变,taskQueue.take()改为taskQueue.poll(),如果超时等待不到任务,taskQueue.poll()返回null值,run方法内的while循环结束,线程经过workers.remove()方法得以销毁。

阻塞队列满-拒绝策略:

如果任务数过多,阻塞队列中存放不下,主线程会陷入等待状态,应该使得主线程有选择空间。

  1. public class TestThreadPool {
  2. public static void main(String[] args) {
  3. //由主线程向线程池中提交任务
  4. ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10);
  5. for (int i=0;i<15;i++){
  6. int j =i;
  7. threadPool.execute(()->{
  8. System.out.println(j);
  9. });
  10. }
  11. }
  12. }

向阻塞队列中输入任务的代码改为超时限制的添加:

  1. //带超时时间的阻塞添加
  2. public boolean offer(T task, long timeout, TimeUnit timeUnit){
  3. lock.lock();
  4. try {
  5. long nanos = timeUnit.toNanos(timeout);
  6. while (queue.size()==capcity){
  7. try {
  8. if(nanos<=0){
  9. return false;
  10. }
  11. nanos = fuulWaitSet.awaitNanos(nanos);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. queue.addLast(task);
  17. emptyWaitSet.signal();
  18. return true;
  19. }finally {
  20. lock.unlock();
  21. }
  22. }

向阻塞队列中输入任务有多种实现策略,如下:死等(最原始)、超时等待、调用者放弃任务的执行、调用者抛出异常、调用者自己执行任务

  1. public void execute(Runnable task){
  2. //当任务数没有超过 coreSize时,直接交给 worker对象执行
  3. //如果任务数超过 coreSize时,加入任务队列缓存。
  4. synchronized (workers){
  5. //线程数小于任务
  6. if (workers.size() < coreSize){
  7. Worker worker = new Worker(task);
  8. workers.add(worker);
  9. worker.start();
  10. }else {
  11. taskQueue.put(task);
  12. // 1.死等
  13. // 2.带超时等待
  14. // 3.让调用者放弃任务执行
  15. // 4.让调用者抛出异常
  16. // 5.让调用者自己执行任务
  17. }
  18. }
  19. }

如果将五种代码逻辑均写到execute方法中(多个if-else),明显代码灵活度不够,解决办法:策略模式,将方法抽象成接口,具体选择那种实现方式交给调用者,通过调用时传递进来。

声明函数式接口:

  1. @FunctionalInterface //拒绝策略,函数式接口
  2. interface RejectPolicy<T>{
  3. void reject(BlockingQueue<T> queue,T task);
  4. }

BlockingQueue类中,增加tryPut方法:

  1. public void tryPut(RejectPolicy<T> rejectPolicy, T task){
  2. lock.lock();
  3. try {
  4. //判断队列是否为满
  5. if(queue.size()==capcity){
  6. //传入就好,权力下放给调用者,妙!
  7. rejectPolicy.reject(this,task);
  8. }else{
  9. queue.addLast(task);
  10. }
  11. }
  12. finally {
  13. }
  14. }

如果队列满,那么调用rejectPolicy.reject()方法,将阻塞队列与task传进去,因为毕竟此操作是对阻塞队列而言。

因为tryPut方法中要使用rejectPolicy对象,且ThreadPool类中的execute方法需要调用tryPut方法,所以ThreadPool类中需要添加RejectPolicy类引用属性,并在构造方法中初始化:

  1. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
  2. this.coreSize = coreSize;
  3. this.timeout = timeout;
  4. this.timeUnit = timeUnit;
  5. this.taskQueue = new BlockingQueue<>(queueCapacity);
  6. this.rejectPolicy = rejectPolicy;
  7. }

主线程内,在ThreadPool初始化时,指定RejectPolicy的实现类对象:

  1. ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10,(queue,task)->{
  2. // 1.死等
  3. queue.put(task);
  4. });

本例完整代码:

  1. import com.sun.corba.se.spi.orbutil.threadpool.Work;
  2. import javafx.concurrent.Worker;
  3. import java.io.Closeable;
  4. import java.sql.Time;
  5. import java.util.ArrayDeque;
  6. import java.util.Deque;
  7. import java.util.HashSet;
  8. import java.util.concurrent.TimeUnit;
  9. import java.util.concurrent.locks.Condition;
  10. import java.util.concurrent.locks.ReentrantLock;
  11. public class TestThreadPool {
  12. public static void main(String[] args) {
  13. //由主线程向线程池中提交任务
  14. ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10,(queue,task)->{
  15. // 1.死等
  16. queue.put(task);
  17. });
  18. for (int i=0;i<15;i++){
  19. int j =i;
  20. threadPool.execute(()->{
  21. System.out.println(j);
  22. });
  23. }
  24. }
  25. }
  26. @FunctionalInterface //拒绝策略,函数式接口
  27. interface RejectPolicy<T>{
  28. void reject(BlockingQueue<T> queue,T task);
  29. }
  30. class ThreadPool{
  31. //线程池中需要用到阻塞队列,任务队列
  32. private BlockingQueue<Runnable> taskQueue;
  33. // 线程集合,如果是Thread对象所包含的信息有限,所以将线程类包装成Worker
  34. private HashSet<Worker> workers = new HashSet<>();
  35. // 核心线程数
  36. private int coreSize;
  37. // 获取任务的超时时间,如果线程限定时间内获取不到任务,释放线程
  38. private long timeout;
  39. private TimeUnit timeUnit;
  40. private RejectPolicy<Runnable> rejectPolicy;
  41. // 执行任务
  42. public void execute(Runnable task){
  43. //当任务数没有超过 coreSize时,直接交给 worker对象执行
  44. //如果任务数超过 coreSize时,加入任务队列缓存。
  45. synchronized (workers){
  46. //线程数小于任务
  47. if (workers.size() < coreSize){
  48. Worker worker = new Worker(task);
  49. workers.add(worker);
  50. worker.start();
  51. }else {
  52. taskQueue.put(task);
  53. // 1.死等
  54. // 2.带超时等待
  55. // 3.让调用者放弃任务执行
  56. // 4.让调用者抛出异常
  57. // 5.让调用者自己执行任务
  58. taskQueue.tryPut(rejectPolicy,task);
  59. }
  60. }
  61. }
  62. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
  63. this.coreSize = coreSize;
  64. this.timeout = timeout;
  65. this.timeUnit = timeUnit;
  66. this.taskQueue = new BlockingQueue<>(queueCapacity);
  67. this.rejectPolicy = rejectPolicy;
  68. }
  69. class Worker extends Thread{
  70. private Runnable task;
  71. public Worker(Runnable task){
  72. this.task = task;
  73. }
  74. @Override
  75. public void run() {
  76. //执行任务
  77. // 1 task不为空,执行任务
  78. // 2 task执行完毕,再接着从任务队列获取任务并执行
  79. while(task!=null||(task = taskQueue.poll(timeout,timeUnit))!=null)){
  80. try {
  81. task.run();
  82. }catch (Exception e){
  83. }
  84. finally {
  85. task = null;
  86. }
  87. }
  88. synchronized (workers){
  89. workers.remove(this);
  90. }
  91. }
  92. }
  93. }
  94. class BlockingQueue<T>{
  95. // 1.任务队列
  96. private Deque<T> queue = new ArrayDeque<>();
  97. // 2.
  98. private ReentrantLock lock = new ReentrantLock();
  99. // 3.生产者条件变量
  100. private Condition fuulWaitSet = lock.newCondition();
  101. // 4.消费者条件变量
  102. private Condition emptyWaitSet = lock.newCondition();
  103. // 5.容量
  104. private int capcity;
  105. public BlockingQueue(int capcity) {
  106. this.capcity = capcity;
  107. }
  108. //带超时的阻塞获取
  109. public T poll(long timeout, TimeUnit unit){
  110. lock.lock();
  111. try {
  112. // timeout 统一转换为纳秒
  113. long nanos = unit.toNanos(timeout);
  114. while(queue.isEmpty()){
  115. try {
  116. if(nanos<=0){
  117. return null;
  118. }
  119. emptyWaitSet.awaitNanos(nanos);//能够自动解决虚假唤醒问题,返回的是剩余时间
  120. } catch (InterruptedException e) {
  121. e.printStackTrace();
  122. }
  123. }
  124. //获取队列头元素并返回
  125. T t =queue.removeFirst();
  126. fuulWaitSet.signal();
  127. return t;
  128. }finally {
  129. lock.unlock();
  130. }
  131. }
  132. // 阻塞获取
  133. public T take(){
  134. lock.lock();
  135. try {
  136. while(queue.isEmpty()){
  137. try {
  138. emptyWaitSet.await();
  139. } catch (InterruptedException e) {
  140. e.printStackTrace();
  141. }
  142. }
  143. //获取队列头元素并返回
  144. T t =queue.removeFirst();
  145. fuulWaitSet.signal();
  146. return t;
  147. }finally {
  148. lock.unlock();
  149. }
  150. }
  151. //阻塞添加
  152. public void put(T element){
  153. lock.lock();
  154. try{
  155. while(queue.size()==capcity){
  156. try {
  157. fuulWaitSet.await();
  158. } catch (InterruptedException e) {
  159. e.printStackTrace();
  160. }
  161. }
  162. queue.addLast(element);
  163. emptyWaitSet.signal();
  164. }finally {
  165. lock.unlock();
  166. }
  167. }
  168. //带超时时间的阻塞添加
  169. public boolean offer(T task, long timeout, TimeUnit timeUnit){
  170. lock.lock();
  171. try {
  172. long nanos = timeUnit.toNanos(timeout);
  173. while (queue.size()==capcity){
  174. try {
  175. nanos = fuulWaitSet.awaitNanos(nanos);
  176. } catch (InterruptedException e) {
  177. e.printStackTrace();
  178. }
  179. }
  180. queue.addLast(task);
  181. emptyWaitSet.signal();
  182. }finally {
  183. lock.unlock();
  184. }
  185. }
  186. //获取大小,当然这里可以不加锁,完全只读
  187. public int size(){
  188. lock.lock();
  189. try {
  190. return queue.size();
  191. }finally {
  192. lock.unlock();
  193. }
  194. }
  195. public void tryPut(RejectPolicy<T> rejectPolicy, T task){
  196. lock.lock();
  197. try {
  198. //判断队列是否为满
  199. if(queue.size()==capcity){
  200. //传入就好,权力下放给调用者,妙!
  201. rejectPolicy.reject(this,task);
  202. }else{
  203. queue.addLast(task);
  204. }
  205. }
  206. finally {
  207. }
  208. }
  209. }

(附)补充知识-函数式接口与Lambda表达式:

Java8 函数式接口,函数式接口(Functional Interface)是一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。
函数式接口可以被隐式转换为lambda表达式。

如下定义了一个函数式接口:

  1. @FunctionalInterface
  2. interface GreetingService{
  3. void sayMessage(String message);
  4. }

接口中只有一个抽象方法,该抽象方法的参数是 String message。

使用Lambda表达式来表示该接口的一个实现类对象:

  1. GreetingService greetService1 = message->System.out.println("hello "+ message);

其中message是待实现方法的参数,当有多个参数时用小括号扩起,->后是方法的实现细节

(附)补充知识-接口实现类对象的快捷创建:

与函数式接口+lambda表达式的用法类似,当需要快捷创建一个接口的实现类对象时,无需先定义接口实现类,接着再new impl()对象,而可以直接使用如下语法:

  1. new interface (){
  2. // 接口实现类细节...
  3. @Override
  4. // 实现方法...
  5. }

示例代码:

  1. public class TestInterfaceImpl {
  2. public static void main(String[] args) {
  3. Animal animal = new Animal() {
  4. @Override
  5. public void run() {
  6. System.out.println("I am a dog and I can fly");
  7. }
  8. };
  9. }
  10. }
  11. interface Animal{
  12. void run();
  13. }