1. 线程池

1.1 ThreadPoolExecutor

image.png

  • Running:运行状态。能处理新提交的任务,也能处理阻塞队列中的任务
  • Shutdown:关闭状态。不能接受新提交的任务,但是能够继续处理阻塞队列中已经保存的任务。
  • Stop:不接收新的任务,也不能处理队列中已经保存的任务
  • Tidying:如果所有的任务都已经终止,线程池会进入该状态
  • Terminated:Tidying 状态调用 terminated() 进入该状态。

ThreadPool 提供的方法

  • execute ():提交任务,交给线程池执行
  • submit ():提交任务,能够返回执行结果 execute+ Future
  • shutdown ():关闭线程池,等待任务都执行完
  • shutdownNow() :关闭线程池,不等待任务执行完

  • getTaskCount ():线程池已执行和未执行的任务总数

  • getCompletedTaskCount ():已完成的任务数量
  • getPoolSize ():线程池当前的线程数量
  • getActiveCount ():当前线程池中正在执行任务的线程数量
  • getQueue():返回执行程序使用的任务队列
  • isTerminating():如果此执行程序正在终止,则返回true
  • isTerminated()

生产上的一个使用实例

  1. /**
  2. * 同步录播课章节顺序
  3. *
  4. * @return
  5. * @throws InterruptedException
  6. */
  7. @ResponseBody
  8. @RequestMapping(value = "/synchroCourseVideoChapter", method = RequestMethod.GET)
  9. public String synchroCourseVideoChapter() throws InterruptedException {
  10. log.info("#############videoChapter:同步排序章节开始");
  11. String status;
  12. ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(100);
  13. ClassModule module = new ClassModule();
  14. module.setPageSize(100);
  15. module.setDelFlag(0);
  16. module.setTeachMethod("TEACH_METHOD_VIDEO");
  17. int page = 1;
  18. for (; ; page++) {
  19. module.setPage(page);
  20. List<ClassModule> classModules = classModuleServiceImpl.findClassModuleByPage(module);
  21. if (classModules.isEmpty() || classModules.size() == 0) {
  22. log.info("#############videoChapter:暂无章节可以处理");
  23. break;
  24. }
  25. for (ClassModule classModule : classModules)
  26. executorService.execute(new CourseVideoChapterThread(classModule.getId()));
  27. // 获取任务队列的长度
  28. int size = executorService.getQueue().size();
  29. try {
  30. // 判断任务队列长度,计算休眠时间
  31. Thread.sleep((size / 10) == 0 ? (size / 10) : 20);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. // 关闭线程池 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
  37. executorService.shutdown();
  38. for (; ; ) {
  39. // 用死循环去判断执行程序是否已经终止,来更改状态
  40. if (executorService.isTerminated()) {
  41. // 所有的子线程都结束了
  42. status = JsonMsg.SUCCESS;
  43. log.info("#############videoChapter:更新录播课章节顺序处理完成");
  44. break;
  45. }
  46. Thread.sleep(1000);
  47. }
  48. return status;
  49. }

线程池类图
image.png

1.2 Executor 接口

  • Executors.newCachedThreadPool:创建可缓存的线程池
  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  • Executors.newFixedThreadPool:创建定长线程池
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  • Executors.newScheduledThreadPool:创建定长线程池,并且支持定时和周期性的执行
  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

Demo:

  1. public static void main(String[] args) {
  2. ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
  3. executorService.schedule(new Runnable() {
  4. @Override
  5. public void run() {
  6. log.warn("schedule run");
  7. }
  8. }, 3, TimeUnit.SECONDS);
  9. executorService.scheduleAtFixedRate(new Runnable() {
  10. @Override
  11. public void run() {
  12. log.warn("schedule run");
  13. }
  14. }, 1, 3, TimeUnit.SECONDS);
  15. // executorService.shutdown();
  16. Timer timer = new Timer();
  17. timer.schedule(new TimerTask() {
  18. @Override
  19. public void run() {
  20. log.warn("timer run");
  21. }
  22. }, new Date(), 5 * 1000);
  23. }
  • Executors.newSingleThreadExecutor:创建单线程化的线程池
  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

1.3 线程池的合理配置

  • CPU 密集型任务,就需要尽量压榨 CPU,参考值可以设为 NCPU+1 (NCPU:CPU数量)
    • CPU 密集型:指的是系统的硬盘、内存性能相对 CPU 要好很多(硬盘、内存 > CPU)
  • IO 密集型任务,参考值可以设置为 2*NCPU
    • 系统的 CPU 性能相对硬盘、内存要好很多 (CPU > 硬盘、内存)

2. 多线程并发最佳实战

  • 使用本地变量
  • 使用不可变类
  • 最小化锁的作用域范围:S = 1/(1-a + a/n) (阿木达尔定律)
    • a:串行计算部分所占比例
    • n:并行处理的节点个数
    • 当 a=0 时,没有串行只有并行,最大加速比 s=n;当 a=1 时,最小加速比 s=1;当n→∞时,极限加速s→ 1/a,这也就是加速比的上限。例如,若串行代码占整个代码的25%,则并行处理的总体性能不可能超过4。这一公式已被学术界所接受,并被称做「阿姆达尔定律」(Amdahl law)。
  • 使用线程池的 Executor,而不是直接 new Thread 执行
  • 宁可使用同步也不要使用线程的 wait 和 notify
  • 使用 Blocking Queue 实现生产-消费模式
  • 使用并发集合而不是加了锁的同步集合
  • 使用 Semaphore 刨建有界的访向
  • 宁可使用同步代码块,也不使用同步的方法
  • 避免使用静态变量

3. Spring 与线程安全

  • Spring bean:singleton、prototype
    • 提供了 scope 属性来表示该 bean 的作用域,它是这个 bean 的生命周期
    • spring 默认 scope 为 singleton,没生成一个 bean,默认是单例对象,该对象的生命周期与 spring 容器是一致的,只是在第一次被注入时才会创建。
    • scope 为 prototype 时,bean 被定义为在每次注入时都被创建。(频繁创建,会严重影响性能)
  • 无状态对象
    • 我们交于 spring 管理的对象都会无状态对象,不会因为多线程导致状态被破坏。
    • 无状态对象是自身没有状态的对象,包括 dto、vo、service、dao、controller 等

spring 根本就没有对 bean 的多线程安全问题做出保证和措施,对于每个 bean 的线程安全问题,根本原因是因为每个 bean 自身设计没有在 bean 中声明任何有状态的实例变量或类变量。如果必须加入有状态的实例变量或者类变量,让类变成有状态的对象时,就需要使用 ThreadLocal 把变量变成线程私有的;如果 bean 的实例变量或者类变量需要在多个线程之间共享,那么我们就只能使用 synchronized、lock、CAS 等这些来实现线程同步的方法了。

4. HashMap 和 ConcurrentHashMap

单线程下 HashMap 的 rehash:
image.png

多线程并发下的 rehash:
image.png

横线上边是第一个线程,执行一半,下边的第二个线程开始执行,并且执行完了 rehash,接下来线程1被唤醒:
image.png
key = 5 和 key = 9 出现循环列表,访问时会 rehash 会出现死循环。

1.7 里的 ConcurrentHashMap:
image.png

1.8 里的 ConcurrentHashMap:
image.png

**