0. 线程池

https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html?from=timeline

为什么要用线程池

  • 相比new Thread,Java提供的四种线程池的好处在于:

a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
c. 提供定时执行、定期执行、单线程、并发数控制等功能。

  • Java通过Executors提供四种线程池,分别为:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

  • Executor框架有其局限性,不够灵活

Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,
但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,
使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

线程池的原理

线程池的核心参数都是什么含义

  1. //使用线程池来创建线程,简易版
  2. private static ExecutorService threadPool = new ThreadPoolExecutor(50, 300, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  3. threadPool.submit(() -> {
  4. log.info("线程开始");
  5. });

ThreadPoolExecutor的构造函数

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. this.corePoolSize = corePoolSize;
  19. this.maximumPoolSize = maximumPoolSize;
  20. this.workQueue = workQueue;
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }
  • corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
  • maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
  • keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
  • unit:keepAliveTime的单位
  • workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
  1. 直接提交队列:参数为SynchronousQueue队列(new SynchronousQueue())
    SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。**
  2. 有界的任务队列:参数为ArrayBlockingQueue有界任务队列( new ArrayBlockingQueue(10))
    若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
  3. 无界的任务队列:参数为LinkedBlockingQueue无界的任务队列(new LinkedBlockingQueue())
    使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
  4. 优先任务队列: 参数为PriorityBlockingQueue(new PriorityBlockingQueue())
    除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
  • threadFactory:线程工厂,用于创建线程,一般用默认即可;线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等
  • handler:拒绝策略;当任务太多来不及处理时(maximumPoolSize),如何拒绝任务;1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;5、自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略
  • ThreadPoolExecutor扩展ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,1、beforeExecute:线程池中任务运行前执行2、afterExecute:线程池中任务运行完毕后执行3、terminated:线程池退出后执行 ```java /**

    • 手动创建线程池
    • @param name 名称
    • @param num 数量
    • @return */ public ExecutorService getExecutorService(Integer num,String name) { //线程工厂 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(name).build();

      //启动线程池 return new ThreadPoolExecutor(num, num,

      1. 0L, TimeUnit.MILLISECONDS,
      2. new LinkedBlockingQueue<Runnable>(), namedThreadFactory,new ThreadPoolExecutor.AbortPolicy()

      ){

      1. @Override
      2. protected void afterExecute(Runnable r, Throwable t) {
      3. super.afterExecute(r, t);
      4. printException(r, t);
      5. }

      }; } private static void printException(Runnable r, Throwable t) { if (t == null && r instanceof Future<?>) {

      1. try {
      2. Future<?> future = (Future<?>) r;
      3. if (future.isDone()){
      4. future.get();
      5. }
      6. } catch (CancellationException ce) {
      7. t = ce;
      8. } catch (ExecutionException ee) {
      9. t = ee.getCause();
      10. } catch (InterruptedException ie) {
      11. Thread.currentThread().interrupt();
      12. }

      } if (t != null){

      1. log.error(t.getMessage(), t);

      }

}

  1. <a name="b1j7c"></a>
  2. # 1.多线程实现(两种)
  3. 线程的执行顺序不能人为干预,是由调度器(CPU)去处理的<br />多个线程在操作同一个资源的时候,会出现抢夺i问题,需要加入并发控制
  4. <a name="fedlV"></a>
  5. ## 实现方法一:继承Thread类,重写run()方法,调用start()启动线程 ------- 单继承局限性
  6. - 线程调用run()和start()是不一样的<br />start()方法: 它会启动一个新线程,并将其添加到线程池中,待其获得CPU资源时会执行run()方法,start()不能被重复调用。run()方法:它和普通的方法调用一样,不会启动新线程。只有等到该方法执行完毕,其它线程才能获得CPU资源。start()实际上是通过本地方法start0()启动一个新线程,新线程会调用run()方法。
  7. - 线程创建不一定立即执行,受CPU调度影响
  8. ![3.png](https://cdn.nlark.com/yuque/0/2021/png/25558498/1638870931316-ca572a1a-273d-4b52-9328-c522a27d2c2d.png#clientId=u1cf7da7d-c72a-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=uc7ba54ff&margin=%5Bobject%20Object%5D&name=3.png&originHeight=458&originWidth=819&originalType=binary&ratio=1&rotation=0&showTitle=false&size=224132&status=done&style=none&taskId=u7fbb947e-6db9-4ca6-a180-27ec3909a15&title=)
  9. ```java
  10. package com.faq.demo01;
  11. //创建线程方式一:继承Thread类,重写run()方法,调用start开启线程
  12. //总结:注意,线程的开启不一定立即执行,由CPU调度执行
  13. public class TestThread1 extends Thread{
  14. @Override
  15. public void run() {
  16. //run方法线程体
  17. for (int i = 0; i < 20; i++) {
  18. System.out.println("我在看代码--"+i);
  19. }
  20. }
  21. public static void main(String[] args) {
  22. //main线程,主线程
  23. //创建一个线程对象
  24. TestThread1 testThread = new TestThread1();
  25. //调用start()方法开启线程
  26. //testThread.run();//普通方法调用
  27. testThread.start();//与主线程交替运行
  28. for (int i = 0; i < 20; i++) {
  29. System.out.println("我在学习多线程--"+i);
  30. }
  31. }
  32. }

