课程背景

作者简介

一名普通的一线程序员,从实践的角度给大家分享JUC多线程编程.

学习场景

上班路上看视频,到了公司跟着笔记边写边思考,不知不觉成高手!

课程概要

1.适合人群最好是有一年java编程经验,并不是说这个课程有多难,而是对业务和应用场景的理解;
2.课程面向有一定经验的童鞋,因此课程中不会手把手教你写代码,重点是分析JUC的实际开发应用场景;
3.课程是基于自己的实际开发经验和网络资料学习总结而得出的,必然存在不足,仅供大家学习参考,
如果大家在学习的过程中发现有不足的地方,希望能给予指出;

配套视频列表

image.png

配套讲解源码

image.png

1.实际开发中的多线程场景

为了大家学习不空洞,我们先看几个实际开发中的应用案例!

  • 1.请求合并

https://www.yuque.com/java51/avi/murov6

  • 2.支付网关的对账单下载

https://www.yuque.com/java51/avi/dymvgd

  • 3.接口压力测试

https://www.yuque.com/java51/avi/osh2n2

使用到多线程的核心技术点:

1.减少计数:CountDownLatch 2.CompletableFuture接口 3.阻塞队列 4.ThreadPool线程池

2.回顾多线程基础

没有多线程基础的童鞋请先学习这个基础
课件:https://www.cnblogs.com/newAndHui/p/12748055.html

lock锁扩展

image.png
synchronized

  1. public class TicketSynchronized {
  2. // 总是量
  3. public static Integer num = 100;
  4. public synchronized void sale() {
  5. String name = Thread.currentThread().getName();
  6. ThreadUtil.sleep(1);
  7. if (num > 0) {
  8. System.out.println(name + " 售出第:" + num);
  9. num--;
  10. } else {
  11. System.out.println(name + " 剩余票为:" + num);
  12. }
  13. }
  14. }

lock

  1. public class TicketLock {
  2. // 总是量
  3. public static Integer num = 100;
  4. //创建可重入锁
  5. private final ReentrantLock lock = new ReentrantLock();
  6. public void sale() {
  7. String name = Thread.currentThread().getName();
  8. lock.lock();
  9. try {
  10. if (num > 0) {
  11. ThreadUtil.sleep(1);
  12. System.out.println(name + " 售出第:" + num);
  13. num--;
  14. } else {
  15. System.out.println(name + " 剩余票为:" + num);
  16. }
  17. } finally {
  18. lock.unlock();
  19. }
  20. }
  21. }

测试

  1. public class Test01 {
  2. public static void main(String[] args) {
  3. // TicketSynchronized ticket = new TicketSynchronized();
  4. TicketLock ticket = new TicketLock();
  5. new Thread(() -> {
  6. for (int i = 0; i < 100; i++) {
  7. ticket.sale();
  8. }
  9. }, "窗口01").start();
  10. new Thread(() -> {
  11. for (int i = 0; i < 100; i++) {
  12. ticket.sale();
  13. }
  14. }, "窗口02").start();
  15. new Thread(() -> {
  16. for (int i = 0; i < 100; i++) {
  17. ticket.sale();
  18. }
  19. }, "窗口03").start();
  20. new Thread(() -> {
  21. for (int i = 0; i < 100; i++) {
  22. ticket.sale();
  23. }
  24. }, "窗口04").start();
  25. new Thread(() -> {
  26. for (int i = 0; i < 100; i++) {
  27. ticket.sale();
  28. }
  29. }, "窗口05").start();
  30. }
  31. }

线程间通信

普通通信,存钱取钱交互进行

  1. public class Money {
  2. public static int num = 0;
  3. // 创建可重入锁
  4. private ReentrantLock lock = new ReentrantLock();
  5. // 创建通知对象
  6. private Condition condition = lock.newCondition();
  7. /**
  8. * 存钱
  9. */
  10. public void increase() {
  11. String name = Thread.currentThread().getName();
  12. lock.lock();
  13. try {
  14. ThreadUtil.sleep(1);
  15. if (num <= 0) {
  16. num += 1000;
  17. System.out.println(name + " 存入1000,通知其他线程取钱,当前余额:num=" + num);
  18. // 通知其他线程取钱
  19. condition.signalAll();
  20. } else {
  21. // 等待
  22. System.out.println(name + " 当前不差钱,等待...");
  23. condition.await();
  24. }
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. } finally {
  28. lock.unlock();
  29. }
  30. }
  31. /**
  32. * 取钱
  33. */
  34. public void reduce() {
  35. String name = Thread.currentThread().getName();
  36. lock.lock();
  37. try {
  38. ThreadUtil.sleep(1);
  39. if (num > 0) {
  40. num -= 1000;
  41. System.out.println(name + " 取走 1000,通知其他线程 存钱,当前余额:" + num);
  42. // 通知其他线程存钱
  43. condition.signalAll();
  44. } else {
  45. // 等待
  46. System.out.println(name + " 当前余额为零,不能取钱,等待----");
  47. condition.await();
  48. }
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. } finally {
  52. lock.unlock();
  53. }
  54. }
  55. }

测试:

  1. public class Test01 {
  2. public static void main(String[] args) {
  3. Money money = new Money();
  4. new Thread(() -> {
  5. for (int i = 0; i < 30; i++) {
  6. money.reduce();
  7. }
  8. }, "取钱窗口01").start();
  9. new Thread(() -> {
  10. for (int i = 0; i < 30; i++) {
  11. money.reduce();
  12. }
  13. }, "取钱窗口02").start();
  14. new Thread(() -> {
  15. for (int i = 0; i < 30; i++) {
  16. money.reduce();
  17. }
  18. }, "取钱窗口03").start();
  19. new Thread(() -> {
  20. for (int i = 0; i < 30; i++) {
  21. money.reduce();
  22. }
  23. }, "取钱窗口04").start();
  24. new Thread(() -> {
  25. for (int i = 0; i < 30; i++) {
  26. money.increase();
  27. }
  28. }, "存钱窗口01").start();
  29. new Thread(() -> {
  30. for (int i = 0; i < 30; i++) {
  31. money.increase();
  32. }
  33. }, "存钱窗口02").start();
  34. new Thread(() -> {
  35. for (int i = 0; i < 30; i++) {
  36. money.increase();
  37. }
  38. }, "存钱窗口03").start();
  39. }
  40. }

