资料来源:https://www.bilibili.com/video/BV1Kw411Z7dF/?p=2&spm_id_from=pageDriver
Volume Control(chrome浏览器音量控制器扩展程序):http://www.3h3.com/soft/201466.html
离线安装chrome插件教程:http://www.crx4.com/507.html

一、什么是 JUC

1、JUC 简介

在 Java 中,线程部分是一个重点,本篇文章说的 JUC 也是关于线程的。JUC 就是 java.util.concurrent 工具包的简称。这是一个处理线程的工具包,JDK1.5 开始出现的。
image.png

2、进程与线程

进程(_Process)_是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。

线程(_thread)_是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流, 一个进程中可以并发多个线程,每条线程并行执行不同的任务
总结来说:
进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程是资源分配的最小单位。
线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程是程序执行的最小单位。

3、线程的状态

3.1 线程状态枚举类

Thread.State

  1. public enum State {
  2. /**
  3. * Thread state for a thread which has not yet started.
  4. */
  5. NEW,(新建)
  6. /**
  7. *Thread state for a runnable thread. A thread in the runnable
  8. *state is executing in the Java virtual machine but it may
  9. *be waiting for other resources from the operating system
  10. *such as processor.
  11. */
  12. RUNNABLE,(准备就绪)
  13. /**
  14. *Thread state for a thread blocked waiting for a monitor lock.
  15. *A thread in the blocked state is waiting for a monitor lock
  16. *to enter a synchronized block/method or
  17. *reenter a synchronized block/method after calling
  18. *{@link Object#wait() Object.wait}.
  19. */
  20. BLOCKED,(阻塞)
  21. /**
  22. *Thread state for a waiting thread.
  23. *A thread is in the waiting state due to calling one of the
  24. *following methods:
  25. *<ul>
  26. *<li>{@link Object#wait() Object.wait} with no timeout</li>
  27. *<li>{@link #join() Thread.join} with no timeout</li>
  28. *<li>{@link LockSupport#park() LockSupport.park}</li>
  29. * </ul>
  30. *
  31. *<p>A thread in the waiting state is waiting for another thread to
  32. *perform a particular action.
  33. *
  34. *For example, a thread that has called <tt>Object.wait()</tt>
  35. *on an object is waiting for another thread to call
  36. *<tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
  37. *that object. A thread that has called <tt>Thread.join()</tt>
  38. *is waiting for a specified thread to terminate.
  39. */
  40. WAITING,(不见不散)
  41. /**
  42. *Thread state for a waiting thread with a specified waiting time.
  43. *A thread is in the timed waiting state due to calling one of
  44. *the following methods with a specified positive waiting time:
  45. *<ul>
  46. <li>{@link #sleep Thread.sleep}</li>
  47. *<li>{@link Object#wait(long) Object.wait} with timeout</li>
  48. *<li>{@link #join(long) Thread.join} with timeout</li>
  49. *<li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
  50. *<li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
  51. * </ul>
  52. */
  53. TIMED_WAITING,(过时不候)
  54. /**
  55. *Thread state for a terminated thread.
  56. *The thread has completed execution.
  57. */
  58. TERMINATED;(终结)
  59. }

3.2 wait/sleep的区别

(1) sleep 是Thread 的静态方法,wait 是Object 的方法,任何对象实例都能调用。
(2) sleep 不会释放锁,它也不需要占用锁。wait 会释放锁,但调用它的前提是当前线程占有锁(即代码要在 synchronized 中)。
(3) 它们都可以被interrupted 方法中断。

4、并发与并行

4.1 串行模式

串行表示所有任务都一一按先后顺序进行。串行意味着必须先装完一车柴才能运送这车柴,只有运送到了,才能卸下这车柴,并且只有完成了这整个三个步骤,才能进行下一个步骤。
串行是一次只能取得一个任务,并执行这个任务。

4.2 并行模式

并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核 CPU。

4.3 并发

并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。但这不是重点,在描述并发的时候也不会去扣这种字眼是否精确,==并发的重点在于它是一种现象==, ==并发描述的是多进程同时运行的现象==。但实际上,对于单核心 CPU 来说,同一时刻只能运行一个线程。所以,这里的”同时运行”表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同时运行起来了,但实际上这些程序中的进程不是一直霸占CPU 的,而是执行一会停一会。
要解决大并发问题,通常是将大任务分解成多个小任务,由于操作系统对进程的调度是随机的,所以切分成多个小任务后,可能会从任一小任务处执行。
这可能会出现一些现象:

  1. 可能出现一个小任务执行了多次,还没开始下个任务的情况。这时一般会采用队列或类似的数据结构来存放各个小任务的成果
  2. 可能出现还没准备好第一步就执行第二步的可能。这时,一般采用多路复用或异步的方式,比如只有准备好产生了事件通知才执行某个任务。
  3. 可以多进程/多线程的方式并行执行这些小任务。也可以单进程/单线程执行这些小任务,这时很可能要配合多路复用才能达到较高的效率

    4.4 小结(重点)

并发:同一时刻多个线程在访问同一个资源,多个线程对一个点
例子:春运抢票 电商秒杀…
并行:多项工作一起执行,之后再汇总
例子:泡方便面,电水壶烧水,一边撕调料倒入桶中

5、管程

管程(monitor)是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现).但是这样并不能保证进程以设计的顺序执行
JVM 中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程
(monitor)对象,管程(monitor)会随着java 对象一同创建和销毁
执行线程首先要持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方法在执行时候会持有管程,其他线程无法再获取同一个管程
image.png

6、用户线程和守护线程

用户线程平时用到的普通线程,自定义线程
守护线程运行在后台,是一种特殊的线程,比如垃圾回收
当主线程结束后用户线程还在运行,JVM存
如果没有用户线程都是守护线程,JVM结束
image.png
image.png
image.png

二、Lock 接口

1、Synchronized

1.1 Synchronized关键字回顾

synchronized 是 Java 中的关键字,是一种同步锁。它修饰的对象有以下几种:
1. 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{} 括起来的代码,作用的对象是调用这个代码块的对象;
2. 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;
注意:虽然可以使用synchronized 来定义方法,但synchronized 并不属于方法定义的一部分,因此,synchronized 关键字不能被继承。如果在父类中的某个方法使用了synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方
法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此, 子类的方法也就相当于同步了。
3. 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象;
4. 修改一个类,其作用的范围是synchronized 后面括号括起来的部分,作用主的对象是这个类的所有对象。

1.2 售票案例

  1. package com.atguigu.sync;
  2. //第一步 创建资源类,定义属性和和操作方法
  3. class Ticket {
  4. //票数
  5. private int number = 30;
  6. //操作方法:卖票
  7. public synchronized void sale() {
  8. //判断:是否有票
  9. if(number > 0) {
  10. System.out.println(Thread.currentThread().getName()+" : 卖出:"+(number--)+" 剩下:"+number);
  11. }
  12. }
  13. }
  14. public class SaleTicket {
  15. //第二步 创建多个线程,调用资源类的操作方法
  16. public static void main(String[] args) {
  17. //创建Ticket对象
  18. Ticket ticket = new Ticket();
  19. //创建三个线程
  20. new Thread(new Runnable() {
  21. @Override
  22. public void run() {
  23. //调用卖票方法
  24. for (int i = 0; i < 40; i++) {
  25. ticket.sale();
  26. }
  27. }
  28. },"AA").start();
  29. new Thread(new Runnable() {
  30. @Override
  31. public void run() {
  32. //调用卖票方法
  33. for (int i = 0; i < 40; i++) {
  34. ticket.sale();
  35. }
  36. }
  37. },"BB").start();
  38. new Thread(new Runnable() {
  39. @Override
  40. public void run() {
  41. //调用卖票方法
  42. for (int i = 0; i < 40; i++) {
  43. ticket.sale();
  44. }
  45. }
  46. },"CC").start();
  47. }
  48. }

