多线程

多线程,是对资源的利用方式,以期望使用更少的资源,做更多的事情,提升 CPU、IO等资源的利用率。
java 实现多线程的方式:runnable,thread
thread run、start、join 的几个方法的区别:

  1. run 当前线程运行,串行,不能起到多线程的效果
  2. start 另起线程执行,并行
  3. join 等待线程执行完,阻止当前线程退出,多与 start 方法一起使用

代码

  1. package com.xiaohui.demo.webfluxclient.thread;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * <p>
  5. * 测试多线程
  6. * </p>
  7. *
  8. * @author xiaohui
  9. * @email xuxiaohuimail@163.com
  10. * @since 2021/6/22
  11. */
  12. public class TestThread {
  13. public static void main(String[] args) throws InterruptedException {
  14. System.out.println(">>>>>>>>>>>>>>>>>>>> start " + Thread.currentThread().getName());
  15. // run 方法,直接被当前线程调用业务代码
  16. MyThread mythead001 = new MyThread("mythead001");
  17. mythead001.run();
  18. // start 方法,另起线程执行当前业务代码
  19. MyThread mythead002 = new MyThread("mythead002");
  20. mythead002.start();
  21. // join 将当前的线程执行完成,再执行后面的业务代码(start会直接继续执行后续代码,不会等待返回),join 一般与start 一并使用
  22. MyThread mythead003 = new MyThread("mythead003");
  23. mythead003.start();
  24. mythead003.join();
  25. System.out.println(">>>>>>>>>>>>>>>>>>>> end " + Thread.currentThread().getName());
  26. }
  27. static class MyThread extends Thread {
  28. public MyThread(String myName) {
  29. this.myName = myName;
  30. }
  31. public String getMyName() {
  32. return myName;
  33. }
  34. public void setMyName(String myName) {
  35. this.myName = myName;
  36. }
  37. private String myName;
  38. @Override
  39. public void run() {
  40. try {
  41. System.out.println(">>>>>>>>>>>>>>>>>>> 进入mythread 。。。threadName:" + Thread.currentThread().getName() + ",参数:" + getMyName());
  42. TimeUnit.SECONDS.sleep(10);
  43. System.out.println(">>>>>>>>>>>>>>>>>>> 退出mythread 。。。threadName:" + Thread.currentThread().getName() + ",参数:" + getMyName());
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. static class MyRunnable implements Runnable {
  50. public MyRunnable(String myName) {
  51. this.myName = myName;
  52. }
  53. public String getMyName() {
  54. return myName;
  55. }
  56. public void setMyName(String myName) {
  57. this.myName = myName;
  58. }
  59. private String myName;
  60. @Override
  61. public void run() {
  62. try {
  63. System.out.println(">>>>>>>>>>>>>>>>>>> 进入MyRunnable 。。。threadName:" + Thread.currentThread().getName() + ",参数:" + getMyName());
  64. TimeUnit.SECONDS.sleep(10);
  65. System.out.println(">>>>>>>>>>>>>>>>>>> 退出MyRunnable 。。。threadName:" + Thread.currentThread().getName() + ",参数:" + getMyName());
  66. } catch (InterruptedException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. }

线程池

threadpool,

有了多线程,为什么还需要线程池?

创建线程消耗时间多,资源多,线程越多,切换线程消耗时间长。线程池可以复用线程。
image.png

代码测试:

  1. package com.xiaohui.demo.webfluxclient.thread;
  2. import org.junit.jupiter.api.Test;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.Random;
  6. import java.util.concurrent.CopyOnWriteArrayList;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * <p>
  12. * 测试线程池
  13. * </p>
  14. *
  15. * @author xiaohui
  16. * @email xuxiaohuimail@163.com
  17. * @since 2021/6/23
  18. */
  19. public class TestThreadPool {
  20. /**
  21. * 测试非线程池下的代码
  22. *
  23. * @throws InterruptedException
  24. */
  25. @Test
  26. public void testNonThreadPool() throws InterruptedException {
  27. System.out.println(">>>>>>>>>>>>>>>>>>>>> testNonThreadPool");
  28. long start = System.currentTimeMillis();
  29. final List<Integer> list = new ArrayList<>();
  30. for (int i = 0; i < 100000; i++) {
  31. Thread thread = new Thread(() -> {
  32. list.add(new Random().nextInt());
  33. });
  34. thread.start();
  35. thread.join();
  36. }
  37. System.out.println(String.format("list size %d", list.size()));
  38. System.out.println(String.format("cost time %d", System.currentTimeMillis() - start));
  39. }
  40. @Test
  41. public void testThreadPool() throws InterruptedException {
  42. System.out.println(">>>>>>>>>>>>>>>>>>>>> testThreadPool");
  43. long start = System.currentTimeMillis();
  44. final List<Integer> list = new ArrayList<>();
  45. ExecutorService executorService = Executors.newFixedThreadPool(1000);
  46. for (int i = 0; i < 100000; i++) {
  47. /*
  48. >>>>>>>>>>>>>>>>>>>>> testThreadPool
  49. list size 99931
  50. cost time 5705
  51. */
  52. // 使用 submit, 执行结果 list size 小于 100000,ArrayList 非线程安全
  53. executorService.submit(() -> {
  54. list.add(new Random().nextInt());
  55. });
  56. }
  57. executorService.shutdown();
  58. executorService.awaitTermination(1, TimeUnit.HOURS);
  59. System.out.println(String.format("list size %d", list.size()));
  60. System.out.println(String.format("cost time %d", System.currentTimeMillis() - start));
  61. }
  62. @Test
  63. public void testThreadPool2() throws InterruptedException {
  64. System.out.println(">>>>>>>>>>>>>>>>>>>>> testThreadPool2");
  65. long start = System.currentTimeMillis();
  66. final List<Integer> list = new ArrayList<>();
  67. ExecutorService executorService = Executors.newFixedThreadPool(1000);
  68. for (int i = 0; i < 100000; i++) {
  69. /*
  70. >>>>>>>>>>>>>>>>>>>>> testThreadPool2
  71. list size 99985
  72. cost time 992
  73. */
  74. // 使用 execute, 执行结果 list size 小于 100000,ArrayList 非线程安全
  75. executorService.execute(() -> {
  76. list.add(new Random().nextInt());
  77. });
  78. }
  79. executorService.shutdown();
  80. executorService.awaitTermination(1, TimeUnit.HOURS);
  81. System.out.println(String.format("list size %d", list.size()));
  82. System.out.println(String.format("cost time %d", System.currentTimeMillis() - start));
  83. }
  84. @Test
  85. public void testThreadPool3() throws InterruptedException {
  86. System.out.println(">>>>>>>>>>>>>>>>>>>>> testThreadPool3");
  87. long start = System.currentTimeMillis();
  88. // final List<Integer> list = new ArrayList<>();
  89. // 采用线程安全的 list
  90. CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
  91. ExecutorService executorService = Executors.newFixedThreadPool(1000);
  92. for (int i = 0; i < 100000; i++) {
  93. /*
  94. >>>>>>>>>>>>>>>>>>>>> testThreadPool3
  95. list size 100000
  96. cost time 13683
  97. */
  98. // 使用 execute, 执行结果 list size 等于 100000,CopyOnWriteArrayList 非线程安全
  99. executorService.execute(() -> {
  100. list.add(new Random().nextInt());
  101. });
  102. }
  103. executorService.shutdown();
  104. executorService.awaitTermination(1, TimeUnit.HOURS);
  105. System.out.println(String.format("list size %d", list.size()));
  106. System.out.println(String.format("cost time %d", System.currentTimeMillis() - start));
  107. }
  108. /**
  109. * 采用 Executors.newSingleThreadExecutor() 线程池,不存在线程安全问题
  110. *
  111. * @throws InterruptedException
  112. */
  113. @Test
  114. public void testThreadPool4() throws InterruptedException {
  115. System.out.println(">>>>>>>>>>>>>>>>>>>>> testThreadPool4");
  116. long start = System.currentTimeMillis();
  117. final List<Integer> list = new ArrayList<>();
  118. // 采用单一线程池执行
  119. ExecutorService executorService = Executors.newSingleThreadExecutor();
  120. for (int i = 0; i < 100000; i++) {
  121. /*
  122. >>>>>>>>>>>>>>>>>>>>> testThreadPool4
  123. list size 100000
  124. cost time 150
  125. */
  126. // 使用 execute, 执行结果 list size 等于 100000,单一线程,不存在是否多线程的安全问题
  127. executorService.execute(() -> {
  128. list.add(new Random().nextInt());
  129. });
  130. }
  131. executorService.shutdown();
  132. executorService.awaitTermination(1, TimeUnit.HOURS);
  133. System.out.println(String.format("list size %d", list.size()));
  134. System.out.println(String.format("cost time %d", System.currentTimeMillis() - start));
  135. }
  136. }

为什么不推荐使用 Executors 工具类创建线程池

容易导致 CPU 使用率高,比如 Executors.newCacheThreadPool(),没有缓存的线程时,一个劲儿创建线程;
容易导致 OOM 内存溢出,比如 Executors.newFixedThreadPool(10)、Executors.newSingleThreadExecutor(),队列过大,耗尽内存;

它的原理(源码理解)

image.png
image.png
image.png
image.png
image.png
image.png
image.png

如何使用

代码。使用工具类 Executors

  1. package com.xiaohui.demo.webfluxclient.thread;
  2. import org.junit.jupiter.api.Test;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. * <p>
  8. * 测试线程池2
  9. * </p>
  10. *
  11. * @author xiaohui
  12. * @email xuxiaohuimail@163.com
  13. * @since 2021/6/23
  14. */
  15. public class TestTreadPool_2 {
  16. @Test
  17. public void testMultiPool() throws InterruptedException {
  18. ExecutorService executorService1 = Executors.newSingleThreadExecutor();
  19. ExecutorService executorService2 = Executors.newFixedThreadPool(100);
  20. ExecutorService executorService3 = Executors.newCachedThreadPool();
  21. System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
  22. System.out.println("Executors.newSingleThreadExecutor() cost time:" + costTime(executorService1));
  23. System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
  24. System.out.println("Executors.Executors.newFixedThreadPool(100) cost time:" + costTime(executorService2));
  25. System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
  26. System.out.println("Executors.newCachedThreadPool() cost time:" + costTime(executorService3));
  27. // 結果
  28. /*
  29. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
  30. curr thread name: pool-1-thread-1, i 为 0
  31. curr thread name: pool-1-thread-1, i 为 1
  32. curr thread name: pool-1-thread-1, i 为 2
  33. curr thread name: pool-1-thread-1, i 为 3
  34. curr thread name: pool-1-thread-1, i 为 4
  35. Executors.newSingleThreadExecutor() cost time:10021
  36. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
  37. curr thread name: pool-2-thread-1, i 为 0
  38. curr thread name: pool-2-thread-2, i 为 1
  39. curr thread name: pool-2-thread-3, i 为 2
  40. curr thread name: pool-2-thread-4, i 为 3
  41. curr thread name: pool-2-thread-5, i 为 4
  42. Executors.Executors.newFixedThreadPool(100) cost time:2000
  43. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
  44. curr thread name: pool-3-thread-1, i 为 0
  45. curr thread name: pool-3-thread-5, i 为 4
  46. curr thread name: pool-3-thread-4, i 为 3
  47. curr thread name: pool-3-thread-2, i 为 1
  48. curr thread name: pool-3-thread-3, i 为 2
  49. Executors.newCachedThreadPool() cost time:2017
  50. */
  51. }
  52. private Long costTime(ExecutorService executorService) throws InterruptedException {
  53. long start = System.currentTimeMillis();
  54. for (int i = 0; i < 5; i++) {
  55. final int a = i;
  56. executorService.execute(() -> {
  57. try {
  58. TimeUnit.SECONDS.sleep(2);
  59. System.out.println(String.format("curr thread name: %s, i 为 %d ", Thread.currentThread().getName(), a));
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. });
  64. }
  65. executorService.shutdown();
  66. executorService.awaitTermination(1, TimeUnit.HOURS);
  67. return System.currentTimeMillis() - start;
  68. }
  69. }

线程调优

术语介绍

线程与进程

进程,资源分配的最小单位;
线程,CPU 调度的最小单位;也叫轻量级进程 LWP(Light Weight Process)
一个进程拥有多个线程,一个线程属于某一个进程;

ULT 与 KLT 线程模型

用户级线程,user level thread,用户自己创建管理,操作系统(kernel)对线程的存在与否无感知。
image.png
内核级先吃,kernel level thread,依赖内核系统,与内核系统线程一对一管理
image.png
image.png
image.png
程序计数器,pc 寄存器,CPU 时间片等概念

阻塞队列

FIFO
在任意时刻(不管是否高并发),永远只有一个线程能够进行队列的入队或者出队操作!
线程安全的队列!
(实际)有界!(不管指不指定大小,最终取决于宿主操作系统);
队列满,只能进行出队操作,所有的入队操作必须等待,也就是被阻塞;
队列空,只能进行入队操作,所有的出队操作必须等待,也就是被阻塞;

提交优先级与执行优先级

线程池的提交优先级与执行优先级,都是代码实现,限定了它的表现形式。所以,查看源码吧~~~
任务提交数量:
提交优先:
小于 coreSize 线程,创建线程,执行任务
大于 coreSize 线程,小于 blockQueue size,提交到阻塞队列中
大于 blockQueue size,小于 maxSize 线程, 创建线程,执行任务
大于 blockQueue size,大于 maxSize 线程,采用相关的策略来进行处理

执行优先级:
coreSize 的 thread 先行
maxSize 的在后(任务未能填满阻塞队列时,maxSize 的线程不会创建,即任务在会在 coreSize 线程上执行);

线程中断信号,安全退出