定制化通信

  1. public class Money {
  2. // 存钱窗口 名称 默认为:存钱窗口01
  3. public static String windowName = "存钱窗口01";
  4. // 创建可重入锁
  5. private ReentrantLock lock = new ReentrantLock();
  6. // 通知对象 存钱窗口01
  7. private Condition condition1 = lock.newCondition();
  8. // 通知对象 存钱窗口02
  9. private Condition condition2 = lock.newCondition();
  10. // 通知对象 存钱窗口03
  11. private Condition condition3 = lock.newCondition();
  12. /**
  13. * 存钱
  14. */
  15. public void increase5(int num) {
  16. String name = Thread.currentThread().getName();
  17. lock.lock();
  18. try {
  19. ThreadUtil.sleep(1);
  20. while (!name.equals(windowName)) {
  21. // 等待
  22. System.out.println(name + " 等待...,当前:" + windowName + " 正在操作");
  23. condition1.await(); // 当前线程被唤醒时会接着 后面的代码执行
  24. }
  25. // 处理业务
  26. for (int i = 1; i <= 5; i++) {
  27. System.out.println(name + "--" + i + " 第:" + num + "次循环");
  28. }
  29. // 通知 存钱窗口02 存钱
  30. windowName = "存钱窗口02";
  31. condition2.signal();
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. } finally {
  35. lock.unlock();
  36. }
  37. }
  38. /**
  39. * 存钱
  40. */
  41. public void increase10(int num) {
  42. String name = Thread.currentThread().getName();
  43. lock.lock();
  44. try {
  45. ThreadUtil.sleep(1);
  46. while (!name.equals(windowName)) {
  47. // 等待
  48. System.out.println(name + " 等待...,当前:" + windowName + " 正在操作");
  49. condition2.await();
  50. }
  51. // 处理业务
  52. for (int i = 1; i <= 10; i++) {
  53. System.out.println(name + "--" + i + " 第:" + num + "次循环");
  54. }
  55. // 通知 存钱窗口02 存钱
  56. windowName = "存钱窗口03";
  57. condition3.signal();
  58. } catch (InterruptedException e) {
  59. e.printStackTrace();
  60. } finally {
  61. lock.unlock();
  62. }
  63. }
  64. /**
  65. * 存钱
  66. */
  67. public void increase15(int num) {
  68. String name = Thread.currentThread().getName();
  69. lock.lock();
  70. try {
  71. ThreadUtil.sleep(1);
  72. while (!name.equals(windowName)) {
  73. // 等待
  74. System.out.println(name + " 等待...,当前:" + windowName + " 正在操作");
  75. condition3.await();
  76. }
  77. // 处理业务
  78. for (int i = 1; i <= 15; i++) {
  79. System.out.println(name + "--" + i + " 第:" + num + "次循环");
  80. }
  81. // 通知 存钱窗口01 存钱
  82. windowName = "存钱窗口01";
  83. condition1.signal();
  84. } catch (InterruptedException e) {
  85. e.printStackTrace();
  86. } finally {
  87. lock.unlock();
  88. }
  89. }
  90. }

测试

  1. /**
  2. * @Copyright (C) XXXXXXXXXXX科技股份技有限公司
  3. * @Author: lidongping
  4. * @Date: 2021-09-15 18:30
  5. * @Description: <p>
  6. * 注意必须依次执行
  7. * 需求窗口依次存钱(5轮循环)
  8. * 存钱窗口01:每轮,循环5次
  9. * 存钱窗口02:每轮,循环10次
  10. * 存钱窗口03:每轮,循环15次
  11. *
  12. * </p>
  13. */
  14. public class test01 {
  15. public static void main(String[] args) {
  16. Money money = new Money();
  17. // 注意这里的 编写顺序无关
  18. new Thread(() -> {
  19. for (int i = 1; i <= 5; i++) {
  20. money.increase5(i);
  21. }
  22. }, "存钱窗口01").start();
  23. new Thread(() -> {
  24. for (int i = 1; i <= 5; i++) {
  25. money.increase10(i);
  26. }
  27. }, "存钱窗口02").start();
  28. new Thread(() -> {
  29. for (int i = 1; i <= 5; i++) {
  30. money.increase15(i);
  31. }
  32. }, "存钱窗口03").start();
  33. }
  34. }

lock 与 synchronized 区别

1.synchronized是Java中的关键字,Lock是一个接口;
2.通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
3.Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用synchronized 时,等待的线程会一直等待下去,不能够响应中断;
4.synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;
而 Lock 在发生异常时,如果没有主动通过 unLock()去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁;
5.Lock 可以提高多个线程进行读操作的效率。
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源
非常激烈时,此时 Lock 的性能要远远优于synchronized。

3.JUC概述

JUC就是java.util .concurrent工具包的简称。这是一个处理线程的工具包。
image.png

3.1.重要的3大辅助类