如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:
1) 获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
2) 线程执行发生异常,此时JVM 会让线程自动释放锁。
那么如果这个获取锁的线程由于要等待IO 或者其他原因(比如调用 sleep 方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。
因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过 Lock 就可以办到。

2、Lock

Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock 提供了比synchronized 更多的功能。
Lock 与的 Synchronized 区别

  1. Lock 不是Java 语言内置的,synchronized 是Java 语言的关键字,因此是内置特性。Lock 是一个类,通过这个类可以实现同步访问;
  2. Lock 和synchronized 有一点非常大的不同,采用synchronized 不需要用户去手动释放锁,当 synchronized 方法或者 synchronized 代码块执行完之后, 系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。

    2.1 售票案例

    ```java package com.atguigu.lock;

import java.util.concurrent.locks.ReentrantLock;

//第一步 创建资源类,定义属性和和操作方法 class LTicket {

  1. //票数量
  2. private int number = 30;
  3. //创建可重入锁
  4. private final ReentrantLock lock = new ReentrantLock(true);
  5. //卖票方法
  6. public void sale() {
  7. //上锁
  8. lock.lock();
  9. try {
  10. //判断是否有票
  11. if(number > 0) {
  12. System.out.println(Thread.currentThread().getName()+" :卖出"+(number--)+" 剩余:"+number);
  13. }
  14. } finally {
  15. //解锁
  16. lock.unlock();
  17. }
  18. }

}

public class LSaleTicket {

  1. //第二步 创建多个线程,调用资源类的操作方法
  2. //创建三个线程
  3. public static void main(String[] args) {
  4. LTicket ticket = new LTicket();
  5. new Thread(()-> {
  6. for (int i = 0; i < 40; i++) {
  7. ticket.sale();
  8. }
  9. },"AA").start();
  10. new Thread(()-> {
  11. for (int i = 0; i < 40; i++) {
  12. ticket.sale();
  13. }
  14. },"BB").start();
  15. new Thread(()-> {
  16. for (int i = 0; i < 40; i++) {
  17. ticket.sale();
  18. }
  19. },"CC").start();
  20. }

}

  1. <a name="Ozlls"></a>
  2. ### 2.2 Lock 接口
  3. ```java
  4. public interface Lock {
  5. void lock();
  6. void lockInterruptibly() throws InterruptedException;
  7. boolean tryLock();
  8. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  9. void unlock();
  10. Condition newCondition();
  11. }

下面来逐个讲述 Lock 接口中每个方法的使用

2.3 lock

lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock 必须在try{}catch{}块中进行,并且将释放锁的操作放在finally 块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock 来进行同步的话,是以下面这种形式去使用的:

  1. Lock lock = ...;
  2. lock.lock();
  3. try{
  4. //处理任务
  5. }catch(Exception ex){
  6. }finally{
  7. lock.unlock(); //释放锁
  8. }

2.4 newCondition

关键字synchronized 与wait()/notify()这两个方法一起使用可以实现等待/通知模式,Lock 锁的newContition()方法返回 Condition 对象,Condition 类也可以实现等待/通知模式。
用notify()通知时,JVM 会随机唤醒某个等待的线程, 使用 Condition 类可以进行选择性通知,Condition 比较常用的两个方法:

  1. await()会使当前线程等待,同时会释放锁,当其他线程调用 signal()时,线程会重新获得锁并继续执行。
  2. signal()用于唤醒一个等待的线程。

注意:在调用 Condition 的 await()/signal()方法前,也需要线程持有相关的 Lock 锁,调用 await()后线程会释放这个锁,在 singal()调用后会从当前Condition 对象的等待队列中,唤醒 一个线程,唤醒的线程尝试获得锁, 一旦获得锁成功就继续执行。

3、ReentrantLock

ReentrantLock,意思是“可重入锁”,关于可重入锁的概念将在后面讲述。
ReentrantLock 是唯一实现了 Lock 接口的类,并且 ReentrantLock 提供了更多的方法。下面通过一些实例看具体看一下如何使用。

  1. public class Test {
  2. private ArrayList<Integer> arrayList = new ArrayList<Integer>();
  3. public static void main(String[] args) {
  4. final Test test = new Test();
  5. new Thread(){
  6. public void run() {
  7. test.insert(Thread.currentThread());
  8. };
  9. }.start();
  10. new Thread(){
  11. public void run() {
  12. test.insert(Thread.currentThread());
  13. };
  14. }.start();
  15. }
  16. public void insert(Thread thread) {
  17. Lock lock = new ReentrantLock(); //注意这个地方
  18. lock.lock();
  19. try {
  20. System.out.println(thread.getName()+"得到了锁");
  21. for(int i=0;i<5;i++) {
  22. arrayList.add(i);
  23. }
  24. } catch (Exception e) {
  25. // TODO: handle exception
  26. }finally {
  27. System.out.println(thread.getName()+"释放了锁");
  28. lock.unlock();
  29. }
  30. }
  31. }

4、ReadWriteLock

ReadWriteLock 也是一个接口,在它里面只定义了两个方法:

  1. public interface ReadWriteLock {
  2. /**
  3. * Returns the lock used for reading.
  4. *
  5. * @return the lock used for reading.
  6. */
  7. Lock readLock();
  8. /**
  9. * Returns the lock used for writing.
  10. *
  11. * @return the lock used for writing.
  12. */
  13. Lock writeLock();
  14. }

一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成 2 个锁来分配给程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock 实现了 ReadWriteLock 接口
ReentrantReadWriteLock 里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和 writeLock()用来获取读锁和写锁。
下面通过几个例子来看一下 ReentrantReadWriteLock 具体用法。
假如有多个线程要同时进行读操作的话,先看一下 synchronized 达到的效果:

  1. public class Test {
  2. private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  3. public static void main(String[] args) {
  4. final Test test = new Test();
  5. new Thread(){
  6. public void run() {
  7. test.get(Thread.currentThread());
  8. };
  9. }.start();
  10. new Thread(){
  11. public void run() {
  12. test.get(Thread.currentThread());
  13. };
  14. }.start();
  15. }
  16. public synchronized void get(Thread thread) {
  17. long start = System.currentTimeMillis();
  18. while(System.currentTimeMillis() - start <= 1) {
  19. System.out.println(thread.getName()+"正在进行读操作");
  20. }
  21. System.out.println(thread.getName()+"读操作完毕");
  22. }
  23. }

而改成用读写锁的话:

  1. private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  2. public static void main(String[] args) {
  3. final Test test = new Test();
  4. new Thread(){
  5. public void run() {
  6. test.get(Thread.currentThread());
  7. };
  8. }.start();
  9. new Thread(){
  10. public void run() {
  11. test.get(Thread.currentThread());
  12. };
  13. }.start();
  14. }
  15. public void get(Thread thread) {
  16. rwl.readLock().lock();
  17. try {
  18. long start = System.currentTimeMillis();
  19. while(System.currentTimeMillis() - start <= 1) {
  20. System.out.println(thread.getName()+"正在进行读操作");
  21. }
  22. System.out.println(thread.getName()+"读操作完毕");
  23. } finally {
  24. rwl.readLock().unlock();
  25. }
  26. }

说明 thread1 和 thread2 在同时进行读操作。这样就大大提升了读操作的效率。
==注意==

  1. 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
  2. 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

    5、小结(重点)

Lock 和 synchronized 有以下几点不同:
1. Lock 是一个接口,而synchronized 是Java 中的关键字,synchronized 是内置的语言实现;
2. synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock 在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock 时需要在finally 块中释放锁;
3. Lock 可以让等待锁的线程响应中断,而synchronized 却不行,使用synchronized 时,等待的线程会一直等待下去,不能够响应中断;
4. 通过Lock 可以知道有没有成功获取锁,而synchronized 却无法办到。
5. Lock 可以提高多个线程进行读操作的效率。

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于synchronized。

三、线程间通信

线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。我们来基本一道面试常见的题目来分析

场景—-两个线程,一个线程对当前数值加1,另一个线程对当前数值减1,要求用线程间通信

1、synchronized 方案

  1. package com.atguigu.sync;
  2. //第一步 创建资源类,定义属性和操作方法
  3. class Share {
  4. // 初始值
  5. private int number = 0;
  6. // +1的方法
  7. public synchronized void incr() throws InterruptedException {
  8. // 第二步 判断 干活 通知
  9. while(number != 0) { // 判断number值是否是0,如果不是0,等待
  10. this.wait(); // 在哪里睡,就在哪里醒
  11. }
  12. // 如果number值是0,就+1操作
  13. number++;
  14. System.out.println(Thread.currentThread().getName()+" :: "+number);
  15. // 通知其他线程
  16. this.notifyAll();
  17. }
  18. // -1的方法
  19. public synchronized void decr() throws InterruptedException {
  20. // 判断
  21. while(number != 1) {
  22. this.wait();
  23. }
  24. // 干活
  25. number--;
  26. System.out.println(Thread.currentThread().getName()+" :: "+number);
  27. // 通知其他线程
  28. this.notifyAll();
  29. }
  30. }
  31. public class ThreadDemo1 {
  32. // 第三步 创建多个线程,调用资源类的操作方法
  33. public static void main(String[] args) {
  34. Share share = new Share();
  35. // 创建线程
  36. new Thread(()->{
  37. for (int i = 1; i <=10; i++) {
  38. try {
  39. share.incr(); // +1
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. },"AA").start();
  45. new Thread(()->{
  46. for (int i = 1; i <=10; i++) {
  47. try {
  48. share.decr(); //-1
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. },"BB").start();
  54. new Thread(()->{
  55. for (int i = 1; i <=10; i++) {
  56. try {
  57. share.incr(); //+1
  58. } catch (InterruptedException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. },"CC").start();
  63. new Thread(()->{
  64. for (int i = 1; i <=10; i++) {
  65. try {
  66. share.decr(); //-1
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. },"DD").start();
  72. }
  73. }

1.1 虚假唤醒问题

image.png
image.png

2、Lock 方案

  1. package com.atguigu.lock;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. // 第一步 创建资源类,定义属性和操作方法
  6. class Share {
  7. private int number = 0;
  8. // 创建Lock,ReentrantLock可重入锁
  9. private Lock lock = new ReentrantLock();
  10. private Condition condition = lock.newCondition();
  11. // +1
  12. public void incr() throws InterruptedException {
  13. // 上锁
  14. lock.lock();
  15. try {
  16. // 判断
  17. while (number != 0) {
  18. condition.await();
  19. }
  20. // 干活
  21. number++;
  22. System.out.println(Thread.currentThread().getName()+" :: "+number);
  23. // 通知其他线程
  24. condition.signalAll();
  25. }finally {
  26. // 解锁
  27. lock.unlock();
  28. }
  29. }
  30. // -1
  31. public void decr() throws InterruptedException {
  32. lock.lock();
  33. try {
  34. while(number != 1) {
  35. condition.await();
  36. }
  37. number--;
  38. System.out.println(Thread.currentThread().getName()+" :: "+number);
  39. condition.signalAll();
  40. }finally {
  41. lock.unlock();
  42. }
  43. }
  44. }
  45. public class ThreadDemo2 {
  46. public static void main(String[] args) {
  47. Share share = new Share();
  48. new Thread(()->{
  49. for (int i = 1; i <=10; i++) {
  50. try {
  51. share.incr();
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. },"AA").start();
  57. new Thread(()->{
  58. for (int i = 1; i <=10; i++) {
  59. try {
  60. share.decr();
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. },"BB").start();
  66. new Thread(()->{
  67. for (int i = 1; i <=10; i++) {
  68. try {
  69. share.incr();
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. }
  73. }
  74. },"CC").start();
  75. new Thread(()->{
  76. for (int i = 1; i <=10; i++) {
  77. try {
  78. share.decr();
  79. } catch (InterruptedException e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. },"DD").start();
  84. }
  85. }

3、线程间定制化通信

3.1 案例介绍

==问题:AA线程打印5 次AA,BB线程打印10 次BB,CC 线程打印15 次CC,按照此顺序循环10轮==
image.png

3.2 实现流程

  1. package com.atguigu.lock;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.UUID;
  5. import java.util.concurrent.locks.Condition;
  6. import java.util.concurrent.locks.Lock;
  7. import java.util.concurrent.locks.ReentrantLock;
  8. // 第一步 创建资源类
  9. class ShareResource {
  10. // 定义标志位
  11. private int flag = 1; // 1 AA 2 BB 3 CC
  12. // 创建Lock锁,ReentrantLock可重入锁
  13. private Lock lock = new ReentrantLock();
  14. // 创建三个condition,可以只创建一个Condition,三个方法均使用它c1.signalAll(),但达不到定制化的目的
  15. private Condition c1 = lock.newCondition();
  16. private Condition c2 = lock.newCondition();
  17. private Condition c3 = lock.newCondition();
  18. // 打印5次,参数第几轮
  19. public void print5(int loop) throws InterruptedException {
  20. // 上锁
  21. lock.lock();
  22. try {
  23. // 判断,使用while而不使用if是为了防止虚假唤醒
  24. while(flag != 1) {
  25. // 等待
  26. c1.await();
  27. }
  28. // 干活
  29. for (int i = 1; i <=5; i++) {
  30. System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
  31. }
  32. // 通知
  33. flag = 2; // 修改标志位 2
  34. c2.signal(); // 通知BB线程:BB线程用的c2.await()等待,使用c2.signal()可以达到精准通知的目的;
  35. // 若使用c1.signal()或c3.signal(),会出现死循环
  36. }finally {
  37. //释放锁
  38. lock.unlock();
  39. }
  40. }
  41. // 打印10次,参数第几轮
  42. public void print10(int loop) throws InterruptedException {
  43. lock.lock();
  44. try {
  45. while(flag != 2) {
  46. c2.await();
  47. }
  48. for (int i = 1; i <=10; i++) {
  49. System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
  50. }
  51. //修改标志位
  52. flag = 3;
  53. //通知CC线程
  54. c3.signal();
  55. }finally {
  56. lock.unlock();
  57. }
  58. }
  59. //打印15次,参数第几轮
  60. public void print15(int loop) throws InterruptedException {
  61. lock.lock();
  62. try {
  63. while(flag != 3) {
  64. c3.await();
  65. }
  66. for (int i = 1; i <=15; i++) {
  67. System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
  68. }
  69. //修改标志位
  70. flag = 1;
  71. //通知AA线程
  72. c1.signal();
  73. }finally {
  74. lock.unlock();
  75. }
  76. }
  77. }
  78. public class ThreadDemo3 {
  79. public static void main(String[] args) {
  80. ShareResource shareResource = new ShareResource();
  81. new Thread(()->{
  82. for (int i = 1; i <=10; i++) {
  83. try {
  84. shareResource.print5(i);
  85. } catch (InterruptedException e) {
  86. e.printStackTrace();
  87. }
  88. }
  89. },"AA").start();
  90. new Thread(()->{
  91. for (int i = 1; i <=10; i++) {
  92. try {
  93. shareResource.print10(i);
  94. } catch (InterruptedException e) {
  95. e.printStackTrace();
  96. }
  97. }
  98. },"BB").start();
  99. new Thread(()->{
  100. for (int i = 1; i <=10; i++) {
  101. try {
  102. shareResource.print15(i);
  103. } catch (InterruptedException e) {
  104. e.printStackTrace();
  105. }
  106. }
  107. },"CC").start();
  108. }
  109. }

4、多线程编程步骤

image.png
image.png

四、集合的线程安全

1、ArrayList线程不安全

image.png
System.out.println(list);取数据是出现异常的原因
异常内容:java.util.ConcurrentModificationException
问题: 为什么会出现并发修改异常?
查看ArrayList 的add 方法源码

  1. /**
  2. * Appends the specified element to the end of this list.
  3. *
  4. * @param e element to be appended to this list
  5. * @return <tt>true</tt> (as specified by {@link Collection#add})
  6. */
  7. public boolean add(E e) {
  8. ensureCapacityInternal(size + 1); // Increments modCount!!
  9. elementData[size++] = e;
  10. return true;
  11. }

==那么我们如何去解决_List 类型的线程安全问题_?==

1.1 解决方案 - Vector

Vector 是矢量队列,它是JDK1.0 版本添加的类。继承于AbstractList,实现了List, RandomAccess, Cloneable 这些接口。Vector 继承了AbstractList, 实现了List;所以,它是一个队列,支持相关的添加、删除、修改、遍历等功能。 Vector 实现了 RandmoAccess 接口,即提供了随机访问功能
RandmoAccess 是 java 中用来被 List 实现,为 List 提供快速访问功能的。在Vector 中,我们即可以通过元素的序号快速获取元素对象;这就是快速随机访问。 Vector 实现了 Cloneable 接口,即实现 clone()函数。它能被克隆。

==和_ArrayList 不同,Vector 中的操作是线程安全的。_==
image.png
现在没有运行出现并发异常,为什么?
查看 Vector 的 add 方法
image.png
add方法被_synchronized 同步修辞线程安全因此没有并发异常_

1.2 解决方案 - Collections

Collections 提供了方法 synchronizedList 保证 list 是同步线程安全的
image.png
没有并发修改异常
查看方法源码

  1. public static <T> List<T> synchronizedList(List<T> list) {
  2. return (list instanceof RandomAccess ?
  3. new SynchronizedRandomAccessList<>(list) :
  4. new SynchronizedList<>(list));
  5. }

1.3 解决方案 - CopyOnWriteArrayList(重点,常用)

image.png
image.png
首先我们对 CopyOnWriteArrayList 进行学习,其特点如下:
它相当于线程安全的 ArrayList。和 ArrayList 一样,它是个可变数组;但是和ArrayList 不同的时,它具有以下特性:
1. 它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
2. 它是线程安全的。
3. 因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove()等等)的开销很大。
4. 迭代器支持hasNext(), next()等不可变操作,但不支持可变 remove()等操作。
5. 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。
1. 独占锁效率低:采用读写分离思想解决
2. 写线程获取到锁,其他写线程阻塞
3. 复制思想:

当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
这时候会抛出来一个新的问题,也就是数据不一致的问题。如果写线程还没来得及写会内存,其他的线程就会读到了脏数据。
==这就是_CopyOnWriteArrayList 的思想和原理。就是拷贝一份。_==
image.png

没有线程安全问题
原因分析(重点):==动态数组与线程安全==
下面从“动态数组”和“线程安全”两个方面进一步对
CopyOnWriteArrayList 的原理进行说明。

1.3.1 “动态数组”机制

o 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile 数组”, 这就是它叫做 CopyOnWriteArrayList 的原因
o 由于它在“添加修改删除”数据时,都会新建数组,所以涉及到修改数据的操作,_CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话, 效率比较高。_

1.3.2 “线程安全”机制

o 通过volatile 和互斥锁来实现的。
o 通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看到其它线程对该volatile 变量最后的写入;就这样,通过volatile 提供了“读取到的数据总是最新的”这个机制的保证。
o 通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”, 再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥锁”,就达到了保护数据的目的。

2、HashSet线程不安全

image.png

解决方案CopyOnWriteArraySet

image.png

3、HashMap线程不安全

image.png

解决方案:ConcurrentHashMap

image.png

4、小结(重点)

1)线程安全与线程不安全集合
集合类型中存在线程安全与线程不安全的两种,常见例如:
ArrayList Vector
HashMap HashTable
但是以上都是通过 synchronized 关键字实现,效率较低
2)Collections 构建的线程安全集合

3)java.util.concurrent 并发包下
CopyOnWriteArrayList CopyOnWriteArraySet 类型,通过动态数组与线程安全个方面保证线程安全

五、多线程锁

1、锁的八个问题演示

  1. package com.atguigu.sync;
  2. import java.util.concurrent.TimeUnit;
  3. class Phone {
  4. public static synchronized void sendSMS() throws Exception {
  5. // 停留4秒
  6. TimeUnit.SECONDS.sleep(4);
  7. System.out.println("------sendSMS");
  8. }
  9. public synchronized void sendEmail() throws Exception {
  10. System.out.println("------sendEmail");
  11. }
  12. public void getHello() {
  13. System.out.println("------getHello");
  14. }
  15. }
  16. /**
  17. * @Description: 8锁
  18. *
  19. 1 标准访问,先打印短信还是邮件 -> synchronized锁住的是同一个对象,所以sendEmail会等sendSMS
  20. ------sendSMS
  21. ------sendEmail
  22. 2 停4秒在短信方法内,先打印短信还是邮件
  23. ------sendSMS
  24. ------sendEmail
  25. 3 新增普通的hello方法,是先打短信还是hello
  26. ------getHello
  27. ------sendSMS
  28. 4 现在有两部手机,先打印短信还是邮件 -> 两部手机,不同对象,所以sendEmail不会等sendSMS
  29. ------sendEmail
  30. ------sendSMS
  31. 5 两个静态同步方法,1部手机 -> synchronized锁的是整个字节码文件Class,锁住的是同一个对象
  32. ------sendSMS
  33. ------sendEmail
  34. 6 两个静态同步方法,2部手机,先打印短信还是邮件 -> synchronized锁的是当前类的Class对象
  35. ------sendSMS
  36. ------sendEmail
  37. 7 1个静态同步方法,1个普通同步方法,1部手机 -> synchronized两把锁不一样,一个是Class,一个是对象
  38. ------sendEmail
  39. ------sendSMS
  40. 8 1个静态同步方法,1个普通同步方法,2部手机 -> synchronized两把锁不一样,一个是Class,一个是对象
  41. ------sendEmail
  42. ------sendSMS
  43. */
  44. public class Lock_9 {
  45. public static void main(String[] args) throws Exception {
  46. Phone phone = new Phone();
  47. Phone phone2 = new Phone();
  48. new Thread(() -> {
  49. try {
  50. Phone1.sendSMS();
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. }, "AA").start();
  55. Thread.sleep(100);
  56. new Thread(() -> {
  57. try {
  58. // phone.sendEmail();
  59. // phone.getHello();
  60. phone2.sendEmail();
  61. } catch (Exception e) {
  62. e.printStackTrace();
  63. }
  64. }, "BB").start();
  65. }
  66. }

1.1 结论

一个对象里面如果有多个synchronized 方法,某一个时刻内,只要一个线程去调用其中的一个synchronized 方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized 方法
锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized 方法
加个普通方法后发现和同步锁无关
换成两个对象后,不是同一把锁了,情况立刻变化。
synchronized 实现同步的基础:Java 中的每一个对象都可以作为锁。
具体表现为以下_3 种形式。_
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的_Class对象。_
对于同步方法块,锁是_Synchonized括号里配置的对象_
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,
可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁, 所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。
但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!

2、公平锁和非公平锁

  1. // 创建可重入锁,默认是非公平锁
  2. // false:非公平锁,可能造成线程独占,其它线程饿死,但执行效率高
  3. // true:公平锁,阳光普照,每次抢占资源都会查看是否被其他线程占用,若占用就不抢占,执行效率相对低
  4. private final ReentrantLock lock = new ReentrantLock(true);

image.png

  1. package com.atguigu.lock;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. //第一步 创建资源类,定义属性和和操作方法
  4. class LTicket {
  5. // 票数量
  6. private int number = 30;
  7. // 创建可重入锁,true公平锁,false非公平锁
  8. private final ReentrantLock lock = new ReentrantLock(true);
  9. // 卖票方法
  10. public void sale() {
  11. // 上锁
  12. lock.lock();
  13. try {
  14. // 判断是否有票
  15. if(number > 0) {
  16. System.out.println(Thread.currentThread().getName()+" :卖出"+(number--)+" 剩余:"+number);
  17. }
  18. } finally {
  19. // 解锁
  20. lock.unlock();
  21. }
  22. }
  23. }
  24. public class LSaleTicket {
  25. //第二步 创建多个线程,调用资源类的操作方法
  26. //创建三个线程
  27. public static void main(String[] args) {
  28. LTicket ticket = new LTicket();
  29. new Thread(()-> {
  30. for (int i = 0; i < 40; i++) {
  31. ticket.sale();
  32. }
  33. },"AA").start();
  34. new Thread(()-> {
  35. for (int i = 0; i < 40; i++) {
  36. ticket.sale();
  37. }
  38. },"BB").start();
  39. new Thread(()-> {
  40. for (int i = 0; i < 40; i++) {
  41. ticket.sale();
  42. }
  43. },"CC").start();
  44. }
  45. }

3、可重入锁(递归锁)

  1. synchronizedlock都是可重入锁 <br /> sychronized是隐式锁,不用手工上锁与解锁,而lock为显示锁,需要手工上锁与解锁 <br /> 有了可重入锁之后,破解第一把之后就可以一直进入到内层结构。利用同一把锁可以进入各个区域,打开第一道大门后,里面的门可以自由进入

3.1 synchronized同步代码块演示可重入锁

image.png

3.2 synchronized同步方法演示可重入锁

image.png

3.3 Lock演示可重入锁

  1. package com.atguigu.sync;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. //可重入锁
  5. public class SyncLockDemo1 {
  6. public static void main(String[] args) {
  7. // Lock演示可重入锁
  8. Lock lock = new ReentrantLock();
  9. // 创建线程
  10. new Thread(()->{
  11. try {
  12. //上锁
  13. lock.lock();
  14. System.out.println(Thread.currentThread().getName()+" 外层操作");
  15. try {
  16. //上锁
  17. lock.lock();
  18. System.out.println(Thread.currentThread().getName()+" 内层操作");
  19. }finally {
  20. //释放锁
  21. lock.unlock();
  22. }
  23. }finally {
  24. //释放做
  25. lock.unlock();
  26. }
  27. },"t1").start();
  28. //创建新线程
  29. new Thread(()->{
  30. lock.lock();
  31. System.out.println("aaaa");
  32. lock.unlock();
  33. },"aa").start();
  34. }
  35. }

image.png

4、死锁

image.png

  1. package com.atguigu.sync;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * 演示死锁
  5. */
  6. public class DeadLock {
  7. // 创建两个对象
  8. static Object a = new Object();
  9. static Object b = new Object();
  10. public static void main(String[] args) {
  11. new Thread(()->{
  12. synchronized (a) {
  13. System.out.println(Thread.currentThread().getName()+" 持有锁a,试图获取锁b");
  14. try {
  15. TimeUnit.SECONDS.sleep(1);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. synchronized (b) {
  20. System.out.println(Thread.currentThread().getName()+" 获取锁b");
  21. }
  22. }
  23. },"A").start();
  24. new Thread(()->{
  25. synchronized (b) {
  26. System.out.println(Thread.currentThread().getName()+" 持有锁b,试图获取锁a");
  27. try {
  28. TimeUnit.SECONDS.sleep(1);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. synchronized (a) {
  33. System.out.println(Thread.currentThread().getName()+" 获取锁a");
  34. }
  35. }
  36. },"B").start();
  37. }
  38. }

image.png

4.1 验证是否存在死锁

image.png

六、Callable&Future 接口

1、Callable 接口

目前我们学习了有两种创建线程的方法-一种是通过创建 Thread 类,另一种是通过使用Runnable 创建线程。但是,Runnable 缺少的一项功能是,当线程终止时(即run()完成时),我们无法使线程返回结果。为了支持此功能, Java 中提供了Callable 接口。
image.png
image.png
Callable 接口的特点如下(重点):
1)为了实现Runnable,需要实现不返回任何内容的run()方法,而对于Callable,需要实现在完成时返回结果的call()方法。
2)call()方法可以引发异常,而run()则不能。
3)为实现Callable 而必须重写call 方法
4)不能直接替换runnable,因为Thread 类的构造方法根本没有Callable

2、Future 接口

当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用 Future 对象。
将Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。要实现此接口,必须重写5 种方法,这里列出了重要的方法,如下:
1)public boolean cancel(boolean mayInterrupt):用于停止任务
如果尚未启动,它将停止任务。如果已启动,则仅在 mayInterrupt 为 true时才会中断任务。

2)public Object get()抛出InterruptedException,ExecutionException:用于获取任务的结果
如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果。

3)public boolean isDone():如果任务完成,则返回true,否则返回false

可以看到Callable 和Future 做两件事-Callable 与Runnable 类似,因为它封装了要在另一个线程上运行的任务,而Future 用于存储从另一个线程获得的结果。实际上,future 也可以与Runnable 一起使用。

要创建线程,需要 Runnable。为了获得结果,需要 future。

3、FutureTask

image.png
Java 库具有具体的 FutureTask 类型,该类型实现 Runnable 和 Future,并方便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建Thread 对象。因此,间接地使用 Callable 创建线程。
get()获取结果;isDone()判断是否计算结束

核心原理:(重点)
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成
1)当主线程将来需要时,就可以通过Future 对象获得后台作业的计算结果或者执行状态
2)一般FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
3)仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法
4)一旦计算完成,就不能再重新开始或取消计算
5)get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常
6)get 只计算一次,因此 get 方法放到最后
image.png
调用未来任务的时候,只会等待计算一次
image.png

案例:
image.png

4、使用Callable 和Future

image.png

5、小结(重点)

1)在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future 对象在后台完成, 当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执行状态
2)一般FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果
3)仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
4)只计算一次

