配置

  • corePoolSize : 核心线程数,一旦创建将不会再释放。
  • maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。
  • keepAliveSeconds : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。
  • handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序

示例:

  1. package com.greatonce.tuya.biz.impl;
  2. @EnableCaching
  3. @Configuration
  4. public class BizConfiguration {
  5. @Value("${tuya.biz.task.corePoolSize:10}")
  6. private int corePoolSize;
  7. @Value("${tuya.biz.task.maxPoolSize:20}")
  8. private int maxPoolSize;
  9. @Value("${tuya.biz.task.queueCapacity:50}")
  10. private int queueCapacity;
  11. @Value("${tuya.biz.task.keepAliveSeconds:60}")
  12. private int keepAliveSeconds;
  13. @Bean
  14. public ThreadPoolExecutorFactoryBean bizExecutor() {
  15. ThreadPoolExecutorFactoryBean factoryBean = new ThreadPoolExecutorFactoryBean();
  16. // 核心线程数,一直存活,图鸦中设置为1
  17. factoryBean.setCorePoolSize(corePoolSize);
  18. // 当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。
  19. // 如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
  20. // 图鸦中设置为10
  21. factoryBean.setMaxPoolSize(maxPoolSize);
  22. // 任务队列容量,图鸦中设置为20
  23. factoryBean.setQueueCapacity(queueCapacity);
  24. // 当线程空闲时间达到setKeepAliveSeconds,该线程会退出,直到线程数量等于corePoolSize。
  25. // 图鸦中设置为60
  26. factoryBean.setKeepAliveSeconds(keepAliveSeconds);
  27. factoryBean.setThreadNamePrefix("biz-task");
  28. //(1) 默认的ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时RejectedExecutionException;
  29. //(2) ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
  30. //(3) ThreadPoolExecutor.DiscardPolicy 不能执行的任务将被删除;
  31. //(4) ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
  32. factoryBean.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  33. return factoryBean;
  34. }

使用

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

示例:

  1. @Autowired
  2. private ExecutorService executorService;
  3. /**
  4. * 立即投放
  5. */
  6. @Override
  7. public void goPublish(PublishProduct publishProduct) {
  8. Store store = storeService.getByKey(publishProduct.getStoreId());
  9. String publishSetting = publishProduct.getPublishSetting();
  10. List<PublishProductImage> publishProductImages = publishProduct.getPublishProductImages();
  11. CountDownLatch countDownLatch = new CountDownLatch(publishProductImages.size());
  12. publishProductImages.forEach(publishProductImage -> {
  13. executorService.execute(() -> {
  14. try {
  15. publishProductImageService.publish(store, publishProductImage, publishSetting);
  16. } catch (Exception e) {
  17. publishLOGGER.error("投放图片失败", e);
  18. } finally {
  19. countDownLatch.countDown();
  20. }
  21. });
  22. });
  23. try {
  24. countDownLatch.await();
  25. publishAfter(publishProduct);
  26. } catch (InterruptedException e) {
  27. publishLOGGER.error("服务异常中断", e);
  28. }
  29. }

@Async

在Spring中,基于@Async标注的方法会异步执行, 使用方式如下:

  1. 启用异步, 在启动类上标注@EnableAsync
  2. 配置对应的业务线程池, 示例代码如下

    1. /**
    2. * 默认的业务线程池.
    3. */
    4. @Bean
    5. public AsyncTaskExecutor bizExecutor() {
    6. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    7. executor.setCorePoolSize(bizCorePoolSize);
    8. executor.setMaxPoolSize(bizMaxPoolSize);
    9. executor.setQueueCapacity(bizQueueCapacity);
    10. executor.setThreadNamePrefix("oms-biz-");
    11. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    12. executor.setKeepAliveSeconds(bizKeepAliveSeconds);
    13. return executor;
    14. }
  3. 使用注解并指定线程池, 示例代码如下

    1. @Override
    2. @Async("bizExecutor")
    3. public void asyncDownload(ProductMallMappingDownloadBO downloadBO) {
    4. BizContext.setNickname(downloadBO.getOperator());
    5. Store store = storeService.getByKey(downloadBO.getStoreId());
    6. downloadBO.setStore(store);
    7. download(downloadBO);
    8. BIZ_LOGGER.log("铺货下载", "参数:{}", JsonUtil.toJson(downloadBO));
    9. }