减少计数:CountDownLatch
循环栅栏:CyclicBarrier
信号灯:Semaphore
减少计数:CountDownLatch案例与理解

  1. package demo06;
  2. import common.ThreadUtil;
  3. import org.junit.Test;
  4. import java.util.concurrent.CountDownLatch;
  5. /**
  6. * @author 姿势帝-博客园
  7. * @address https://www.cnblogs.com/newAndHui/
  8. * @WeChat 851298348
  9. * @create 09/25 10:48
  10. * @description <p>
  11. * 减少计数
  12. * CountDownLatch
  13. * CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行
  14. * 减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法
  15. * 之后的语句。
  16. * 特点:
  17. * 1.CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,这些线程会阻塞
  18. * 2. 其它线程调用 countDown 方法会将计数器减 1(调用 countDown 方法的线程不会阻塞)
  19. * 3.当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行
  20. * </p>
  21. */
  22. public class CountDownLatchTest {
  23. /**
  24. * 在前的线程测试中如果使用单元测试,为了让子线程不被中断,我们都要在结束的地方使用线程睡眠
  25. * 下面的案例,我们使用CountDownLatch,让所有子线程执行完后,在结束主线程
  26. *
  27. * @throws InterruptedException
  28. */
  29. @Test
  30. public void test01() throws InterruptedException {
  31. int num = 10;
  32. CountDownLatch countDownLatch = new CountDownLatch(num);
  33. for (int i = 0; i < num; i++) {
  34. new Thread(() -> {
  35. ThreadUtil.sleep(1);
  36. System.out.println(Thread.currentThread().getName() + "..执行完成");
  37. // 计数器减一
  38. countDownLatch.countDown();
  39. }, "线程" + i).start();
  40. }
  41. System.out.println("主线程等待....");
  42. // await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句。
  43. countDownLatch.await();
  44. System.out.println("子线程已执行完成,可以关闭了");
  45. }
  46. /**
  47. * 模拟并发请求,做压力测试
  48. * 在实际开发中我们开发完成接口后,通常会对接口做简单的压力测试
  49. * 那么如何自己写一个简单快速的压力单元测试呢?
  50. *
  51. * @throws InterruptedException
  52. */
  53. @Test
  54. public void test02() throws InterruptedException {
  55. // 并发请求数
  56. int num = 200;
  57. CountDownLatch countDownLatch = new CountDownLatch(num);
  58. for (int i = 0; i < num; i++) {
  59. // 计数器减一
  60. countDownLatch.countDown();
  61. new Thread(() -> {
  62. try {
  63. System.out.println(Thread.currentThread().getName() + "..准备中,等待发出请求");
  64. ThreadUtil.sleep(5);
  65. // 等待计数器归零
  66. countDownLatch.await();
  67. System.out.println(Thread.currentThread().getName() + "..发出http请求");
  68. } catch (InterruptedException e) {
  69. e.printStackTrace();
  70. }
  71. }, "线程" + i).start();
  72. }
  73. // 等待子线程执行完成,如果使用main方法不需要睡眠
  74. ThreadUtil.sleep(20);
  75. }
  76. }

循环栅栏:CyclicBarrier案例与理解

  1. public class CyclicBarrierTest {
  2. /**
  3. * 实际在开发中的应用场景我还没遇到,
  4. * 若果大家在开发中遇到了可以分享一下
  5. * @param args
  6. */
  7. public static void main(String[] args) {
  8. int num = 5;
  9. CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
  10. System.out.println(Thread.currentThread().getName() + "::所有子线程前部分已完成.......");
  11. });
  12. for (int i = 1; i <= num; i++) {
  13. new Thread(() -> {
  14. try {
  15. ThreadUtil.sleep(1);
  16. System.out.println(Thread.currentThread().getName() + "..前部分执行完成");
  17. // 等待其他子线程的前部分都执行完成后在执行后面的
  18. cyclicBarrier.await();
  19. System.out.println(Thread.currentThread().getName() + "..await 后执行");
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. } catch (BrokenBarrierException e) {
  23. e.printStackTrace();
  24. }
  25. }, "线程" + i).start();
  26. }
  27. }
  28. }

执行结果:

线程1..前部分执行完成 线程4..前部分执行完成 线程2..前部分执行完成 线程5..前部分执行完成 线程3..前部分执行完成 线程3::所有子线程前部分已完成……. (注:特别注意z) 线程3..await 后执行 线程1..await 后执行 线程2..await 后执行 线程4..await 后执行 线程5..await 后执行

信号灯:Semaphore案例与理解

  1. public class SemaphoreTest {
  2. /**
  3. * 场景 20个线程,抢占3个数据库线程池链接
  4. *
  5. * @param args
  6. */
  7. public static void main(String[] args) {
  8. int numThread = 50;
  9. int numData = 3;
  10. Semaphore semaphore = new Semaphore(numData);
  11. for (int i = 1; i <= numThread; i++) {
  12. new Thread(() -> {
  13. try {
  14. // 获取到数据库链接
  15. semaphore.acquire();
  16. System.out.println(Thread.currentThread().getName() + "..获取到数据库链接,正在使用");
  17. // 模拟使用时间,随机0到5秒
  18. ThreadUtil.sleep(new Random().nextInt(5));
  19. System.out.println(Thread.currentThread().getName() + "..使用完成");
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. } finally {
  23. // 释放链接
  24. semaphore.release();
  25. }
  26. }, "线程:" + i).start();
  27. }
  28. }
  29. }

更多的含义和用法大家可以看API文档
https://tool.oschina.net/apidocs/apidoc?api=jdk-zh