七、JUC 三大辅助类

JUC 中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多时 Lock 锁的频繁操作。这三种辅助类为:
1)CountDownLatch: 减少计数
2)CyclicBarrier: 循环栅栏
3)Semaphore: 信号灯

1、减少计数CountDownLatch

CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句。
await() 使当前线程在锁存器倒计数至零之前一直在等待,除非线程被中断
countDown()递减锁存器的计数,如果计数达到零,将释放所有等待的线程
1)CountDownLatch 主要有两个方法,当一个或多个线程调用await 方法时,这些线程会阻塞
2)其它线程调用countDown 方法会将计数器减1(调用 countDown 方法的线程不会阻塞)
3)当计数器的值变为0 时,因await 方法阻塞的线程会被唤醒,继续执行

场景: 6 个同学陆续离开教室后值班同学才可以关门

1.1 未使用CountDownLatch会出现的问题

image.png

1.2 使用CountDownLatch

image.png

2、循环栅栏CyclicBarrier

CyclicBarrier 看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句。可以将CyclicBarrier 理解为加1 操作.

场景: 集齐7 颗龙珠就可以召唤神龙

2.1 启动7个线程

image.png

2.2 启动6个线程

image.png

2.3 启动8个线程image.png

3、信号灯Semaphore

Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),每个信号量初始化为一个最多只能分发一个许可证。使用acquire 方法获得许可证,release 方法释放许可
场景:抢车位,6 部汽车、3 个停车位
image.png
执行结果:
image.png