实现方法二:实现Runable类,实现run()方法,创建线程对象调用start()启动线程 ———— 方便同一个对象被多个线程使用

  1. package com.faq.demo01;
  2. //创建线程方式2:实现runnable接口,重写run方法,执行线程需要丢入runnable接口实现类,调用start方法
  3. public class TestThread3 implements Runnable{
  4. @Override
  5. public void run() {
  6. //run方法线程体
  7. for (int i = 0; i < 20; i++) {
  8. System.out.println("我在看代码--"+i);
  9. }
  10. }
  11. public static void main(String[] args) {
  12. //main线程,主线程
  13. //创建runnable接口的实现类对象
  14. TestThread3 testThread3 = new TestThread3();
  15. //创建线程对象,通过线程对象来开启我们的线程,代理
  16. Thread thread = new Thread(testThread3);
  17. thread.start();//可以合起来写成new Thread(testThread3).start();
  18. for (int i = 0; i < 20; i++) {
  19. System.out.println("我在学习多线程--"+i);
  20. }
  21. }
  22. }

实现方式三:实现Callable接口

  • 需要返回值类型
  • 重写call方法,需要抛出异常
  • 创建目标对象
  • 创建执行服务ExecutorService ser = Executors.newFixedThreadPool(1);
  • 提交执行
    Futureresult1=ser.submit(t1);
  • 获取结果
    boolean r1 = result1.get()
  • 关闭服务
    ser.shutdownNow(); ```java package com.faq.demo02;

import org.apache.commons.io.FileUtils;//别人的类

import java.io.File; import java.io.IOException; import java.net.URL; import java.util.concurrent.*;

//线程创建方式三:实现Callable接口 /* Callable的好处: 1、可以定义返回值 2、可以抛出异常

*/ public class TestCallable implements Callable{ private String url;//网络图片地址 private String name;//保存的文件名

  1. public TestCallable(String url,String name){
  2. this.url=url;
  3. this.name=name;
  4. }
  5. //线程的执行体
  6. @Override
  7. public Boolean call(){
  8. WebDownloader webDownloader = new WebDownloader();
  9. webDownloader.downloader(url,name);
  10. System.out.println("下载了文件名为:"+name);
  11. return true;
  12. }
  13. public static void main(String[] args) throws ExecutionException, InterruptedException {
  14. TestCallable t1 = new TestCallable("https://img-home.csdnimg.cn/images/20210114101935.png","1.jpg");
  15. TestCallable t2 = new TestCallable("https://img-home.csdnimg.cn/images/20201125054322.jpg","2.jpg");
  16. TestCallable t3 = new TestCallable("https://img-bss.csdn.net/1609984629114.jpg","3.jpg");
  17. //创建执行服务:
  18. ExecutorService ser = Executors.newFixedThreadPool(3);
  19. //提交执行:
  20. Future<Boolean> result1=ser.submit(t1);
  21. Future<Boolean> result2=ser.submit(t2);
  22. Future<Boolean> result3=ser.submit(t3);
  23. //获取结果:
  24. boolean r1 = result1.get();
  25. boolean r2 = result2.get();
  26. boolean r3 = result3.get();
  27. //打印返回值
  28. System.out.println(r1);
  29. System.out.println(r2);
  30. System.out.println(r3);
  31. //关闭服务:
  32. ser.shutdownNow();
  33. }

}

//下载器 class WebDownloader{ //下载方法 public void downloader(String url,String name){ try { FileUtils.copyURLToFile(new URL(url),new File(name));//调用类中方法 } catch (IOException e) { e.printStackTrace(); System.out.println(“IO异常,downloader方法出现问题”); } }

} 输出: 下载了文件名为:1.jpg 下载了文件名为:3.jpg 下载了文件名为:2.jpg true true true ```

2.多线程同步

同步解决的问题:每个线程在自己的工作内存交互,内存控制不当会造成数据不一致

3.多线程通信