4.阻塞队列

  1. package demo08;
  2. import org.junit.Test;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.BlockingQueue;
  5. /**
  6. * @author 姿势帝-博客园
  7. * @address https://www.cnblogs.com/newAndHui/
  8. * @WeChat 851298348
  9. * @create 09/25 6:23
  10. * @description <p>
  11. * jdk文档:https://tool.oschina.net/apidocs/apidoc?api=jdk-zh
  12. * 阻塞队列
  13. * <p>
  14. * 类别 抛出异常 特殊值 阻塞 超时
  15. * 插入 add(e) offer(e) put(e) offer(e, time, unit)
  16. * 取出 remove() poll() take() poll(time, unit)
  17. * 检查 element() peek() 不可用 不可用
  18. * </p>
  19. */
  20. public class BlockingQueueTest {
  21. /**
  22. * add 与 remove 方法测试
  23. * 特点超出报错
  24. */
  25. @Test
  26. public void test01() {
  27. System.out.println("队列测试开始....");
  28. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
  29. // 放入队列
  30. blockingQueue.add("AA");
  31. blockingQueue.add("BB");
  32. blockingQueue.add("CC");
  33. // 最多 放3个,多于直接报错 java.lang.IllegalStateException: Queue full
  34. //blockingQueue.add("DD");
  35. // 取出队列
  36. System.out.println(blockingQueue.remove());
  37. System.out.println(blockingQueue.remove());
  38. System.out.println(blockingQueue.remove());
  39. // 最多 取3个,多于直接报错 java.util.NoSuchElementException
  40. System.out.println(blockingQueue.remove());
  41. System.out.println("队列测试结束.");
  42. }
  43. /**
  44. * offer 与 poll 方法测试
  45. * 正常放入返回 true;
  46. * 无数据时返回 null
  47. */
  48. @Test
  49. public void test02() {
  50. System.out.println("队列测试开始....");
  51. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
  52. // 放入队列
  53. boolean aa = blockingQueue.offer("AA");
  54. System.out.println(" aa=" + aa);
  55. boolean bb = blockingQueue.offer("BB");
  56. System.out.println(" bb=" + bb);
  57. boolean cc = blockingQueue.offer("CC");
  58. System.out.println(" cc=" + cc);
  59. // 最多 放3个,超出返回 false
  60. boolean dd = blockingQueue.offer("DD");
  61. System.out.println(" dd=" + dd);
  62. // 取出队列
  63. System.out.println(blockingQueue.poll());
  64. System.out.println(blockingQueue.poll());
  65. System.out.println(blockingQueue.poll());
  66. // 最多 取3个,超出返回 null
  67. System.out.println(blockingQueue.poll());
  68. System.out.println("队列测试结束.");
  69. }
  70. /**
  71. * put 与 take 方法测试
  72. * 产生阻塞
  73. */
  74. @Test
  75. public void test03() throws InterruptedException {
  76. System.out.println("队列测试开始....");
  77. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
  78. // 放入队列
  79. blockingQueue.put("AA");
  80. System.out.println("放入AA完成");
  81. blockingQueue.put("BB");
  82. System.out.println("放入BB完成");
  83. blockingQueue.put("CC");
  84. System.out.println("放入CC完成");
  85. // 超出后会 阻塞
  86. // blockingQueue.put("DD");
  87. // System.out.println("放入DD完成");
  88. // 取出队列
  89. System.out.println(blockingQueue.take());
  90. System.out.println(blockingQueue.take());
  91. System.out.println(blockingQueue.take());
  92. // 超出会阻塞
  93. System.out.println(blockingQueue.take());
  94. System.out.println("队列测试结束.");
  95. }
  96. }

5.ThreadPool线程池

5.1.线程池创建

Executors创建线程池

  1. package demo09;
  2. import common.ThreadUtil;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. /**
  6. * @Copyright (C) XXXXXXXXXXX科技股份技有限公司
  7. * @Author: lidongping
  8. * @Date: 2021-10-02 9:32
  9. * @Description: <p>
  10. * 创建线程池
  11. * api:https://tool.oschina.net/apidocs/apidoc?api=jdk-zh
  12. * </p>
  13. */
  14. public class ThreadPoolTest01 {
  15. /**
  16. * 线程池的创建
  17. * <p>
  18. * 特备注意:以下代码只是学习API练习的,实际生产中都不会这样使用!!!!
  19. *
  20. * @param args
  21. */
  22. public static void main(String[] args) {
  23. // 一池一线程
  24. // 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
  25. ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  26. execute(singleThreadExecutor, "一池一线程");
  27. // 一池三线程
  28. // 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
  29. ExecutorService threadPool01 = Executors.newFixedThreadPool(3);
  30. execute(threadPool01, "一池三线程");
  31. // 一池无线多线程
  32. // 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
  33. ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
  34. execute(newCachedThreadPool, "一池无限多线程");
  35. }
  36. private static void execute(ExecutorService threadPool, String message) {
  37. for (int i = 0; i < 10; i++) {
  38. threadPool.execute(() -> {
  39. System.out.println(message + " 线程名称 [" + Thread.currentThread().getName() + "] 正在执行.....");
  40. ThreadUtil.sleep(2);
  41. System.out.println(message + " 线程名称 [" + Thread.currentThread().getName() + "] 执行完成!");
  42. });
  43. }
  44. threadPool.shutdown();
  45. }
  46. }

ThreadPoolExecutor创建线程池

  1. package demo09;
  2. import common.ThreadUtil;
  3. import java.util.Random;
  4. import java.util.concurrent.*;
  5. /**
  6. * @Copyright (C) XXXXXXXXXXX科技股份技有限公司
  7. * @Author: lidongping
  8. * @Date: 2021-09-26 19:04
  9. * @Description:
  10. */
  11. public class ThreadPoolTest02 {
  12. /**
  13. * 生产线程池的使用
  14. * <p>
  15. * 重要结论
  16. * 1.当任务来时先创建核心线程数;
  17. * 2.当核心线程是使用完后,如果还有需要处理的任务将进入队列;
  18. * 3.当队列满了的时候,在增加线程到最大线程数;
  19. * 4.当核心线程数满了,队列也满了,最大线程数也满了,这时会使用拒绝策略!
  20. *
  21. * @param args
  22. */
  23. public static void main(String[] args) {
  24. // 核心线程数,常用线程数
  25. int corePoolSize = 3;
  26. // 最大线程数
  27. int maximumPoolSize = 5;
  28. // 释放非核心线程数的空闲时间
  29. long keepAliveTime = 5L;
  30. // 空闲时间单位
  31. TimeUnit unit = TimeUnit.SECONDS;
  32. // 待执行任务的队列
  33. BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2000);
  34. // 创建线程的工程类
  35. ThreadFactory threadFactory = Executors.defaultThreadFactory();
  36. // 等队列满后的拒绝策略
  37. RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
  38. ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
  39. System.out.println("开始多线程执行任务");
  40. for (int i = 1; i <= 50; i++) {
  41. final int num = i;
  42. try {
  43. //取一个线程并执行任务
  44. poolExecutor.execute(() -> {
  45. System.out.println(Thread.currentThread().getName() + "正在执行任务...." + num);
  46. ThreadUtil.sleep(new Random().nextInt(5));
  47. System.out.println(Thread.currentThread().getName() + "任务完成.." + num);
  48. });
  49. } catch (Exception e) {
  50. System.out.println("任务执行失败:" + num + "--" + e.getMessage());
  51. }
  52. }
  53. // 关闭线程,当所有子线程执行完成后会关闭(异步的)
  54. poolExecutor.shutdown();
  55. System.out.println("所有任务结束");
  56. }
  57. }

5.2.线程池实现的底层原理

实现原理图
image.png