八、读写锁

image.png
悲观锁:单独每个人完成事情的时候,执行上锁解锁。解决并发中的问题,不支持并发操作,只能一个一个操作,效率低
乐观锁:每执行一件事情,都会比较数据版本号,谁先提交,谁先提交版本号

1、读写锁介绍

image.png

  1. 表锁:整个表操作,不会发生死锁
  2. 行锁:每个表中的单独一行进行加锁,会发生死锁
  3. 读锁:共享锁(可以有多个人读),会发生死锁
  4. 写锁:独占锁(只能有一个人写),会发生死锁
  5. 读写锁:一个资源可以被多个读线程访问,也可以被一个写线程访问,但不能同时存在读写线程,读写互斥,读读共享
  6. 读写锁ReentrantReadWriteLock
  7. 读锁为ReentrantReadWriteLock.ReadLockreadLock()方法
  8. 写锁为ReentrantReadWriteLock.WriteLockwriteLock()方法
  9. 创建读写锁对象private ReadWriteLock rwLock = new ReentrantReadWriteLock();
  10. 写锁 加锁 rwLock.writeLock().lock();,解锁为rwLock.writeLock().unlock();
  11. 读锁 加锁rwLock.readLock().lock();,解锁为rwLock.readLock().unlock();

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源, 就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA 的并发包提供了读写锁_ReentrantReadWriteLock_ 它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁

