1. 基础多线程

1.2 创建方式

1.3 使用方式

2.线程池

2.1 线程池的创建方法。

  1. //源码的创建方式
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,
  7. ThreadFactory threadFactory,
  8. RejectedExecutionHandler handler) {
  9. ....
  10. }

2.2 七大参数

ThreadPoolExecutor中重要的几个参数详解

  • corePoolSize:核心线程数,也是线程池中常驻的线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执行任务
  • maximumPoolSize:最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue队列填满时才会创建多于corePoolSize的线程(线程池总线程数不超过maxPoolSize)
  • keepAliveTime:非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉,注意当corePoolSize=maxPoolSize时,keepAliveTime参数也就不起作用了(因为不存在非核心线程);
  • unit:keepAliveTime的时间单位
  • workQueue:用于保存任务的队列,可以为无界、有界、同步移交三种队列类型之一,当池子里的工作线程数大于corePoolSize时,这时新进来的任务会被放到队列中
  • threadFactory:创建线程的工厂类,默认使用Executors.defaultThreadFactory(),也可以使用guava库的ThreadFactoryBuilder来创建
  • handler:拒绝策略,线程池无法继续接收任务(队列已满且线程数达到maximunPoolSize)时的饱和策略,取值有AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy

2.3 执行流程图

多线程 - 图1

2.4 阻塞队列

  • SynchronousQueue(同步移交队列):队列不作为任务的缓冲方式,可以简单理解为队列长度为零
    • 即有空闲线程就执行,否则拒绝策略
  • LinkedBlockingQueue(无界队列):队列长度不受限制,当请求越来越多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致内存占用过多或OOM
    • 注意:可以指定初始化阈值,这个阈值并不是队列的真正最大长度(链表最大长度是内存上限),只是为了告诉线程池执行器,当阻塞队列到达该阈值的时候要去创建新的线程(未达到最大线程属性情况下)去执行更多任务。
    • 如果不指定阈值,那么设置最大线程数量没意义。因为默认Link的阈值是Integer.MAX_VALUE,这个阈值达不到,那么就不可能创建新的线程.
    • 阈值不是Link的长度上限,哪怕待阻塞队列的长度大于阈值,也会继续追加到该Link中,不会执行拒绝策略
    • 拒绝策略失效。
  • ArrayBlockintQueue(有界队列):队列长度受限,当队列满了就需要创建多余的线程来执行任务。
    • 初始化时,必须指定队列的长度,与1.3的执行流程图完全一致。

2.5 拒绝策略

  • AbortPolicy:中断抛出异常
  • DiscardPolicy:默默丢弃任务,不进行任何通知
  • DiscardOldestPolicy:丢弃掉在队列中存在时间最久的任务
  • CallerRunsPolicy:让提交任务的线程去执行任务(对比前三种比较友好一丢丢)
  • 自定义:(r, executor) -> System.out.println(“reject!!”)

2.6 关闭线程池

  • shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表
  • shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略
  • isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true

2.7例子

  1. public static void main(String[] args) {
  2. //阻塞队列,二选一即可
  3. ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
  4. LinkedBlockingDeque<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<>(20);
  5. //创建线程池
  6. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
  7. 1, 5, 100, TimeUnit.SECONDS, linkedBlockingDeque
  8. ,Executors.defaultThreadFactory()
  9. , (r, executor) -> System.out.println("reject!!")
  10. );
  11. //创建待执行的任务
  12. for (int i = 0; i < 10; i++) {
  13. threadPoolExecutor.execute(() -> {
  14. try {
  15. System.out.println("execute!");
  16. Thread.sleep(500);
  17. } catch (InterruptedException e) {
  18. throw new RuntimeException(e);
  19. }
  20. });
  21. }
  22. //安全关闭线程池
  23. threadPoolExecutor.shutdown();
  24. }

3.Springboot使用线程池

配置类

  1. @Configuration
  2. @EnableAsync //开启多线程
  3. public class ThreadPoolConf {
  4. @Bean("threadPoolTaskExecutor")
  5. public Executor taskExecutor(){
  6. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  7. //设置核心线程数
  8. executor.setCorePoolSize(5);
  9. //设置最大线程数
  10. executor.setMaxPoolSize(20);
  11. //设置活跃线程的时间,单位秒
  12. executor.setKeepAliveSeconds(60);
  13. //配置队列的大小
  14. executor.setQueueCapacity(Integer.MAX_VALUE);
  15. //设置线程前缀名称
  16. executor.setThreadNamePrefix("阿果线程池-");
  17. //当关闭停止spring容器时,如果线程池仍然有任务,那么不会中断尚未完成的任务
  18. executor.setWaitForTasksToCompleteOnShutdown(true);
  19. //初始化线程池
  20. executor.initialize();
  21. return executor;
  22. }
  23. }

使用方法 1

  1. @Resource(name = "threadPoolTaskExecutor")
  2. private Executor executor;
  3. @Override
  4. public void spikeConsumer() {
  5. executor.execute(
  6. ()-> //TODO
  7. );
  8. }

使用方法2

  1. //定义一个业务类,这里写方法,方法上的加Async注解表示指定线程池异步执行任务。
  2. /**
  3. 不可以在自己的类A,调用自己被Async注解的方法,这是会失效的,执行任务的还是当前线程。
  4. 应该是要把要异步的方法,专门提取并放到一个类B中,由自己类A发起调用类B的被Async注解的方法,采用使用但线程池。
  5. */
  6. @Service
  7. public class ThreadService {
  8. @Async("threadPoolTaskExecutor")
  9. public void increaseArticleViewCount(...) {
  10. //TODO
  11. }
  12. }