线程池的核心要点

  1. 初始化的线程池时,线程数为0;

  2. 当调用 execute()方法执行一个新的任务时(获取线程)有如下4种情况: a. 如果正在运行的线程数量小于 corePoolSize(核心线程数),则会立即运行这个任务; b.如果正在运行的线程数量大于或等于 corePoolSize,则会将这个任务放入队列进行排队处理; c.如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize(最大线程数),则会新建线程执行当前任务; d.如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动拒绝策略来执行。

  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行

  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时: a.如果当前运行的线程数大于 corePoolSize,那么这个线程就会被回收。 b. 线程池的所有任务完成后,线程池中的线程数为corePoolSize。

5.3.线程池的4种拒绝策略

1.ThreadPoolExecutor.AbortPolicy:用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException.

2.ThreadPoolExecutor.CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。

3.ThreadPoolExecutor.DiscardOldestPolicy:用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。

4.ThreadPoolExecutor.DiscardPolicy:用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。

6.Fork与Join

6.1.简介

Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并在一起;
重点掌握:
1.如何定义一个任务对象;task extends RecursiveTask
2.如何拆分任务;task.fork()
3.如何合并任务;task.join()
4.如何创建执行任务的线程池; forkJoinPool = new ForkJoinPool(20);
5.如何将任务放入线程池运行;forkJoinPool.submit(task);
6.如何获取合并后的结果;task.get();
7.如何释放资源; forkJoinPool.shutdown();

6.2.API介绍

image.png
1.有反回的ForkJoinTask
image.png
2.无返回的ForkJoinTask
image.png
3.线程池对象,ForkJoinPool
image.png

6.3.Fork/Join使用案例

案例1:使用分支合并计算1+2+3…..10000; 案例2:使用分支合并统计支付宝对账文件的差异订单; 案例3:使用分支合并的方式给一个数组中的每个元素乘2;

案例1:使用分支合并计算1+2+3…..10000;
统计对象

  1. package demo10;
  2. import java.util.concurrent.RecursiveTask;
  3. /**
  4. * @Copyright (C) XXXXXXXXXXX科技股份技有限公司
  5. * @Author: lidongping
  6. * @Date: 2021-10-02 15:06
  7. * @Description: <p>
  8. * 需求:分开计算1+2+3+4+5.....+10000,每20个作为一组计算,最后合并在一起,得出总的结果;
  9. * </p>
  10. */
  11. public class StatisticsDemo extends RecursiveTask<Integer> {
  12. // 每组计算个数
  13. private static final Integer num = 20;
  14. // 每组的开始值
  15. private Integer startNum;
  16. // 每组的结束值
  17. private Integer endNum;
  18. // 计算结果
  19. private Integer result = 0;
  20. public StatisticsDemo(Integer startNum, Integer endNum) {
  21. this.startNum = startNum;
  22. this.endNum = endNum;
  23. }
  24. @Override
  25. protected Integer compute() {
  26. if ((endNum - startNum) <= num) {
  27. System.out.println(Thread.currentThread().getName() + " -- " + startNum + " 到 " + endNum);
  28. // 直接计算
  29. for (int i = startNum; i <= endNum; i++) {
  30. // 模拟计算用时
  31. // ThreadUtil.sleep(1);
  32. result = result + i;
  33. }
  34. } else {
  35. // 进行拆分
  36. Integer middle = (startNum + endNum) / 2;
  37. // System.out.println(Thread.currentThread().getName() + " 进行拆分 " + startNum + " 到 " + endNum + " middle=" + middle);
  38. // 左边
  39. StatisticsDemo left = new StatisticsDemo(startNum, middle);
  40. // 右边
  41. StatisticsDemo right = new StatisticsDemo(middle + 1, endNum);
  42. // 进行拆分
  43. left.fork();
  44. right.fork();
  45. // 合并结果
  46. result = left.join() + right.join();
  47. }
  48. return result;
  49. }
  50. }

测试对象

  1. /**
  2. * 测试:统计1+2+3...10000
  3. *
  4. * @param args
  5. * @throws Exception
  6. */
  7. public static void main(String[] args) throws Exception {
  8. System.out.println(new Date());
  9. // 创建统计对象 统计1+2+3...10000
  10. StatisticsDemo statisticsDemo = new StatisticsDemo(1, 10000);
  11. // 创建分支合并池 5个线程计算,然后合并
  12. ForkJoinPool forkJoinPool = new ForkJoinPool(5);
  13. ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(statisticsDemo);
  14. // 取结果
  15. Integer result = forkJoinTask.get();
  16. System.out.println("结果值:" + result);
  17. forkJoinPool.shutdown();
  18. System.out.println(new Date());
  19. }

测试结果

………………………. ForkJoinPool-1-worker-2 — 8791 到 8810 ForkJoinPool-1-worker-2 — 8811 到 8829 ForkJoinPool-1-worker-2 — 8771 到 8790 ForkJoinPool-1-worker-1 — 8751 到 8770 结果值:50005000