1.1 线程进入读锁的前提条件

1)没有其他线程的写锁
2)没有写请求, 或者==有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。==

1.2 线程进入写锁的前提条件

1)没有其他线程的读锁
2)没有其他线程的写锁

1.3 读写锁的三个重要特性

(1) 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2) 重进入:读锁和写锁都支持线程重进入。
(3) 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

2、ReentrantReadWriteLock

ReentrantReadWriteLock 类的整体结构

  1. public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
  2. /** 读锁 */
  3. private final ReentrantReadWriteLock.ReadLock readerLock;
  4. /** 写锁 */
  5. private final ReentrantReadWriteLock.WriteLock writerLock;
  6. final Sync sync;
  7. /** 使用默认(非公平)的排序属性创建一个新的ReentrantReadWriteLock */
  8. public ReentrantReadWriteLock() {
  9. this(false);
  10. }
  11. /** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
  12. public ReentrantReadWriteLock(boolean fair) {
  13. sync = fair ? new FairSync() : new NonfairSync();
  14. readerLock = new ReadLock(this);
  15. writerLock = new WriteLock(this);
  16. }
  17. /** 返回用于写入操作的锁 */
  18. public ReentrantReadWriteLock.WriteLock writeLock() {
  19. return writerLock;
  20. }
  21. /** 返回用于读取操作的锁 */
  22. public ReentrantReadWriteLock.ReadLock readLock() {
  23. return readerLock;
  24. }
  25. abstract static class Sync extends AbstractQueuedSynchronizer {}
  26. static final class NonfairSync extends Sync {}
  27. static final class FairSync extends Sync {}
  28. public static class ReadLock implements Lock, java.io.Serializable {}
  29. public static class WriteLock implements Lock, java.io.Serializable {}
  30. }

可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现; 同时其还实现了 Serializable 接口,表示可以进行序列化,在源代码中可以看到 ReentrantReadWriteLock 实现了自己的序列化逻辑。

3、入门案例

场景: 使用_ReentrantReadWriteLock 对一个hashmap 进行读和写操作_

3.1 未加读写锁代码

  1. package com.atguigu.readwrite;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.locks.ReadWriteLock;
  6. import java.util.concurrent.locks.ReentrantReadWriteLock;
  7. // 资源类
  8. class MyCache1 {
  9. // 创建map集合
  10. private volatile Map<String,Object> map = new HashMap<>();
  11. // 放数据
  12. public void put(String key, Object value) {
  13. try {
  14. System.out.println(Thread.currentThread().getName()+" 正在写操作" + key);
  15. // 暂停一会
  16. TimeUnit.MICROSECONDS.sleep(300);
  17. // 放数据
  18. map.put(key, value);
  19. System.out.println(Thread.currentThread().getName()+" 写完了" + key);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. // 取数据
  25. public Object get(String key) {
  26. Object result = null;
  27. try {
  28. System.out.println(Thread.currentThread().getName() + " 正在读取操作" + key);
  29. // 暂停一会
  30. TimeUnit.MICROSECONDS.sleep(300);
  31. result = map.get(key);
  32. System.out.println(Thread.currentThread().getName() + " 取完了" + key);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. return result;
  37. }
  38. }
  39. public class ReadWriteLockDemo1 {
  40. public static void main(String[] args) throws InterruptedException {
  41. MyCache1 myCache = new MyCache1();
  42. // 创建线程放数据
  43. for (int i = 1; i <=5; i++) {
  44. final int num = i;
  45. new Thread(()->{
  46. myCache.put(num + "", num + "");
  47. },String.valueOf(i)).start();
  48. }
  49. // 如果加了等待,模拟不了效果
  50. // TimeUnit.MICROSECONDS.sleep(300);
  51. // 创建线程取数据
  52. for (int i = 1; i <=5; i++) {
  53. final int num = i;
  54. new Thread(()->{
  55. myCache.get(num + "");
  56. },String.valueOf(i)).start();
  57. }
  58. }
  59. }

执行结果:
image.png

3.2 加读写锁代码

  1. package com.atguigu.readwrite;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.locks.Lock;
  6. import java.util.concurrent.locks.ReadWriteLock;
  7. import java.util.concurrent.locks.ReentrantReadWriteLock;
  8. // 资源类
  9. class MyCache {
  10. // 创建map集合
  11. private volatile Map<String,Object> map = new HashMap<>();
  12. // 创建读写锁对象
  13. private ReadWriteLock rwLock = new ReentrantReadWriteLock();
  14. // 放数据
  15. public void put(String key, Object value) {
  16. // 添加写锁
  17. rwLock.writeLock().lock();
  18. try {
  19. System.out.println(Thread.currentThread().getName()+" 正在写操作" + key);
  20. // 暂停一会
  21. TimeUnit.MICROSECONDS.sleep(300);
  22. // 放数据
  23. map.put(key, value);
  24. System.out.println(Thread.currentThread().getName()+" 写完了" + key);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. } finally {
  28. // 释放写锁
  29. rwLock.writeLock().unlock();
  30. }
  31. }
  32. // 取数据
  33. public Object get(String key) {
  34. // 添加读锁
  35. rwLock.readLock().lock();
  36. Object result = null;
  37. try {
  38. System.out.println(Thread.currentThread().getName() + " 正在读取操作" + key);
  39. // 暂停一会
  40. TimeUnit.MICROSECONDS.sleep(300);
  41. result = map.get(key);
  42. System.out.println(Thread.currentThread().getName() + " 取完了" + key);
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. } finally {
  46. // 释放读锁
  47. rwLock.readLock().unlock();
  48. }
  49. return result;
  50. }
  51. }
  52. public class ReadWriteLockDemo {
  53. public static void main(String[] args) throws InterruptedException {
  54. MyCache myCache = new MyCache();
  55. // 创建线程放数据
  56. for (int i = 1; i <=5; i++) {
  57. final int num = i;
  58. new Thread(()->{
  59. myCache.put(num + "", num + "");
  60. },String.valueOf(i)).start();
  61. }
  62. // TimeUnit.MICROSECONDS.sleep(300);
  63. // 创建线程取数据
  64. for (int i = 1; i <=5; i++) {
  65. final int num = i;
  66. new Thread(()->{
  67. myCache.get(num + "");
  68. },String.valueOf(i)).start();
  69. }
  70. }
  71. }

执行结果:
image.png

4、读写锁的演变

image.png

5、读写锁的降级

image.png
image.png
image.png

6、小结(重要)

1)在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)
2)在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)

原因:当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

九、阻塞队列

1、BlockingQueue 简介

Concurrent 包中,BlockingQueue 很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了 BlockingQueue 家庭中的所有成员,包括他们各自的功能以及常见使用场景。
阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
image.png

1)当队列是空的,从队列中获取元素的操作将会被阻塞
2)当队列是满的,从队列中添加元素的操作将会被阻塞
3)试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
4)试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增

常用的队列主要有以下两种:
1)先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
2)后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起

为什么需要 BlockingQueue:
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue 都给你一手包办了

在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

1)当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
2)当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒

2、BlockingQueue 核心方法

image.png
image.png

2.1 放入数据

1)offer(anObject):表示如果可能的话,将 anObject 加到BlockingQueue 里,即如果BlockingQueue 可以容纳,则返回 true,否则返回false.(本方法不阻塞当前执行方法的线程)
2)offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败
3)put(anObject):把 anObject 加到BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续

2.2 获取数据

1)poll(time): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等_time 参数规定的时间,取不到时返回_null
2)poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象, 如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
3)take(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到_BlockingQueue 有新的数据被加入_;
4)drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

3、入门案例

image.png
image.png
image.png
image.png

4、常见的BlockingQueue

4.1 ArrayBlockingQueue(常用)

基于数组的阻塞队列实现,在ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC 的影响还是存在一定的区别。而在创建ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
==一句话总结: 由数组结构组成的有界阻塞队列。==

4.2 LinkedBlockingQueue(常用)

基于链表的阻塞队列,同ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
ArrayBlockingQueue 和_LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。_
一句话总结: 由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列

4.3 DelayQueue

DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
一句话总结: 使用优先级队列实现的延迟无界阻塞队列

4.4 PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator 对象来决定),但需要注意的是PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者
因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。
在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁
一句话总结: 支持优先级排序的无界阻塞队列

4.5 SynchronousQueue

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。

公平模式和非公平模式的区别:
1)公平模式:SynchronousQueue 会采用公平锁,并配合一个FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
2)非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平锁,同时配合一个LIFO 队列来管理多余的生产者和消费者,而后一种模式, 如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
一句话总结: 不存储元素的阻塞队列,也即单个元素的队列

4.6 LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和transfer 方法。
LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null 的节点,生产者线程就不入队了,直接就将元素填充到
该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。
==一句话总结: 由链表组成的无界阻塞队列。==

4.7 LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。
对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况
1)插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出InterruptedException 异常
2)读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数
==一句话总结: 由链表组成的双向阻塞队列==

5、小结

1. 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件__满足,被挂起的线程又会自动被唤起
2. _为什么需要_BlockingQueue?
在concurrent 包发布以前,在多线程环境下, 我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了

十、ThreadPool 线程池

1、线程池简介

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销, 进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
例子:10 年前单核CPU 电脑,假的多线程,像马戏团小丑玩多个球,CPU 需要来回切换。 现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。

线程池的优势:__ 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量, 超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:
1)降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
2)提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
3)提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
4)Java 中的线程池是通过_Executor 框架实现的,该框架中用到了ExecutorExecutors(类似工具类)ExecutorServiceThreadPoolExecutor 这几个类_
image.png

2、线程池参数说明

image.png

2.1 常用参数(重点)

1)corePoolSize:线程池的核心线程数
2)maximumPoolSize:能容纳的最大线程数
3)keepAliveTime:空闲线程存活时间
4)unit:存活的时间单位
5)workQueue:存放提交但未执行任务的队列
6)threadFactory:创建线程的工厂类
7)handler:等待队列满后的拒绝策略
线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数
当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(_workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略_
image.png

2.2 拒绝策略(重点)

CallerRunsPolicy:当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy(默认):丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy:直接丢弃,其他啥都没有
DiscardOldestPolicy:当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
image.png

3、线程池的种类与创建

3.1 newCachedThreadPool(常用)

作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新
建线程。线程池根据需求创建线程,可扩容,遇强则强
特点
1)线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
2)线程池中的线程可进行缓存重复利用和回收(回收默认时间为1 分钟)
3)当线程池中,没有可用线程,会重新创建一个线程
创建方式:

  1. /**
  2. * 可缓存线程池
  3. * @return
  4. */
  5. public static ExecutorService newCachedThreadPool(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(0,
  16. Integer.MAX_VALUE,
  17. 60L,
  18. TimeUnit.SECONDS,
  19. new SynchronousQueue<>(),
  20. Executors.defaultThreadFactory(),
  21. new ThreadPoolExecutor.AbortPolicy());
  22. }

场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景
image.png

3.2 newFixedThreadPool(常用)

作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。一池N线程

特征:
1)线程池中的线程处于一定的量,可以很好的控制线程的并发量
2)线程可以重复被使用,在显示关闭之前,都将一直存在
3)超出一定量的线程被提交时候需在队列中等待
创建方式:

  1. /**
  2. * 固定长度线程池
  3. * @return
  4. */
  5. public static ExecutorService newFixedThreadPool(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(10,
  16. 10,
  17. 0L,
  18. TimeUnit.SECONDS,
  19. new LinkedBlockingQueue<>(),
  20. Executors.defaultThreadFactory(),
  21. new ThreadPoolExecutor.AbortPolicy());
  22. }

场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景
image.png

3.3 newSingleThreadExecutor(常用)

作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程, 那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
一个任务一个任务执行,一池一线程

特征:线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行
创建方式:

  1. /**
  2. * 单一线程池
  3. * @return
  4. */
  5. public static ExecutorService newSingleThreadExecutor(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(1,
  16. 1,
  17. 0L,
  18. TimeUnit.SECONDS,
  19. new LinkedBlockingQueue<>(),
  20. Executors.defaultThreadFactory(),
  21. new ThreadPoolExecutor.AbortPolicy());
  22. }

场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景
image.png

3.4 newScheduleThreadPool(了解)

作用线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参数,最大线程数为整形的最大数的线程池
特征:**
(1)线程池中具有指定数量的线程,即便是空线程也将保留
(2)可定时或者延迟执行线程活动
创建方式:

  1. public static ScheduledExecutorService newScheduledThreadPool(
  2. int corePoolSize, ThreadFactory threadFactory) {
  3. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  4. }

场景: 适用于需要多个后台线程执行周期任务的场景

3.5 newWorkStealingPool

jdk1.8 提供的线程池,底层使用的是 ForkJoinPool 实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执行任务
创建方式:

  1. public static ExecutorService newWorkStealingPool(int parallelism) {
  2. /**
  3. * parallelism:并行级别,通常默认为 JVM 可用的处理器个数
  4. * factory:用于创建 ForkJoinPool 中使用的线程。
  5. * handler:用于处理工作线程未处理的异常,默认为 null
  6. * asyncMode:用于控制 WorkQueue 的工作模式:队列---反队列
  7. */
  8. return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  9. null,true);
  10. }