案例2:使用分支合并统计支付宝对账文件的差异订单;
任务对象

  1. package demo10;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.concurrent.ForkJoinTask;
  6. import java.util.concurrent.RecursiveTask;
  7. /**
  8. * @author 姿势帝-博客园
  9. * @address https://www.cnblogs.com/newAndHui/
  10. * @WeChat 851298348
  11. * @create 01/07 2:45
  12. * @description <p>
  13. * 分支合并计算
  14. * 找出支付宝对账文件中多有差异的订单
  15. * 为了便于测试作如下假设,实际情况中应到数据库查询
  16. * 假设订单尾号为1表示:支付宝中有,但系统中没有的订单,即:支付宝订单多
  17. * 假设订单尾号为2表示:系统中有,但支付宝中没有的订单,即:系统订单多
  18. * 假设订单尾号为3表示:收款金额不一致
  19. * </p>
  20. */
  21. public class ComparisonDemo extends RecursiveTask<List<String>> {
  22. // 需要统计的订单
  23. List<Map<String, Object>> list;
  24. // 统计的类型,1-支付宝订单多,2-系统订单多,3-收款金额不一致
  25. private String type;
  26. // 每次统计条数(用来判定是否需要拆分任务)
  27. int count;
  28. public ComparisonDemo(List<Map<String, Object>> list, Integer pageSize, String type) {
  29. this.list = list;
  30. this.type = type;
  31. this.count = pageSize;
  32. }
  33. @Override
  34. protected List<String> compute() {
  35. List<String> resultList = new ArrayList<>();
  36. int size = list.size();
  37. if (size <= count) {
  38. // 直接统计
  39. for (Map<String, Object> orderMap : list) {
  40. //System.out.println("线程名称:" + Thread.currentThread().getName());
  41. String orderNo = orderMap.get("orderNo").toString();
  42. // 取订单号的最后一位
  43. String suffix = orderNo.substring(orderNo.length() - 1);
  44. // 如果订单最后一位与类型一致,放入返回的集合
  45. if (type.equals(suffix)) {
  46. resultList.add(orderNo);
  47. }
  48. }
  49. } else {
  50. // 中间值
  51. int middle = size / 2;
  52. // 拆左边
  53. ComparisonDemo left = new ComparisonDemo(list.subList(0, middle), count, type);
  54. ForkJoinTask<List<String>> forkLeft = left.fork();
  55. // 拆右边
  56. ComparisonDemo right = new ComparisonDemo(list.subList(middle, size), count, type);
  57. ForkJoinTask<List<String>> forkRight = right.fork();
  58. // 合并
  59. List<String> joinLeft = forkLeft.join();
  60. List<String> joinRight = forkRight.join();
  61. resultList.addAll(joinLeft);
  62. resultList.addAll(joinRight);
  63. }
  64. // 返回最后的结果
  65. return resultList;
  66. }
  67. }

测试代码

  1. package demo10;
  2. import cn.hutool.core.io.FileUtil;
  3. import cn.hutool.core.util.RandomUtil;
  4. import cn.hutool.poi.excel.ExcelReader;
  5. import cn.hutool.poi.excel.ExcelUtil;
  6. import cn.hutool.poi.excel.ExcelWriter;
  7. import org.junit.Test;
  8. import java.util.ArrayList;
  9. import java.util.HashMap;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.concurrent.ExecutionException;
  13. import java.util.concurrent.ForkJoinPool;
  14. import java.util.concurrent.ForkJoinTask;
  15. /**
  16. * @author 姿势帝-博客园
  17. * @address https://www.cnblogs.com/newAndHui/
  18. * @WeChat 851298348
  19. * @create 01/07 4:49
  20. * @description
  21. */
  22. public class ComparisonTest {
  23. /**
  24. * 模拟生成对账文件
  25. */
  26. @Test
  27. public void crateBillFile() {
  28. ExcelWriter writer = ExcelUtil.getWriter("F:\\test\\t40.xlsx");
  29. List<Map<String, String>> orderList = new ArrayList<>();
  30. for (int i = 0; i < 400000; i++) {
  31. Map<String, String> order = new HashMap<>();
  32. order.put("orderNo", "ON" + System.currentTimeMillis() + i);
  33. order.put("money", RandomUtil.randomInt(1, 999) + "");
  34. order.put("state", "0");
  35. orderList.add(order);
  36. }
  37. writer.write(orderList);
  38. // 必须加入这行才会真实的写入磁盘
  39. writer.flush();
  40. System.out.println("模拟生成对账文件结束");
  41. }
  42. /**
  43. * 使用合并框架
  44. * 使用合并框架耗时:19
  45. *
  46. * @param args
  47. * @throws ExecutionException
  48. * @throws InterruptedException
  49. */
  50. public static void main(String[] args) throws ExecutionException, InterruptedException {
  51. // 读取对账文件
  52. ExcelReader reader = ExcelUtil.getReader(FileUtil.file("F:\\test\\t1.xlsx"));
  53. List<Map<String, Object>> list = reader.readAll();
  54. long startTime = System.currentTimeMillis();
  55. // 创建执行任务对象
  56. ComparisonDemo comparisonDemo = new ComparisonDemo(list, 1000, "1");
  57. // 创建分支合并线程池
  58. ForkJoinPool forkJoinPool = new ForkJoinPool(20);
  59. // 提交执行任务
  60. ForkJoinTask<List<String>> task = forkJoinPool.submit(comparisonDemo);
  61. // 获取执行结果
  62. List<String> orderList = task.get();
  63. // 关闭线程池
  64. forkJoinPool.shutdown();
  65. long endTime = System.currentTimeMillis();
  66. System.out.println("使用合并框架耗时:" + (endTime - startTime));
  67. System.out.println("orderList:" + orderList);
  68. }
  69. /**
  70. * 普通统计
  71. * 普通统计耗时:68
  72. */
  73. @Test
  74. public void test01() {
  75. // 读取对账文件
  76. ExcelReader reader = ExcelUtil.getReader(FileUtil.file("F:\\test\\t40.xlsx"));
  77. List<Map<String, Object>> list = reader.readAll();
  78. long startTime = System.currentTimeMillis();
  79. List<String> orderList = new ArrayList<>();
  80. for (Map<String, Object> orderMap : list) {
  81. String orderNo = orderMap.get("orderNo").toString();
  82. // 取订单号的最后一位
  83. String suffix = orderNo.substring(orderNo.length() - 1);
  84. // 如果订单最后一位与类型一致,放入返回的集合
  85. if ("1".equals(suffix)) {
  86. orderList.add(orderNo);
  87. }
  88. }
  89. long endTime = System.currentTimeMillis();
  90. System.out.println("普通统计耗时:" + (endTime - startTime));
  91. System.out.println("orderList:" + orderList);
  92. }
  93. }

案例3:使用分支合并的方式给一个数组中的每个元素乘2;
请自己完成!

7.Callable与Future接口

7.1.结构关系

image.png
之前我们学习了创建线程的2中方式
1.继承Thread类
2.实现Runnable接口
这两种方式都无法获取到线程执行的结果
今天我们学习第三种方式:
Java 中提供的Callable接口,线程执行完成后可以获取到执行结果

7.2.Callable与Runnable接口特点:

image.png
1.实现Callable接口的 call()方法。
2.call()方法可以引发异常,而 run()则不能。
3.不能直接替换 runnable,因为 Thread 类的构造方法根本没有 Callable

7.3.应用场景与特点

1.在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些
作业交给 Future 对象在后台完成
2.当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执
行状态
3.一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去
获取结果。
4.仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法
5.一旦计算完成,就不能再重新开始或取消计算
6.get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完
成状态,然后会返回结果或者抛出异常
7.get 只计算一次,因此 get 方法放到最后

注意:以上来自网络与个人经验总结的几个要点,其实还有很多很多知识点,这些大家不需要背诵, 大家可以先写代码,体会一下,然后在结合自己的业务场景或者看看API、网络搜索一下,毕竟自己总结的才是自己的

7.4.案例演示

1.创建无返回结果的线程

  1. public class ThreadRunnable implements Runnable {
  2. @Override
  3. public void run() {
  4. System.out.println(Thread.currentThread().getName() + ".." + "无返回的run()方法");
  5. }
  6. }

2.创建有返回结果的线程

  1. public class ThreadCallable implements Callable<String> {
  2. @Override
  3. public String call() throws Exception {
  4. System.out.println(Thread.currentThread().getName() + ".." + "有返回的call()方法");
  5. ThreadUtil.sleep(5);
  6. return "我有结果";
  7. }
  8. }

3.测试

  1. package demo05;
  2. import common.ThreadUtil;
  3. import org.junit.Test;
  4. import java.util.concurrent.ExecutionException;
  5. import java.util.concurrent.FutureTask;
  6. /**
  7. * @author 姿势帝-博客园
  8. * @address https://www.cnblogs.com/newAndHui/
  9. * @WeChat 851298348
  10. * @create 09/25 10:15
  11. * @description <p>
  12. * Callable&Future 接口 演示
  13. * </p>
  14. */
  15. public class CallableTest {
  16. /**
  17. * 调用 runnable 的多线程
  18. */
  19. @Test
  20. public void test01() {
  21. ThreadRunnable threadRunnable = new ThreadRunnable();
  22. new Thread(threadRunnable, "runnable线程").start();
  23. // 避免线程结束
  24. ThreadUtil.sleep(5);
  25. }
  26. /**
  27. * 调用 callable 的多线程
  28. */
  29. @Test
  30. public void test02() throws ExecutionException, InterruptedException {
  31. // 声明线程对象
  32. ThreadCallable threadCallable = new ThreadCallable();
  33. // 中间对象(与runnable的区别1)
  34. FutureTask<String> futureTask = new FutureTask<>(threadCallable);
  35. // 启动线程
  36. new Thread(futureTask, "callable线程").start();
  37. // 取线程的结果值 (与runnable的区别2)
  38. // 1. futureTask.get() 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常
  39. // 2. 只计算一次(重复调用的情况下,call方法只执行一次)
  40. for (int i = 0; i < 5; i++) {
  41. String s = futureTask.get();
  42. System.out.println("线程结果值:" + s);
  43. }
  44. // 避免线程结束
  45. ThreadUtil.sleep(5);
  46. }
  47. }

7.5.核心总结

大家就把上面的内容它作为一个知识点了解就可以了,实际生产中不会直接这样使用的!

8.CompletableFuture接口

重点归纳:

1.CompletableFuture.runAsync 没有返回值的异步阻塞

2.有返回值的异步任务

3.thenApply 有返回值的线程依赖 当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

4.thenAccept 消费处理结果 接收任务的处理结果,并消费处理,无返回结果。

5.exceptionally 异常处理,出现异常时触发

6.handle 最终结果处理 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常

7.thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果

8.thenCombine 合并两个没有依赖关系的 CompletableFutures 执行结果