场景适用于大耗时,可并行执行的场景

4、线程池入门案例

场景火车站_3 个售票口,10个用户买票_

  1. package com.atguigu.pool;
  2. import java.util.concurrent.*;
  3. // 自定义线程池创建
  4. public class ThreadPoolDemo3 {
  5. /**
  6. * 火车站 3 个售票口, 10 个用户买票
  7. * @param args
  8. */
  9. public static void main(String[] args) {
  10. //定时线程次:线程数量为 3---窗口数为 3
  11. ExecutorService threadService = new ThreadPoolExecutor(3,
  12. 3,
  13. 60L,
  14. TimeUnit.SECONDS,
  15. new LinkedBlockingQueue<>(),
  16. Executors.defaultThreadFactory(),
  17. new ThreadPoolExecutor.DiscardOldestPolicy());
  18. try {
  19. // 10个人买票
  20. for (int i = 1; i <= 10; i++) {
  21. threadService.execute(()->{
  22. try {
  23. System.out.println(Thread.currentThread().getName() + " 窗口,开始卖票");
  24. Thread.sleep(5000);
  25. System.out.println(Thread.currentThread().getName() + " 窗口买票结束");
  26. }catch (Exception e){
  27. e.printStackTrace();
  28. }
  29. });
  30. } }catch (Exception e){
  31. e.printStackTrace();
  32. }finally {
  33. // 完成后结束
  34. threadService.shutdown();
  35. }
  36. }
  37. }

image.png

5、线程池底层工作原理(重点)

image.png
1、在创建了线程池后,线程池中的线程数为零
2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
1)如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;(任务1、任务2)
2)如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列; (任务3、4、5)
3)如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务; (任务6、7、8)
4)如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。(任务9、10)
3、当一个线程完成任务时,它会从队列中取下一个任务来执行
4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
1)如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉(停掉非核心线程)。
2)所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
image.png

6、自定义线程池

  1. package com.atguigu.pool;
  2. import java.util.concurrent.*;
  3. // 自定义线程池创建
  4. public class ThreadPoolDemo2 {
  5. public static void main(String[] args) {
  6. ExecutorService threadPool = new ThreadPoolExecutor(
  7. 3, // 核心线程数
  8. 5, // 最大线程数
  9. 2L, // 存活时间
  10. TimeUnit.SECONDS,
  11. new ArrayBlockingQueue<>(3), // 阻塞队列
  12. Executors.defaultThreadFactory(), // 线程工厂
  13. new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
  14. );
  15. // 10个顾客请求
  16. try {
  17. for (int i = 1; i <=10; i++) {
  18. // 执行
  19. threadPool.execute(()->{
  20. System.out.println(Thread.currentThread().getName()+" 办理业务");
  21. });
  22. }
  23. }catch (Exception e) {
  24. e.printStackTrace();
  25. }finally {
  26. // 关闭
  27. threadPool.shutdown();
  28. }
  29. }
  30. }

image.png

7、注意事项(重点)

1)项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是FixedThreadPool 和SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为Integer.MAX_VALUE, 容易导致OOM。所以实际生产一般自己通过ThreadPoolExecutor 的7 个参数,自定义线程池
2)创建线程池推荐适用ThreadPoolExecutor 及其7 个参数手动创建
o corePoolSize:线程池的核心线程数
o maximumPoolSize:能容纳的最大线程数
o keepAliveTime:空闲线程存活时间
o unit:存活的时间单位
o workQueue:存放提交但未执行任务的队列
o threadFactory:创建线程的工厂类
o handler:等待队列满后的拒绝策略
3)为什么不允许适用不允许Executors.的方式手动创建线程池,如下图
image.png

十一、Fork/Join

1、Fork/Join 框架简介

Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join 框架要完成两件事情:
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
image.png
image.png
1)任务分割:首先Fork/Join 框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
2)执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里, 启动一个线程从队列里取数据,然后合并这些数据。

在 Java 的 Fork/Join 框架中,使用两个类完成上述操作
1)ForkJoinTask:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行fork 和join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类:
a、RecursiveAction:用于没有返回结果的任务
b、RecursiveTask:用于有返回结果的任务
2)ForkJoinPool:ForkJoinTask 需要通过ForkJoinPool 来执行
3)RecursiveTask:继承后可以实现递归(自己调自己)调用的任务

Fork/Join 框架的实现原理
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成, ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而ForkJoinWorkerThread 负责执行这些任务。

2、Fork 方法

image.png
image.png
Fork 方法的实现原理:当我们调用ForkJoinTask 的fork 方法时,程序会把任务放在ForkJoinWorkerThread 的pushTask 的workQueue 中,异步地执行这个任务,然后立即返回结果

  1. public final ForkJoinTask<V> fork() {
  2. Thread t;
  3. if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
  4. ((ForkJoinWorkerThread)t).workQueue.push(this);
  5. else
  6. ForkJoinPool.common.externalPush(this);
  7. return this;
  8. }

pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下:

  1. final void push(ForkJoinTask<?> task) {
  2. ForkJoinTask<?>[] a; ForkJoinPool p;
  3. int b = base, s = top, n;
  4. if ((a = array) != null) { // ignore if queue removed
  5. int m = a.length - 1; // fenced write for task visibility
  6. U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
  7. U.putOrderedInt(this, QTOP, s + 1);
  8. if ((n = s - b) <= 1) {
  9. if ((p = pool) != null) p.signalWork(p.workQueues, this);//执行
  10. }
  11. else if (n >= m)
  12. growArray();
  13. }
  14. }

3、join 方法

Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看ForkJoinTask 的 join 方法的实现,代码如下:

  1. public final V join() {
  2. int s;
  3. if ((s = doJoin() & DONE_MASK) != NORMAL)
  4. reportException(s);
  5. return getRawResult();
  6. }

它首先调用 doJoin 方法,通过 doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有 4 种:
==已完成(_NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)_==
1)如果任务状态是已完成,则直接返回任务结果。
2)如果任务状态是被取消,则直接抛出CancellationException
3)如果任务状态是抛出异常,则直接抛出对应的异常

让我们分析一下 doJoin 方法的实现

  1. private int doJoin() {
  2. int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueuew;
  3. return (s = status) < 0 ? s :
  4. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue).
  5. tryUnpush(this) && (s = doExec()) < 0 ? s :
  6. wt.pool.awaitJoin(w, this, 0L) :
  7. externalAwaitDone();
  8. }
  9. final int doExec() {
  10. int s; boolean completed;
  11. if ((s = status) >= 0) {
  12. try {
  13. completed = exec();
  14. } catch (Throwable rex) {
  15. return setExceptionalCompletion(rex);
  16. }
  17. if (completed) s = setCompletion(NORMAL);
  18. }
  19. return s;
  20. }

在 doJoin()方法流程如下:
1. 首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
2. 如果没有执行完,则从任务数组里取出任务并执行。
3. 如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。

4、Fork/Join 框架的异常处理

ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask 提供了 isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的getException 方法获取异常。
getException 方法返回 Throwable 对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回 null。

5、入门案例

场景: _生成一个计算任务,计算 _1+2+3 +1000==每 _100 个数切分一个子任务_==
image.png

  1. package com.atguigu.forkjoin;
  2. import java.util.concurrent.*;
  3. class MyTask extends RecursiveTask<Integer> {
  4. // 拆分差值不能超过10,计算10以内运算
  5. private static final Integer VALUE = 10;
  6. private int begin ; // 拆分开始值
  7. private int end; // 拆分结束值
  8. private int result ; // 返回结果
  9. // 创建有参数构造
  10. public MyTask(int begin,int end) {
  11. this.begin = begin;
  12. this.end = end;
  13. }
  14. // 拆分和合并过程
  15. @Override
  16. protected Integer compute() {
  17. // 判断相加两个数值是否大于10
  18. if((end - begin) <= VALUE) {
  19. // 相加操作
  20. for (int i = begin; i <= end; i++) {
  21. result = result + i;
  22. }
  23. } else {// 进一步拆分
  24. // 获取中间值
  25. int middle = (begin + end)/2;
  26. // 递归调用,切分为2个小任务 -> 拆分左边
  27. MyTask task01 = new MyTask(begin, middle);
  28. // 递归调用,切分为2个小任务 -> 拆分左边
  29. MyTask task02 = new MyTask(middle + 1, end);
  30. // 调用方法拆分,异步
  31. task01.fork();
  32. task02.fork();
  33. // 合并结果,同步阻塞获取执行结果
  34. result = task01.join() + task02.join();
  35. }
  36. return result;
  37. }
  38. }
  39. public class ForkJoinDemo {
  40. public static void main(String[] args) throws ExecutionException, InterruptedException {
  41. // 创建MyTask对象
  42. MyTask myTask = new MyTask(0,100);
  43. // 创建分支合并池对象
  44. ForkJoinPool forkJoinPool = new ForkJoinPool();
  45. ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
  46. // 获取最终合并之后结果
  47. Integer result = forkJoinTask.get();
  48. System.out.println(result);
  49. // 关闭池对象
  50. forkJoinPool.shutdown();
  51. }
  52. }

十二、CompletableFuture

image.png

  1. package com.atguigu.completable;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. //异步调用
  5. public class CompletableFutureDemo {
  6. public static void main(String[] args) throws Exception {
  7. // 异步调用没有返回值:runAsync
  8. CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
  9. System.out.println(Thread.currentThread().getName()+" : CompletableFuture1");
  10. });
  11. completableFuture1.get();
  12. // mq消息队列
  13. // 异步调用有返回值:supplyAsync
  14. CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
  15. System.out.println(Thread.currentThread().getName()+" : CompletableFuture2");
  16. // 模拟异常
  17. int i = 10 / 0;
  18. return 1024;
  19. });
  20. completableFuture2.whenComplete((t,u)->{
  21. System.out.println("------t = " + t); // 返回值1024,若有异常,返回null
  22. System.out.println("------u = " + u); // 获取异常:java.lang.ArithmeticException: / by zero
  23. }).get();
  24. }
  25. }

1、CompletableFuture 简介

CompletableFuture 在Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture 实现了Future, CompletionStage 接口,实现了Future 接口就可以兼容现在有线程池框架,而CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。

2、Future 与CompletableFuture

Futrue 在Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future 里面有isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Future 的主要缺点如下:
(1)不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果, 现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2)不支持进一步的非阻塞调用
通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future 不支持回调函数,所以无法实现这个功能
(3)不支持链式调用
对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。
(4)不支持多个Future 合并
比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后, 执行某些函数,是没法通过 Future 实现的。
(5) 不支持异常处理
Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。

3、CompletableFuture 入门

3.1 使用CompletableFuture

场景:主线程里面创建一个_CompletableFuture,然后主线程调用get 方法会阻塞,最后我们在一个子线程中使其终止。_

  1. /**
  2. * 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们
  3. 在一个子线程中使其终止
  4. * @param args
  5. */
  6. public static void main(String[] args) throws Exception{
  7. CompletableFuture<String> future = new CompletableFuture<>();
  8. new Thread(() -> {
  9. try{
  10. System.out.println(Thread.currentThread().getName() + "子线程开始干活");
  11. // 子线程睡 5 秒
  12. Thread.sleep(5000);
  13. // 在子线程中完成主线程
  14. future.complete("success");
  15. }catch (Exception e){
  16. e.printStackTrace();
  17. }
  18. }, "A").start();
  19. // 主线程调用 get 方法阻塞
  20. System.out.println("主线程调用 get 方法获取结果为: " + future.get());
  21. System.out.println("主线程完成,阻塞结束!!!!!!");
  22. }

3.2 没有返回值的异步任务

  1. /**
  2. * 没有返回值的异步任务
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. //运行一个没有返回值的异步任务
  8. CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  9. try {
  10. System.out.println("子线程启动干活");
  11. Thread.sleep(5000);
  12. System.out.println("子线程完成");
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. });
  17. // 主线程阻塞
  18. future.get();
  19. System.out.println("主线程结束");
  20. }

3.3 有返回值的异步任务

  1. /**
  2. * 没有返回值的异步任务
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. // 运行一个有返回值的异步任务
  8. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  9. try {
  10. System.out.println("子线程开始任务");
  11. Thread.sleep(5000);
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. return "子线程完成了!";
  16. });
  17. // 主线程阻塞
  18. String s = future.get();
  19. System.out.println("主线程结束, 子线程的结果为:" + s);
  20. }

3.4 线程依赖

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

  1. private static Integer num = 10;
  2. /**
  3. * 先对一个数加 10,然后取平方
  4. * @param args
  5. */
  6. public static void main(String[] args) throws Exception{
  7. System.out.println("主线程开始");
  8. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  9. try {
  10. System.out.println("加 10 任务开始");
  11. num += 10;
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. return num;
  16. }).thenApply(integer -> {
  17. return num * num;
  18. });
  19. Integer integer = future.get();
  20. System.out.println("主线程结束, 子线程的结果为:" + integer);
  21. }

3.5 消费处理结果

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture.supplyAsync(() -> {
  4. try {
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. }
  10. return num;
  11. }).thenApply(integer -> {
  12. return num * num;
  13. }).thenAccept(new Consumer<Integer>() {
  14. @Override
  15. public void accept(Integer integer) {
  16. System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + integer);
  17. }
  18. });
  19. }

3.6 异常处理

exceptionally 异常处理,出现异常时触发

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  4. int i= 1/0;
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. return num;
  8. }).exceptionally(ex -> {
  9. System.out.println(ex.getMessage());
  10. return -1;
  11. });
  12. System.out.println(future.get());
  13. }

handle 类似于 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  4. System.out.println("加 10 任务开始");
  5. num += 10;
  6. return num;
  7. }).handle((i,ex) ->{
  8. System.out.println("进入 handle 方法");
  9. if(ex != null){
  10. System.out.println("发生了异常,内容为:" + ex.getMessage());
  11. return -1; }else{
  12. System.out.println("正常完成,内容为: " + i);
  13. return i; }
  14. });
  15. System.out.println(future.get());
  16. }

3.7 结果合并

thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. // 第一步加 10
  4. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. return num;
  8. });
  9. // 合并
  10. CompletableFuture<Integer> future1 = future.thenCompose(i ->
  11. // 再来一个 CompletableFuture
  12. CompletableFuture.supplyAsync(() -> {
  13. return i + 1;
  14. }));
  15. System.out.println(future.get());
  16. System.out.println(future1.get());
  17. }

thenCombine 合并两个没有依赖关系的 CompletableFutures 任务

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  4. System.out.println("加 10 任务开始");
  5. num += 10;
  6. return num;
  7. });
  8. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  9. System.out.println("乘以 10 任务开始");
  10. num = num * 10;
  11. return num;
  12. });
  13. // 合并两个结果
  14. CompletableFuture<Object> future = job1.thenCombine(job2,
  15. new BiFunction<Integer, Integer, List<Integer>>() {
  16. @Override
  17. public List<Integer> apply(Integer a, Integer b) {
  18. List<Integer> list = new ArrayList<>();
  19. list.add(a);
  20. list.add(b);
  21. return list;
  22. }
  23. });
  24. System.out.println("合并结果为:" + future.get());
  25. }

合并多个任务的结果allOf 与anyOf
allOf:一系列独立的future 任务,等其所有的任务执行完后做一些事情

  1. /**
  2. * 先对一个数加 10,然后取平方
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. List<CompletableFuture> list = new ArrayList<>();
  8. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  9. System.out.println("加 10 任务开始");
  10. num += 10;
  11. return num;
  12. });
  13. list.add(job1);
  14. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  15. System.out.println("乘以 10 任务开始");
  16. num = num * 10;
  17. return num;
  18. });
  19. list.add(job2);
  20. CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
  21. System.out.println("减以 10 任务开始");
  22. num = num * 10;
  23. return num;
  24. });
  25. list.add(job3);
  26. CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
  27. System.out.println("除以 10 任务开始");
  28. num = num * 10;
  29. return num;
  30. });
  31. list.add(job4);
  32. //多任务合并
  33. List<Integer> collect =
  34. list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
  35. System.out.println(collect);
  36. }

anyOf:只要在多个future 里面有一个返回,整个任务就可以结束,而不需要等到每一个future 结束

  1. /**
  2. * 先对一个数加 10,然后取平方
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. CompletableFuture<Integer>[] futures = new CompletableFuture[4];
  8. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  9. try{
  10. Thread.sleep(5000);
  11. System.out.println("加 10 任务开始");
  12. num += 10;
  13. return num;
  14. }catch (Exception e){
  15. return 0;
  16. }
  17. });
  18. futures[0] = job1;
  19. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  20. try{
  21. Thread.sleep(2000);
  22. System.out.println("乘以 10 任务开始");
  23. num = num * 10;
  24. return num;
  25. }catch (Exception e){
  26. return 1;
  27. }
  28. });
  29. futures[1] = job2;
  30. CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
  31. try{
  32. Thread.sleep(3000);
  33. System.out.println("减以 10 任务开始");
  34. num = num * 10;
  35. return num;
  36. }catch (Exception e){
  37. return 2;
  38. }
  39. });
  40. futures[2] = job3;
  41. CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
  42. try{
  43. Thread.sleep(4000);
  44. System.out.println("除以 10 任务开始");
  45. num = num * 10;
  46. return num;
  47. }catch (Exception e){
  48. return 3;
  49. }
  50. });
  51. futures[3] = job4;
  52. CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
  53. System.out.println(future.get());
  54. }