案例代码如下:

  1. package demo11;
  2. import common.ThreadUtil;
  3. import org.junit.Test;
  4. import java.util.concurrent.CompletableFuture;
  5. public class CompletableTest01 {
  6. /**
  7. * CompletableFuture.runAsync
  8. * 没有返回值的异步阻塞
  9. *
  10. * @throws Exception
  11. */
  12. @Test
  13. public void test01() throws Exception {
  14. System.out.println("测试开始...");
  15. System.out.println("线程名称:" + Thread.currentThread().getName());
  16. // 异步执行
  17. CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
  18. System.out.println("线程名称:" + Thread.currentThread().getName());
  19. System.out.println(Thread.currentThread().getName() + " 异步执行正在执行.....");
  20. ThreadUtil.sleep(3);
  21. System.out.println(Thread.currentThread().getName() + " 异步执行结束");
  22. });
  23. ThreadUtil.sleep(3);
  24. System.out.println("获取异步执行结果....");
  25. // 该方法会阻塞(不论你是否调用,子线程都会执行)
  26. // runAsync.get();
  27. System.out.println("测试结束...");
  28. }
  29. /**
  30. * 有返回值的异步任务
  31. *
  32. * @throws Exception
  33. */
  34. @Test
  35. public void test02() throws Exception {
  36. System.out.println(Thread.currentThread().getName() + " 测试开始...");
  37. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  38. System.out.println(Thread.currentThread().getName() + " 异步执行正在执行.....");
  39. ThreadUtil.sleep(2);
  40. System.out.println(" 异步执行结束");
  41. return "OK";
  42. });
  43. System.out.println("开始获取子线程结果...");
  44. // 该方法 只计算一次,会产生阻塞(不论你是否调用,子线程都会执行)
  45. String result = future.get();
  46. System.out.println("result::" + result);
  47. System.out.println("多次获取测试");
  48. String result2 = future.get();
  49. System.out.println("result2::" + result2);
  50. System.out.println("测试结束...");
  51. }
  52. /**
  53. * thenApply
  54. * 有返回值的线程依赖
  55. * 当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
  56. *
  57. * @throws Exception
  58. */
  59. @Test
  60. public void test03() throws Exception {
  61. System.out.println(Thread.currentThread().getName() + " 测试开始...");
  62. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  63. System.out.println(" 正在操作数据库");
  64. ThreadUtil.sleep(2);
  65. System.out.println("数据库操作完成");
  66. return "操作数据库->";
  67. }).thenApply((str) -> {
  68. System.out.println(" 正在关闭数据库");
  69. ThreadUtil.sleep(2);
  70. System.out.println("数据库关闭完成");
  71. return str + "关闭数据库";
  72. });
  73. System.out.println("开始获取子线程结果...");
  74. // 该方法 只计算一次,会产生阻塞(不论你是否调用,子线程都会执行)
  75. String result = future.get();
  76. System.out.println("result::" + result);
  77. System.out.println("测试结束...");
  78. }
  79. /**
  80. * thenAccept
  81. * 消费处理结果
  82. * 接收任务的处理结果,并消费处理,无返回结果。
  83. * 思考这种情况与 test03 的区别: 结果也是在子线程中处理
  84. *
  85. * @throws Exception
  86. */
  87. @Test
  88. public void test03_01() throws Exception {
  89. System.out.println(Thread.currentThread().getName() + " 测试开始...");
  90. CompletableFuture.supplyAsync(() -> {
  91. System.out.println(" 正在操作数据库");
  92. ThreadUtil.sleep(2);
  93. System.out.println("数据库操作完成");
  94. return "操作数据库->";
  95. }).thenApply((str1) -> {
  96. System.out.println(" 正在关闭数据库");
  97. ThreadUtil.sleep(2);
  98. System.out.println("数据库关闭完成");
  99. return str1 + "关闭数据库";
  100. }).thenAccept((str2) -> {
  101. System.out.println("数据库操作流程:" + str2);
  102. });
  103. // future.get();
  104. System.out.println("测试结束...");
  105. ThreadUtil.sleep(5);
  106. }
  107. /**
  108. * exceptionally
  109. * 异常处理,出现异常时触发
  110. *
  111. * @throws Exception
  112. */
  113. @Test
  114. public void test03_02() throws Exception {
  115. System.out.println(Thread.currentThread().getName() + " 测试开始...");
  116. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  117. System.out.println(" 正在操作数据库");
  118. ThreadUtil.sleep(2);
  119. // 模拟错误
  120. //int i = 2 / 0;
  121. System.out.println("数据库操作完成");
  122. return "操作数据库->";
  123. }).thenApply((str) -> {
  124. System.out.println(" 正在关闭数据库");
  125. // 模拟错误
  126. int i = 2 / 0;
  127. ThreadUtil.sleep(2);
  128. System.out.println("数据库关闭完成");
  129. return str + "关闭数据库";
  130. }).exceptionally((exception) -> {
  131. String message = exception.getMessage();
  132. return "数据库操作错误:" + message;
  133. });
  134. System.out.println("开始获取子线程结果...");
  135. // 该方法 只计算一次,会产生阻塞(不论你是否调用,子线程都会执行)
  136. String result = future.get();
  137. System.out.println("result::" + result);
  138. System.out.println("测试结束...");
  139. }
  140. /**
  141. * handle
  142. * 最终结果处理
  143. * thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常
  144. *
  145. * @throws Exception
  146. */
  147. @Test
  148. public void test03_03() throws Exception {
  149. System.out.println(Thread.currentThread().getName() + " 测试开始...");
  150. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  151. System.out.println(" 正在操作数据库");
  152. ThreadUtil.sleep(2);
  153. // 模拟错误
  154. //int i = 2 / 0;
  155. System.out.println("数据库操作完成");
  156. return "操作数据库->";
  157. }).thenApply((str) -> {
  158. System.out.println(" 正在关闭数据库");
  159. // 模拟错误
  160. int i = 2 / 0;
  161. ThreadUtil.sleep(2);
  162. System.out.println("数据库关闭完成");
  163. return str + "关闭数据库";
  164. }).handle((result, exception) -> {
  165. if (exception != null) {
  166. System.out.println("异常:" + exception.getMessage());
  167. // 如果是异常 result是null
  168. return result + "..exception";
  169. } else {
  170. System.out.println("正常的结果");
  171. // 如果没有发送异常,exception 是null
  172. return result + "..success";
  173. }
  174. });
  175. String result = future.get();
  176. System.out.println("result::" + result);
  177. System.out.println("测试结束...");
  178. }
  179. /**
  180. * thenCompose
  181. * 合并两个有依赖关系的 CompletableFutures 的执行结果
  182. *
  183. * @throws Exception
  184. */
  185. @Test
  186. public void test04() throws Exception {
  187. System.out.println(Thread.currentThread().getName() + " 测试开始...");
  188. // 第一步
  189. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
  190. ThreadUtil.sleep(2);
  191. return "操作数据库->";
  192. });
  193. // 第二步 合并两个有依赖关系的 CompletableFutures 的执行结果
  194. CompletableFuture<String> future2 = future1.thenCompose((str) ->
  195. CompletableFuture.supplyAsync(() -> {
  196. return str + "操作完毕";
  197. })
  198. );
  199. String result1 = future1.get();
  200. String result2 = future2.get();
  201. System.out.println("result1::" + result1);
  202. System.out.println("result2::" + result2);
  203. System.out.println("测试结束...");
  204. }
  205. /**
  206. * thenCombine
  207. * 合并两个没有依赖关系的 CompletableFutures 执行结果
  208. *
  209. * @throws Exception
  210. */
  211. @Test
  212. public void test05() throws Exception {
  213. System.out.println(Thread.currentThread().getName() + " 测试开始...");
  214. // 第一步
  215. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
  216. ThreadUtil.sleep(2);
  217. return "操作数据库->";
  218. });
  219. // 第二步
  220. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
  221. ThreadUtil.sleep(2);
  222. return "操作完毕";
  223. });
  224. // henCombine 合并两个没有依赖关系的 CompletableFutures 任务
  225. CompletableFuture<String> future3 = future1.thenCombine(future2, (str1, str2) -> {
  226. return str1 + "|" + str2;
  227. });
  228. String result1 = future1.get();
  229. String result2 = future2.get();
  230. String result3 = future3.get();
  231. System.out.println("result1::" + result1);
  232. System.out.println("result2::" + result2);
  233. System.out.println("result3::" + result3);
  234. System.out.println("测试结束...");
  235. }
  236. }

9.小结

课程已讲完,感谢各位童鞋的观看!