1、JUC概述

1.1、什么是JUC

JUC实际上就是我们对于jdk中java.util .concurrent 工具包的简称。这个包下都是Java处理线程相关的类,自jdk1.5后出现。
此包中增加了在并发编程中很常用的实用工具类,用于定义类似于线程的自定义子系统,包括线程池、异步 IO 和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的 Collection 实现等。
image.png

1.2、进程和线程

1.2.1、进程

指系统中正在运行的一个应用程序;程序一旦运行技术进程;进程—-资源分配的最小单位是操作系统结构的基础。 在当代面向线程 设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。

1.2.2、线程

系统分配处理器时间资源的基本单位,或者说进程之内独立执行的一个单元执行流。线程——程序执行的最小单位。。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,
一个进程中可以并发多个线程,每条线程并行执行不同的任务。

总结来说:
进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程——资源分配的最小单位。
线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程——程序执行的最小单位。

1.3、线程的状态

1.3.1、线程状态枚举类

Thread.Stat

  1. public enum State {
  2. /**
  3. * Thread state for a thread which has not yet started.
  4. */
  5. NEW,(新建)
  6. /**
  7. * Thread state for a runnable thread. A thread in the runnable
  8. * state is executing in the Java virtual machine but it may
  9. * be waiting for other resources from the operating system
  10. * such as processor.
  11. */
  12. RUNNABLE,(准备就绪)
  13. /**
  14. * Thread state for a thread blocked waiting for a monitor lock.
  15. * A thread in the blocked state is waiting for a monitor lock
  16. * to enter a synchronized block/method or
  17. * reenter a synchronized block/method after calling
  18. * {@link Object#wait() Object.wait}.
  19. */
  20. BLOCKED,(阻塞)
  21. /**
  22. * Thread state for a waiting thread.
  23. * A thread is in the waiting state due to calling one of the
  24. * following methods:
  25. * <ul>
  26. * <li>{@link Object#wait() Object.wait} with no timeout</li>
  27. * <li>{@link #join() Thread.join} with no timeout</li>
  28. * <li>{@link LockSupport#park() LockSupport.park}</li>
  29. * </ul>
  30. ** <p>A thread in the waiting state is waiting for another thread to
  31. * perform a particular action.
  32. *
  33. * For example, a thread that has called <tt>Object.wait()</tt>
  34. * on an object is waiting for another thread to call
  35. * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
  36. * that object. A thread that has called <tt>Thread.join()</tt>
  37. * is waiting for a specified thread to terminate.
  38. */
  39. WAITING,(不见不散)
  40. /**
  41. * Thread state for a waiting thread with a specified waiting time.
  42. * A thread is in the timed waiting state due to calling one of
  43. * the following methods with a specified positive waiting time:
  44. * <ul>
  45. * <li>{@link #sleep Thread.sleep}</li>
  46. * <li>{@link Object#wait(long) Object.wait} with timeout</li>
  47. * <li>{@link #join(long) Thread.join} with timeout</li>
  48. * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
  49. * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
  50. * </ul>
  51. */
  52. TIMED_WAITING,(过时不候)
  53. /**
  54. * Thread state for a terminated thread.
  55. * The thread has completed execution.
  56. */
  57. TERMINATED;(终结) }

1.3.2、wait/sleep的区别
  1. sleep 是 Thread 的静态方法,wait 是 Object 的方法,任何对象实例都能调用。
  2. sleep 不会释放锁,它也不需要占用锁。wait 会释放锁,但调用它的前提是当前线程占有锁(即代码要在 synchronized 中)。
  3. 它们都可以被 interrupted 方法中断

    1.4 并发与并行

    1.4.1、串行模式
    串行 表示所有任务都一一按先后顺序进行。串行意味着必须先装完一车柴才能运送这车柴,只有运送到了,才能卸下这车柴,并且只有完成了这整个三个步骤,才能进行下一个步骤
    串行是一次只取得一个任务,并且执行这个任务
    1.4.2 并行模式
    并行 意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核 CPU。
    1.4.3 并发·
    并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。但这不是重点,在描述并发的时候也不会去扣这种字眼是否精确,==并发的重点在于它是一种现象==, ==并发描述的是多进程同时运行的现象==。但实际上,对于单核心 CPU 来说,同一时刻只能运行一个线程。所以,这里的”同时运行”表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同时运行起来了,但实际上这些程序中的进程不是一直霸占 CPU 的,而是执行一会停一会。

要解决大并发问题,通常是将大任务分解成多个小任务, 由于操作系统对进程的调度是随机的,所以切分成多个小任务后,可能会从任一小任务处执行。这可能会出现一些现象:

  • 可能出现一个小任务执行了多次,还没开始下个任务的情况。这时一般会采用队列或类似的数据结构来存放各个小任务的成果
  • 可能出现还没准备好第一步就执行第二步的可能。这时,一般采用多路复用或异步的方式,比如只有准备好产生了事件通知才执行某个任务。
  • 可以多进程/多线程的方式并行执行这些小任务。也可以单进程/单线程执行这些小任务,这时很可能要配合多路复用才能达到较高的效率
    1.4.4 小结(重点)
    并发:同一时刻多个线程在访问同一个资源,多个线程对一个点
    例子:春运抢票 电商秒杀…
    并行:多项工作一起执行,之后再汇总
    例子:泡方便面,电水壶烧水,一边撕调料倒入桶中

    1.5、管程(Monitor)

    管程(monitor)是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现)一种同步机制.但是这样并不能保证进程以设计的顺序执行JVM 中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程(monitor)对象,管程(monitor)会随着 java 对象一同创建和销毁执行线程首先要持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方法在执行时候会持有管程,其他线程无法再获取同一个管程

    1.6、用户线程和守护线程

    用户线程:平时用到的普通线程,自定义线程
    守护线程:运行在后台,是一种特殊的线程,比如垃圾回收
    当主线程结束后,用户线程还在运行,JVM 存活
    如果没有用户线程,都是守护线程,JVM 结束 ```java package cn.wen; public class Main { public static void main(String[] args) {
    1. Thread aa = new Thread(() -> {
    2. System.out.println(Thread.currentThread().getName() + "::" + Thread.currentThread().isDaemon());
    3. while (true){
    4. // 当前线程一直运行
    5. }
    6. }, "aa");
    7. // 设置守护线程
    8. aa.setDaemon(true);
    9. aa.start();
    10. System.out.println(Thread.currentThread().getName()+" over");
    } }

// 输出 main over

  1. <a name="qK3vq"></a>
  2. ### 2、Lock 接口
  3. <a name="gAFTP"></a>
  4. #### 2.1、Synchronized
  5. <a name="mRMfZ"></a>
  6. ##### 2.1.1、Synchronized 关键字回顾
  7. synchronized 是 Java 中的关键字,是一种同步锁。它修饰的对象有以下几种:
  8. 1. 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象;
  9. 1. 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;
  10. 1. 虽然可以使用 synchronized 来定义方法,但 synchronized 并不属于方法定义的一部分,因此,synchronized 关键字不能被继承。如果在父类中的某个方法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此,子类的方法也就相当于同步了
  11. 1. 修改一个静态的方法,其作用的范围是整个静态方法,**作用的对象是这个类的所有对象**;
  12. 1. 修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用主的对象是这个类的所有对象。
  13. ```java
  14. // 第一步 创建资源类 定义属性和操作方法
  15. class Ticket{
  16. // 票数
  17. private int number = 30;
  18. //操作方法:卖票
  19. public synchronized void sale(){
  20. // 判断
  21. if (number >0){
  22. System.out.println(Thread.currentThread().getName()+" :卖出:"+(number--)+" 剩下:"+number);
  23. }
  24. }
  25. }
  26. public class SaleTicket {
  27. // 第二步 创建多个线程,调用资源类的操作方法
  28. public static void main(String[] args) {
  29. // 创建Ticket对象
  30. Ticket ticket = new Ticket();
  31. // 创建第三个线程 匿名内部类
  32. new Thread(new Runnable() {
  33. @Override
  34. public void run() {
  35. for (int i = 0; i < 40; i++){
  36. ticket.sale();
  37. }
  38. }
  39. },"AA").start();
  40. new Thread(new Runnable() {
  41. @Override
  42. public void run() {
  43. for (int i = 0; i < 40; i++) {
  44. ticket.sale();
  45. }
  46. }
  47. },"BB").start();
  48. new Thread(new Runnable() {
  49. @Override
  50. public void run() {
  51. for (int i = 0; i < 40; i++) {
  52. ticket.sale();
  53. }
  54. }
  55. },"CC").start();
  56. }
  57. }

如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况
1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
2)线程执行发生异常,此时 JVM 会让线程自动释放锁。
那么如果这个获取锁的线程由于要等待 IO 或者其他原因(比如调用 sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过 Lock 就可以办到。

2.2、什么是Lock

Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock 提供了比 synchronized 更多的功能。
Lock 与的 Synchronized 区别

  • Lock 不是 Java 语言内置的,synchronized 是 Java 语言的关键字,因此是内置特性。Lock 是一个类,通过这个类可以实现同步访问;
  • Lock 和 synchronized 有一点非常大的不同,采用 synchronized 不需要用户去手动释放锁,当 synchronized 方法或者 synchronized 代码块执行完之后,系统会自动让线程释放对锁的占用;而 Lock 则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象
    2.2.1 Lock接口
    1. public interface Lock {
    2. void lock();
    3. void lockInterruptibly() throws InterruptedException;
    4. boolean tryLock();
    5. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    6. void unlock();
    7. Condition newCondition();
    8. }
    2.2.2 lock
    lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
    采用 Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用 Lock 必须在 try{}catch{}块中进行,并且将释放锁的操作放在finally 块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用 Lock来进行同步的话,是以下面这种形式去使用的lock实现售票
    1. Lock lock = ...;
    2. lock.lock();
    3. try{
    4. //处理任务
    5. }catch(Exception ex){
    6. }finally{
    7. lock.unlock(); //释放锁
    8. }
    2.2.3 newCondition
    关键字 synchronized 与 wait()/notify()这两个方法一起使用可以实现等待/通知模式, Lock 锁的 newContition()方法返回 Condition 对象,Condition 类也可以实现等待/通知模式。
    用 notify()通知时,JVM 会随机唤醒某个等待的线程, 使用 Condition 类可以进行选择性通知, Condition 比较常用的两个方法:
    • await()会使当前线程等待,同时会释放锁,当其他线程调用 signal()时,线程会重新获得锁并继续执行。
    • signal()用于唤醒一个等待的线程。
    注意:在调用 Condition 的 await()/signal()方法前,也需要线程持有相关的 Lock 锁,调用 await()后线程会释放这个锁,在 singal()调用后会从当前Condition 对象的等待队列中,唤醒 一个线程,唤醒的线程尝试获得锁, 一旦获得锁成功就继续执行。

    2.3 ReentrantLock

    ReentrantLock,意思是“可重入锁”,关于可重入锁的概念将在后面讲述。
    ReentrantLock 是唯一实现了 Lock 接口的类,并且 ReentrantLock 提供了更多的方法。下面通过一些实例看具体看一下如何使用
    1. public class Test {
    2. private ArrayList<Integer> arrayList = new ArrayList<Integer>();
    3. public static void main(String[] args) {
    4. final Test test = new Test();
    5. new Thread(){
    6. public void run() {
    7. test.insert(Thread.currentThread());
    8. };
    9. }.start();
    10. new Thread(){
    11. public void run() {
    12. test.insert(Thread.currentThread());
    13. };
    14. }.start();
    15. }
    16. public void insert(Thread thread) {
    17. Lock lock = new ReentrantLock(); //注意这个地方
    18. lock.lock();
    19. try {
    20. System.out.println(thread.getName()+"得到了锁");
    21. for(int i=0;i<5;i++) {
    22. arrayList.add(i);
    23. }
    24. } catch (Exception e) {
    25. // TODO: handle exception
    26. }finally {
    27. System.out.println(thread.getName()+"释放了锁");
    28. lock.unlock();
    29. } } }

    2.4 ReadWriteLock

    ReadWriteLock 也是一个接口,在它里面只定义了两个方法:
    1. public interface ReadWriteLock {
    2. /**
    3. * Returns the lock used for reading.
    4. *
    5. * @return the lock used for reading.
    6. */
    7. Lock readLock();
    8. /**
    9. * Returns the lock used for writing.
    10. *
    11. * @return the lock used for writing.
    12. */
    13. Lock writeLock();
    14. }
    一个用来获取读锁,一个用来获取写锁。
    读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的(排他的)。 每次只能有一个写线程,但是可以有多个线程并发地读数据。
    所有读写锁的实现必须确保写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。
    ③ 互斥原则
    • 读-读能共存,
    • 读-写不能共存,
    • 写-写不能共存。

也就是说将文件的读写操作分开,分成 2 个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的 ReentrantReadWriteLock 实现了 ReadWriteLock 接口。
ReentrantReadWriteLock 里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和 writeLock()用来获取读锁和写锁。
下面通过几个例子来看一下 ReentrantReadWriteLock 具体用法。
假如有多个线程要同时进行读操作的话,先看一下 synchronized 达到的效果:

  1. package cn.wen;
  2. import java.util.concurrent.locks.ReentrantReadWriteLock;
  3. public class Test {
  4. private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  5. public static void main(String[] args) {
  6. final Test test = new Test();
  7. new Thread() {
  8. public void run() {
  9. test.get(Thread.currentThread());
  10. };
  11. }.start();
  12. new Thread() {
  13. public void run() {
  14. test.get(Thread.currentThread());
  15. };
  16. }.start();
  17. }
  18. public synchronized void get(Thread thread) {
  19. long start = System.currentTimeMillis();
  20. while (System.currentTimeMillis() - start <= 1) {
  21. System.out.println(thread.getName() + "正在进行读操作");
  22. }
  23. System.out.println(thread.getName() + "读操作完毕");
  24. }
  25. }

改成读写锁

  1. package cn.wen;
  2. import java.util.concurrent.locks.ReentrantReadWriteLock;
  3. public class Test {
  4. private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  5. public static void main(String[] args) {
  6. final Test test = new Test();
  7. new Thread() {
  8. public void run() {
  9. test.get(Thread.currentThread());
  10. };
  11. }.start();
  12. new Thread() {
  13. public void run() {
  14. test.get(Thread.currentThread());
  15. };
  16. }.start();
  17. }
  18. public void get(Thread thread) {
  19. rwl.readLock().lock();
  20. try {
  21. long start = System.currentTimeMillis();
  22. while (System.currentTimeMillis() - start <= 1) {
  23. System.out.println(thread.getName() + "正在进行读操作");
  24. }
  25. System.out.println(thread.getName() + "读操作完毕");
  26. } finally {
  27. rwl.readLock().unlock();
  28. }
  29. }
  30. }

说明 thread1 和 thread2 在同时进行读操作。这样就大大提升了读操作的效率。
注意:

  • 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
  • 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

    2.5 小结(重点)

    Lock 和 synchronized 有以下几点不同:
  1. Lock 是一个接口,而 synchronized 是 Java 中的关键字,synchronized 是内置的语言实现;
  2. synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而 Lock 在发生异常时,如果没有主动通过 unLock()去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁;
  3. Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用synchronized 时,等待的线程会一直等待下去,不能够响应中断;
  4. 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
  5. Lock 可以提高多个线程进行读操作的效率。

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于 synchronized。

3、线程间通信

线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。我们来基本一道面试常见的题目来分析
场景—-两个线程,一个线程对当前数值加 1,另一个线程对当前数值减 1,要求
用线程间通信

3.1 synchronized 方案

  1. package cn.wen.sync;
  2. // 第一步 创建资源类 定义属性和车种方法
  3. class Share{
  4. // 初始值
  5. private int number = 0;
  6. // +1 的方法
  7. public synchronized void incr() throws InterruptedException {
  8. // 第二步 判断 干活 通知
  9. while (number != 0){
  10. this.wait();
  11. }
  12. // 如果number为0 则+1
  13. number++;
  14. System.out.println(Thread.currentThread().getName()+"::"+number);
  15. // 通知其他线程
  16. this.notifyAll();
  17. }
  18. // -1 的方法
  19. public synchronized void decr() throws InterruptedException {
  20. // 第二步 判断 干活 通知
  21. // 需要使用while 解决虚假唤醒
  22. while (number != 1){
  23. this.wait();
  24. }
  25. // 如果number为0 则+1
  26. number--;
  27. System.out.println(Thread.currentThread().getName()+"::"+number);
  28. // 通知其他线程
  29. this.notifyAll();
  30. }
  31. }
  32. public class ThreadDemo1 {
  33. public static void main(String[] args) {
  34. Share share = new Share();
  35. // 创建线程
  36. new Thread(() ->{
  37. for (int i = 0; i <= 10; i++) {
  38. try {
  39. share.incr();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. } finally {
  43. }
  44. }
  45. },"AA").start();
  46. new Thread(() ->{
  47. for (int i = 0; i <= 10; i++) {
  48. try {
  49. share.decr();
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. } finally {
  53. }
  54. }
  55. },"BB").start();
  56. }
  57. }

出现虚假唤醒的原因:

  1. 生产者唤醒了所有处于阻塞队列中的线程,我们希望的是生产者A唤醒的应该是两个消费者,而不是唤醒了生产者B
  2. 我们都知道,wait方法的作用是将线程停止执行并送入到阻塞队列中,但是wait方法还有一个操作就是释放锁。因此当生产者A执行wait方法时,该线程就会把它持有的对象锁释放,这样生产者B就可以拿到锁进入synchronized修饰的push方法中,即使它被卡在if判断,但被唤醒后它就会又添加一个产品了。

当wait在前一次不符合添加执行了wait 这个对象还会在当前位置 如果下次被唤醒的话就会执行下去
将if 判断改为while 再次进行判断就不会出现虚假唤醒

3.2、Lock的重入锁实现

  1. // 定义资源类 定义属性和方法
  2. class Share{
  3. // 初始值
  4. private int number = 0;
  5. // 创建Lock 可重入lock
  6. private Lock lock= new ReentrantLock();
  7. private Condition condition = lock.newCondition();
  8. // 操作方法 +1
  9. public void incr() throws InterruptedException {
  10. // 上锁
  11. lock.lock();
  12. try {
  13. // 判断
  14. while (number != 0){
  15. condition.wait();
  16. }
  17. // 干活
  18. number++;
  19. System.out.println(Thread.currentThread().getName()+"::"+number);
  20. // 通知
  21. condition.signalAll();
  22. }finally {
  23. // 解锁
  24. lock.unlock();
  25. }
  26. }
  27. // 操作方法 -1
  28. public void decr() throws InterruptedException {
  29. // 上锁
  30. lock.lock();
  31. try {
  32. // 判断
  33. while (number != 1){
  34. condition.wait();
  35. }
  36. // 干活
  37. number--;
  38. System.out.println(Thread.currentThread().getName()+"::"+number);
  39. // 通知
  40. condition.signalAll();
  41. }finally {
  42. // 解锁
  43. lock.unlock();
  44. }
  45. }
  46. }
  47. public class ThreadDemo2 {
  48. public static void main(String[] args) {
  49. Share share = new Share();
  50. new Thread(() ->{
  51. for (int i = 0; i < 10; i++) {
  52. try {
  53. share.incr();
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. },"AA").start();
  59. new Thread(() ->{
  60. for (int i = 0; i < 10; i++) {
  61. try {
  62. share.decr();
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. },"BB").start();
  68. new Thread(() ->{
  69. for (int i = 0; i < 10; i++) {
  70. try {
  71. share.incr();
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. },"CC").start();
  77. new Thread(() ->{
  78. for (int i = 0; i < 10; i++) {
  79. try {
  80. share.decr();
  81. } catch (InterruptedException e) {
  82. e.printStackTrace();
  83. }
  84. }
  85. },"DD").start();
  86. }
  87. }

3.3、线程间定制化通信

3.3.1、案例介绍

问题: A 线程打印 5 次 A,B 线程打印 10 次 B,C 线程打印 15 次 C,按照
此顺序循环 10 轮
image.png
lock实现案例

  1. package cn.wen.lock;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. class ShareResource{
  6. // 属性 定义标志位
  7. private int flag = 1; // 1 AA 2BB 3CC
  8. // 创建Lock锁
  9. private Lock lock = new ReentrantLock();
  10. // 创建3个condition
  11. private Condition c1 = lock.newCondition();
  12. private Condition c2 = lock.newCondition();
  13. private Condition c3 = lock.newCondition();
  14. // 操作方法
  15. // 打印5次
  16. public void print5(int loop) throws InterruptedException {
  17. //上锁
  18. lock.lock();
  19. try {
  20. //判断
  21. while (flag != 1){
  22. c1.await();
  23. }
  24. // 干活
  25. for (int i = 1; i <= 5; i++) {
  26. System.out.println(Thread.currentThread().getName()+" :: "+i+" : 轮数:"+loop);
  27. }
  28. // 通知
  29. flag = 2;
  30. c2.signal();
  31. }finally {
  32. //释放锁
  33. lock.unlock();
  34. }
  35. }
  36. // 打印5次
  37. public void print10(int loop) throws InterruptedException {
  38. //上锁
  39. lock.lock();
  40. try {
  41. //判断
  42. while (flag != 2){
  43. c2.await();
  44. }
  45. // 干活
  46. for (int i = 1; i <= 10; i++) {
  47. System.out.println(Thread.currentThread().getName()+" :: "+i+" : 轮数:"+loop);
  48. }
  49. // 通知
  50. flag = 3;
  51. c3.signal();
  52. }finally {
  53. //释放锁
  54. lock.unlock();
  55. }
  56. }
  57. // 打印5次
  58. public void print15(int loop) throws InterruptedException {
  59. //上锁
  60. lock.lock();
  61. try {
  62. //判断
  63. while (flag != 3){
  64. c3.await();
  65. }
  66. // 干活
  67. for (int i = 1; i <= 15; i++) {
  68. System.out.println(Thread.currentThread().getName()+" :: "+i+" : 轮数:"+loop);
  69. }
  70. // 通知
  71. flag = 1;
  72. c1.signal();
  73. }finally {
  74. //释放锁
  75. lock.unlock();
  76. }
  77. }
  78. }
  79. public class ThreadDemo3 {
  80. public static void main(String[] args) {
  81. ShareResource shareResource = new ShareResource();
  82. new Thread(() ->{
  83. for (int i = 1; i <= 10; i++) {
  84. try {
  85. shareResource.print5(i);
  86. } catch (InterruptedException e) {
  87. e.printStackTrace();
  88. }
  89. }
  90. },"AA").start();
  91. new Thread(() ->{
  92. for (int i = 0; i < 10; i++) {
  93. try {
  94. shareResource.print10(i);
  95. } catch (InterruptedException e) {
  96. e.printStackTrace();
  97. }
  98. }
  99. },"BB").start();
  100. new Thread(() ->{
  101. for (int i = 0; i < 10; i++) {
  102. try {
  103. shareResource.print15(i);
  104. } catch (InterruptedException e) {
  105. e.printStackTrace();
  106. }
  107. }
  108. },"CC").start();
  109. }
  110. }

image.png

4、集合的线程安全

4.1 集合操作 Demo

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.UUID;
  4. /**
  5. * 集合线程安全案例
  6. */
  7. public class NotSafeDemo {
  8. /**
  9. * 多个线程同时对集合进行修改
  10. * @param args
  11. */
  12. public static void main(String[] args) {
  13. List list = new ArrayList();
  14. for (int i = 0; i < 100; i++) {
  15. new Thread(() ->{
  16. list.add(UUID.randomUUID().toString());
  17. System.out.println(list);
  18. }, "线程" + i).start();
  19. }
  20. }
  21. }

异常内容
java.util.ConcurrentModificationException
问题: 为什么会出现并发修改异常?
查看 ArrayList 的 add 方法源码

  1. /**
  2. * Appends the specified element to the end of this list.
  3. *
  4. * @param e element to be appended to this list
  5. * @return <tt>true</tt> (as specified by {@link Collection#add})
  6. */
  7. public boolean add(E e) {
  8. ensureCapacityInternal(size + 1); // Increments modCount!!
  9. elementData[size++] = e;
  10. return true;
  11. }

那么我们如何去解决 List 类型的线程安全问题?

4.2 Vector

Vector 是矢量队列,它是 JDK1.0 版本添加的类。继承于 AbstractList,实现了 List, RandomAccess, Cloneable 这些接口。 Vector 继承了 AbstractList,实现了 List;所以,它是一个队列,支持相关的添加、删除、修改、遍历等功能。 Vector 实现了 RandmoAccess 接口,即提供了随机访问功能RandmoAccess 是 java 中用来被 List 实现,为 List 提供快速访问功能的。在Vector 中,我们即可以通过元素的序号快速获取元素对象;这就是快速随机访问。 Vector 实现了 Cloneable 接口,即实现 clone()函数。它能被克隆。
和 ArrayList 不同,Vector 中的操作是线程安全的。
NotSafeDemo 代码修改

  1. package cn.wen.lock;
  2. import java.util.*;
  3. public class ThreadDemo4 {
  4. public static void main(String[] args) {
  5. // 创建ArrayList 集合 会出现异常
  6. // List<String> list = new ArrayList<>();
  7. // 使用Vector是第一种解决线程安全问题
  8. List<String > list = new Vector<>();
  9. // Collections可以解决线程安全的问题第二种
  10. // List<String> list = Collections.synchronizedList(new ArrayList<>());
  11. // 第三种
  12. // 创建多个线程
  13. for (int i = 0; i < 30; i++) {
  14. new Thread(() ->{
  15. // 添加内容
  16. list.add(UUID.randomUUID().toString().substring(0,8));
  17. // 获取内容
  18. System.out.println(list);
  19. },String.valueOf(i)).start();
  20. }
  21. }
  22. }

现在没有运行出现并发异常,为什么?
查看 Vector 的 add 方法

  1. /**
  2. * Appends the specified element to the end of this Vector.
  3. *
  4. * @param e element to be appended to this Vector
  5. * @return {@code true} (as specified by {@link Collection#add})
  6. * @since 1.2
  7. */
  8. public synchronized boolean add (E e){
  9. modCount++;
  10. ensureCapacityHelper(elementCount + 1);
  11. elementData[elementCount++] = e;
  12. return true;
  13. }

add 方法被 synchronized 同步修辞,线程安全!因此没有并发异常

4.3 Collections

Collections 提供了方法 synchronizedList 保证 list 是同步线程安全的NotSafeDemo 代码修源码详情

  1. /**
  2. * Returns a synchronized (thread-safe) list backed by the specified
  3. * list. In order to guarantee serial access, it is critical that
  4. * <strong>all</strong> access to the backing list is accomplished
  5. * through the returned list.<p>
  6. *
  7. * It is imperative that the user manually synchronize on the returned
  8. * list when iterating over it:
  9. * <pre>
  10. * List list = Collections.synchronizedList(new ArrayList());
  11. * ...
  12. * synchronized (list) {
  13. * Iterator i = list.iterator(); // Must be in synchronized block
  14. * while (i.hasNext())
  15. * foo(i.next());
  16. * }
  17. * </pre>
  18. * Failure to follow this advice may result in non-deterministic behavior.
  19. *
  20. * <p>The returned list will be serializable if the specified list is
  21. * serializable.
  22. *
  23. * @param <T> the class of the objects in the list
  24. * @param list the list to be "wrapped" in a synchronized list.
  25. * @return a synchronized view of the specified list.
  26. */
  27. public static <T> List<T> synchronizedList(List<T> list) {
  28. return (list instanceof RandomAccess ?
  29. new SynchronizedRandomAccessList<>(list) :
  30. new SynchronizedList<>(list));
  31. }

4.4 CopyOnWriteArrayList(重点)

image.png

首先我们对 CopyOnWriteArrayList 进行学习,其特点如下: 它相当于线程安全的 ArrayList。和 ArrayList 一样,它是个可变数组;但是和 ArrayList 不同的时,它具有以下特性:

  1. 它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
  2. 它是线程安全的。
  3. 因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove() 等等)的开销很大。
  4. 迭代器支持 hasNext(), next()等不可变操作,但不支持可变 remove()等操作。
  5. 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

1. 独占锁效率低:采用读写分离思想解决
2. 写线程获取到锁,其他写线程阻塞
3. 复制思想:
当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
这时候会抛出来一个新的问题,也就是数据不一致的问题。如果写线程还没来得及写会内存,其他的线程就会读到了脏数据。
这就是 CopyOnWriteArrayList 的思想和原理。就是拷贝一份。
NotSafeDemo 代码修改

  1. import java.util.*;
  2. import java.util.concurrent.CopyOnWriteArrayList;
  3. /**
  4. * 集合线程安全案例
  5. */
  6. public class NotSafeDemo {
  7. /**
  8. * 多个线程同时对集合进行修改
  9. * @param args
  10. */
  11. public static void main(String[] args) {
  12. List list = new CopyOnWriteArrayList();
  13. for (int i = 0; i < 100; i++) {
  14. new Thread(() ->{
  15. list.add(UUID.randomUUID().toString());
  16. System.out.println(list);
  17. }, "线程" + i).start();
  18. }
  19. }
  20. }

没有线程安全问题
原因分析(重点):动态数组与线程安全
下面从“动态数组”和“线程安全”两个方面进一步对 CopyOnWriteArrayList 的原理进行说明

  • “动态数组”机制
    • 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile 数组”, 这就是它叫做 CopyOnWriteArrayList 的原因
    • 由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话,效率比较高。
  • “线程安全”机制
    • 通过 volatile 和互斥锁来实现的。
    • 通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看到其它线程对该 volatile 变量最后的写入;就这样,通过 volatile 提供了“读取到的数据总是最新的”这个机制的保证。
    • 通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥锁”,就达到了保护数据的目的。

      4.5 小结(重点)

      1.线程安全与线程不安全集合
      集合类型中存在线程安全与线程不安全的两种,常见例如:
      ArrayList ——- Vector
      HashMap ——-HashTable
      但是以上都是通过 synchronized 关键字实现,效率较低
      2.Collections 构建的线程安全集合3.java.util.concurrent 并发包下
      CopyOnWriteArrayList CopyOnWriteArraySet 类型,通过动态数组与线程安
      全个方面保证线程安全

      5、多线程锁

      5.1 锁的八个问题演示

      1. class Phone {
      2. public static synchronized void sendSMS() throws Exception {
      3. //停留 4 秒
      4. TimeUnit.SECONDS.sleep(4);
      5. System.out.println("------sendSMS");
      6. }
      7. public synchronized void sendEmail() throws Exception {
      8. System.out.println("------sendEmail");
      9. }
      10. public void getHello() {
      11. System.out.println("------getHello");
      12. }
      13. }
      14. class Test {
      15. public static void main(String[] args) {
      16. new Thread(() ->{
      17. try{
      18. phone.sendSMS();
      19. } catch (Exception e){
      20. e.printStackTrace();
      21. }
      22. },"AA").start();
      23. Thread.sleep(100);
      24. new Thread(() ->{
      25. try{
      26. phone.sendEmail();
      27. } catch (Exception e){
      28. e.printStackTrace();
      29. }
      30. },"BB").start();
      31. }
      32. }
      /
      *
      @Description: 8 锁
      *
      1 标准访问,先打印短信还是邮件
      ———sendSMS
      ———sendEmail
      2 停 4 秒在短信方法内,先打印短信还是邮件
      ———sendSMS
      ———sendEmail
      3 新增普通的 hello 方法,是先打短信还是 hello
      ———getHello
      ———sendSMS
      4 现在有两部手机,先打印短信还是邮件
      ———sendEmail
      ———sendSMS
      5 两个静态同步方法,1 部手机,先打印短信还是邮件
      ———sendSMS
      ———sendEmail6 两个静态同步方法,2 部手机,先打印短信还是邮件
      ———sendSMS
      ———sendEmail
      7 1 个静态同步方法,1 个普通同步方法,1 部手机,先打印短信还是邮件
      ———sendEmail
      ———sendSMS
      8 1 个静态同步方法,1 个普通同步方法,2 部手机,先打印短信还是邮件
      ———sendEmail
      ———sendSMS
      结论:
      一个对象里面如果有多个 synchronized 方法,某一个时刻内,只要一个线程去调用其中的一个 synchronized 方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些 synchronized 方法锁的是当前对象 this,被锁定后,其它的线程都不能进入到当前对象的其它的
      synchronized 方法加个普通方法后发现和同步锁无关换成两个对象后,不是同一把锁了,情况立刻变化。
      synchronized 实现同步的基础:Java 中的每一个对象都可以作为锁。
      具体表现为以下 3 种形式。
      对于普通同步方法,锁是当前实例对象。
      对于静态同步方法,锁是当前类的 Class 对象。
      对于同步方法块,锁是 Synchonized 括号里 配置 的 对象 **
      当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁, 所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同
      步方法之间,只要它们同一个类的实例对象!

      5.2、死锁的形成

      ```java package cn.wen.lock;

import java.util.*; import java.util.concurrent.TimeUnit;

public class ThreadDemo4 {

  1. static Object a = new Object();
  2. static Object b = new Object();
  3. public static void main(String[] args) {
  4. new Thread(() ->{
  5. try{
  6. // 当前有a锁
  7. synchronized (a) {
  8. System.out.println(Thread.currentThread().getName()+"持有锁a,试图获取锁b");
  9. // 这个目的是让A 和B 都创建线程 因为创建线程是不确定的
  10. TimeUnit.SECONDS.sleep(1);
  11. synchronized (b) {
  12. System.out.println(Thread.currentThread().getName() + "获取锁b");
  13. }
  14. }
  15. } catch (Exception e){
  16. e.printStackTrace();
  17. }
  18. },"A").start();
  19. new Thread(() ->{
  20. try{
  21. // 当前有a锁
  22. synchronized (b) {
  23. System.out.println(Thread.currentThread().getName()+"持有锁b,试图获取锁a");
  24. TimeUnit.SECONDS.sleep(1);
  25. synchronized (a) {
  26. System.out.println(Thread.currentThread().getName() + "获取锁a");
  27. }
  28. }
  29. } catch (Exception e){
  30. e.printStackTrace();
  31. }
  32. },"B").start();
  33. }

}

  1. <a name="TKhFM"></a>
  2. ### 6 Callable&Future 接口
  3. 目前我们学习了有两种创建线程的方法-一种是通过创建 Thread 类,另一种是通过使用 Runnable 创建线程。但是,Runnable 缺少的一项功能是,当线程终止时(即 run()完成时),我们无法使线程返回结果。为了支持此功能,Java 中提供了 Callable 接口。<br />**现在我们学习的是创建线程的第三种方案---Callable 接口**<br />**Callable 接口的特点如下(重点) **
  4. - 为了实现 Runnable,需要实现不返回任何内容的 run()方法,而对于Callable,需要实现在完成时返回结果的 call()方法。
  5. - call()方法可以引发异常,而 run()则不能。
  6. - 为实现 Callable 而必须重写 call 方法
  7. - 不能直接替换 runnable,因为 Thread 类的构造方法根本没有 Callable
  8. ```java
  9. package cn.wen.callable;
  10. import java.util.concurrent.Callable;
  11. import java.util.concurrent.ExecutionException;
  12. import java.util.concurrent.FutureTask;
  13. // 比较两个接口
  14. // 实现Runnable接口
  15. class MyThread1 implements Runnable{
  16. @Override
  17. public void run() {
  18. }
  19. }
  20. // 实现Callable接口
  21. class MyThread2 implements Callable{
  22. @Override
  23. public Object call() throws Exception {
  24. return null;
  25. }
  26. }
  27. public class Demo1 {
  28. public static void main(String[] args) throws ExecutionException, InterruptedException {
  29. // Runnable 接口创建线程
  30. new Thread(new MyThread1(),"AA").start();
  31. // Callable 报错,不能这样创建
  32. // new Thread(new MyThread2(),"BB").start();
  33. //FutureTask创建项目
  34. FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread2());
  35. // lam 表达式
  36. FutureTask<Integer> futureTask2 = new FutureTask<>(() ->{
  37. System.out.println(Thread.currentThread().getName()+" come in callable");
  38. return 1024;
  39. });
  40. // 创建一个线程
  41. new Thread(futureTask2,"lucy").start();
  42. new Thread(futureTask1,"lucy").start();
  43. while (!futureTask2.isDone()){
  44. System.out.println("wait.....");
  45. }
  46. // 调用FutureTask 的get方法
  47. System.out.println(futureTask2.get());
  48. System.out.println(Thread.currentThread().getName()+" come over");
  49. // 原理 未来任务
  50. /**
  51. * 1、 老师上课,口渴了,去买水不合适,讲课继续
  52. * 单开线程找班长买水,把水买回来,需要的时候直接get
  53. *
  54. * 2、
  55. */
  56. }
  57. }

6.2 Future 接口

当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用 Future 对象。将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。要实现此接口,必须重写 5 种方法,这里列出了重要的方法,如下:

  • public boolean cancel(boolean mayInterrupt):用于停止任务。如果尚未启动,它将停止任务。如果已启动,则仅在 mayInterrupt 为 true 时才会中断任务。
  • public Object get()抛出 InterruptedException,ExecutionException:用于获取任务的结果。如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果。
  • public boolean isDone():如果任务完成,则返回 true,否则返回 false可以看到 Callable 和 Future 做两件事-Callable 与 Runnable 类似,因为它封装了要在另一个线程上运行的任务,而 Future 用于存储从另一个线程获得的结果。实际上,future 也可以与 Runnable 一起使用。要创建线程,需要 Runnable。为了获得结果,需要 future

    6.3 FutureTask

    Java 库具有具体的 FutureTask 类型,该类型实现 Runnable 和 Future,并方便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建 Thread 对象。因此,间接地使用 Callable 创建线程。
    核心原理:(重点)
    在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成

  • 当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执行状态

  • 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
  • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法
  • 一旦计算完成,就不能再重新开始或取消计算
  • get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常
  • get 只计算一次,因此 get 方法放到最后

demo 案例

6.4 使用 Callable 和 Future

  1. /**
  2. * CallableDemo 案列
  3. */
  4. public class CallableDemo {
  5. /**
  6. * 实现 runnable 接口
  7. */
  8. static class MyThread1 implements Runnable {
  9. /**
  10. * run 方法
  11. */
  12. @Override
  13. public void run() {
  14. try {
  15. System.out.println(Thread.currentThread().getName() + "线程进入了 run
  16. 方法");
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. /**
  23. * 实现 callable 接口
  24. */
  25. static class MyThread2 implements Callable {
  26. /**
  27. * call 方法
  28. *
  29. * @return
  30. * @throws Exception
  31. */
  32. @Override
  33. public Long call() throws Exception {
  34. try {
  35. System.out.println(Thread.currentThread().getName() + "线程进入了 call
  36. 方法, 开始准备睡觉");
  37. Thread.sleep(1000);
  38. System.out.println(Thread.currentThread().getName() + "睡醒了");
  39. } catch (Exception e) {
  40. e.printStackTrace();
  41. }
  42. return System.currentTimeMillis();
  43. }
  44. }
  45. public static void main(String[] args) throws Exception {
  46. //声明 runable
  47. Runnable runable = new MyThread1();
  48. //声明 callable
  49. Callable callable = new MyThread2();
  50. //future-callable
  51. FutureTask<Long> futureTask2 = new FutureTask(callable);
  52. //线程二
  53. new Thread(futureTask2, "线程二").start();
  54. for (int i = 0; i < 10; i++) {
  55. Long result1 = futureTask2.get();
  56. System.out.println(result1);
  57. }
  58. //线程一
  59. new Thread(runable, "线程一").start();
  60. }
  61. }

6.5 小结(重点)

  • 在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成, 当主线程将来需要时,就可以通过 Future对象获得后台作业的计算结果或者执行状态
  • 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果
  • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
  • 只计算一次

    7、JUC 三大辅助类

    JUC 中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多时 Lock 锁的频繁操作。这三种辅助类为:
    CountDownLatch: 减少计数
    CyclicBarrier: 循环栅栏
    Semaphore: 信号灯
    下面我们分别进行详细的介绍和学习

    7.1 减少计数 CountDownLatch

    CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句。

  • CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,这 些线程会阻塞

  • 其它线程调用 countDown 方法会将计数器减 1(调用 countDown 方法的线程不会阻塞)
  • 当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行

场景: 6 个同学陆续离开教室后值班同学才可以关门。
CountDownLatchDemo

  1. package cn.wen.juc;
  2. import java.util.concurrent.CountDownLatch;
  3. public class CountDownLatchDemo {
  4. // 六个同学离开教室才能锁门
  5. public static void main(String[] args) throws InterruptedException {
  6. // 创建一个CountDownLatch对象,设置初始值
  7. CountDownLatch countDownLatch = new CountDownLatch(6);
  8. // 6个同学陆续离开之后
  9. for (int i = 1; i <= 6; i++) {
  10. new Thread(() -> {
  11. System.out.println(Thread.currentThread().getName()+" 好同学离开了教室!");
  12. // 计数器减一
  13. countDownLatch.countDown();
  14. },String.valueOf(i)).start();
  15. }
  16. // 没有变成0 就等待
  17. countDownLatch.await();
  18. System.out.println(Thread.currentThread().getName()+" 班长锁门走人了!");
  19. }
  20. }

7.2 循环栅栏 CyclicBarrier

CyclicBarrier 看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句。可以将 CyclicBarrier 理解为加 1 操作
场景: 集齐 7 颗龙珠就可以召唤神龙
CyclicBarrierDemo

  1. package cn.wen.juc;
  2. import java.util.concurrent.BrokenBarrierException;
  3. import java.util.concurrent.CyclicBarrier;
  4. public class CyclicBarrierDemo {
  5. private static final int NUMBER = 7;
  6. public static void main(String[] args) {
  7. // 集齐7颗龙珠
  8. // 创建CyclicBarrier
  9. CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER,()->{
  10. System.out.println("******集齐7颗龙珠,召唤神龙");
  11. });
  12. // 集齐过程
  13. for (int i = 1; i <= 7; i++) {
  14. new Thread(() -> {
  15. System.out.println(Thread.currentThread().getName()+" 星龙珠收集到了");
  16. // 等待
  17. try {
  18. cyclicBarrier.await();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } catch (BrokenBarrierException e) {
  22. e.printStackTrace();
  23. }
  24. },String.valueOf(i)).start();
  25. }
  26. }
  27. }

7.3、信号灯 Semaphore

Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),每个信号量初始化为一个最多只能分发一个许可证。使用 acquire 方法获得许可证,release 方法释放许可
场景: 抢车位, 6 部汽车 3 个停车位
SemaphoreDemo

  1. package cn.wen.juc;
  2. import java.util.Random;
  3. import java.util.concurrent.Semaphore;
  4. import java.util.concurrent.TimeUnit;
  5. public class SemaphoreDemo {
  6. public static void main(String[] args) {
  7. // 创建Semaphore 设置许可数量
  8. Semaphore semaphore = new Semaphore(3);
  9. //模拟6辆车
  10. for (int i = 1; i <= 6; i++) {
  11. new Thread(() -> {
  12. try {
  13. //抢占
  14. semaphore.acquire();
  15. System.out.println(Thread.currentThread().getName()+" 抢到了车位");
  16. // 设置随机停车时间
  17. TimeUnit.SECONDS.sleep(new Random().nextInt(5));
  18. System.out.println(Thread.currentThread().getName()+" -------离开了车位");
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }finally {
  22. // 释放
  23. semaphore.release();
  24. }
  25. },String.valueOf(i)).start();
  26. }
  27. }
  28. }

8、读写锁

8.1 读写锁介绍

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。 针对这种场景,JAVA 的并发包提供了读写锁 ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁
1. 线程进入读锁的前提条件:

  • 没有其他线程的写锁
  • 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。

2.线程进入写锁的前提条件:

  • 没有其他线程的读锁
  • 没有其他线程的写锁

而读写锁有以下三个重要的特性:
1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
2)重进入:读锁和写锁都支持线程重进入。
3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
image.png
读写锁Demo

  1. package cn.wen.readwrite;
  2. import javax.sound.midi.Track;
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.locks.ReadWriteLock;
  7. import java.util.concurrent.locks.ReentrantReadWriteLock;
  8. class MyCache{
  9. // 创建map集合
  10. private Map<String, Object> map = new HashMap<>();
  11. private ReadWriteLock rwLock = new ReentrantReadWriteLock();
  12. // 放数据
  13. public void put(String key, Object value){
  14. rwLock.writeLock().lock();
  15. try {
  16. System.out.println(Thread.currentThread().getName()+" 正在写操作"+key);
  17. // 暂停也会
  18. TimeUnit.MICROSECONDS.sleep(300);
  19. // 放数据
  20. map.put(key, value);
  21. System.out.println(Thread.currentThread().getName()+" 写完了"+key);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }finally {
  25. rwLock.writeLock().unlock();
  26. }
  27. }
  28. // 取数据
  29. public Object get(String key){
  30. rwLock.readLock().lock();
  31. Object result = null;
  32. try {
  33. System.out.println(Thread.currentThread().getName()+" 正在读取操作"+key);
  34. // 暂停也会
  35. TimeUnit.MICROSECONDS.sleep(300);
  36. // 放数据
  37. result = map.get(key);
  38. System.out.println(Thread.currentThread().getName()+" 读取完了"+key);
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }finally {
  42. rwLock.readLock().unlock();
  43. }
  44. return result;
  45. }
  46. }
  47. public class ReadWriteLockDemo {
  48. public static void main(String[] args) {
  49. MyCache myCache = new MyCache();
  50. // 创建线程
  51. for (int i = 1; i <= 5; i++) {
  52. final int num = i;
  53. new Thread(() ->{
  54. myCache.put(num+"",num+"");
  55. },String.valueOf(i)).start();
  56. }
  57. // 创建线程获取数据
  58. for (int i = 1; i <= 5; i++) {
  59. final int num = i;
  60. new Thread(() ->{
  61. myCache.get(num+"");
  62. },String.valueOf(i)).start();
  63. }
  64. }
  65. }
  66. /**
  67. 2 正在写操作2
  68. 2 写完了2
  69. 3 正在写操作3
  70. 3 写完了3
  71. 1 正在写操作1
  72. 1 写完了1
  73. 5 正在写操作5
  74. 5 写完了5
  75. 4 正在写操作4
  76. 4 写完了4
  77. 1 正在读取操作1
  78. 2 正在读取操作2
  79. 3 正在读取操作3
  80. 4 正在读取操作4
  81. 5 正在读取操作5
  82. 4 读取完了4
  83. 1 读取完了1
  84. 5 读取完了5
  85. 3 读取完了3
  86. 2 读取完了2
  87. **/

image.png
饥饿锁的形成是读锁一直占有,导致写锁获取不到。

8.2、 写锁降级为读锁

  1. package cn.wen.readwrite;
  2. import java.util.concurrent.locks.ReentrantReadWriteLock;
  3. // 演示写锁降级为读锁
  4. public class Demo1 {
  5. public static void main(String[] args) {
  6. // 可重入读写锁对象
  7. ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  8. ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();//读锁
  9. ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock(); // 写锁
  10. // 锁降级
  11. // 1、获取写锁
  12. writeLock.lock();
  13. System.out.println("---wen");
  14. // 获取读锁
  15. readLock.lock();
  16. System.out.println("---read");
  17. // 释放写锁
  18. writeLock.unlock();
  19. System.out.println("释放写锁");
  20. // 释放读锁
  21. readLock.unlock();
  22. System.out.println("释放读锁");
  23. }
  24. }

image.png

8.3、 小结(重要)

  • 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
  • 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
  • 原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

    9、 阻塞队列

    9.1 BlockingQueue 简介

    Concurrent 包中,BlockingQueue 很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了 BlockingQueue 家庭中的所有成员,包括他们各自的功能以及常见使用场景。阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
    image.png

  • 当队列是空的,从队列中获取元素的操作将会被阻塞

  • 当队列是满的,从队列中添加元素的操作将会被阻塞
  • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或完全清空,使队列变得空闲起来并后续新增
  • 常用的队列主要有以下两种:

• 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
• 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要 BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue 都给你一手包办了
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和 “消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度
大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

  • 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
  • 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起)直到队列中有空的位置,线程被自动唤醒

    9.2 BlockingQueue 核心方法

    image.pngBlockingQueue 的核心方法
    1.放入数据

  • offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.(本方法不阻塞当前执行方法的线程)

  • offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败
  • put(anObject):把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续.

2.获取数据

  • poll(time): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等time 参数规定的时间,取不到时返回 null
  • poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  • take(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入;
  • drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    9.3、入门案例

    ```java package cn.wen.queue;

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit;

public class BlockQueueDemo { public static void main(String[] args) throws InterruptedException { // 创建阻塞队列 BlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);

  1. // 第一组

// System.out.println(blockingQueue.add(“a”)); // System.out.println(blockingQueue.add(“b”)); // System.out.println(blockingQueue.add(“c”)); // System.out.println(blockingQueue.element()); // //// System.out.println(blockingQueue.add(“w”)); // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove());

  1. // 第二组

// System.out.println(blockingQueue.offer(“a”)); // System.out.println(blockingQueue.offer(“b”)); // System.out.println(blockingQueue.offer(“c”)); // System.out.println(blockingQueue.offer(“w”)); // // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll());

  1. // a b c
  2. //第三组

// blockingQueue.put(“a”); // blockingQueue.put(“b”); // blockingQueue.put(“c”); //// blockingQueue.put(“w”); // 阻塞 // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take());

  1. // 第四组
  2. System.out.println(blockingQueue.offer("a"));
  3. System.out.println(blockingQueue.offer("b"));
  4. System.out.println(blockingQueue.offer("c"));
  5. System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS));
  6. }

}

  1. <a name="xJMIV"></a>
  2. #### 9.4 常见的 BlockingQueue
  3. <a name="KGCcG"></a>
  4. ##### 9.4.1 ArrayBlockingQueue(常用)
  5. 基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。 <br />ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还 可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。 <br />**一句话总结: 由数组结构组成的有界阻塞队列。**=
  6. <a name="XoRjZ"></a>
  7. ##### 9.4.2 LinkedBlockingQueue(常用)
  8. 基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。 而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列 的并发性能。 <br />**ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个 类足以。 **<br />**一句话总结: 由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。**
  9. <a name="hr0zT"></a>
  10. ##### 9.4.3 DelayQueue
  11. DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。 <br />**一句话总结: 使用优先级队列实现的延迟无界阻塞队列。**
  12. <a name="Nb3SH"></a>
  13. ##### 9.4.4 PriorityBlockingQueue
  14. 基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的是 PriorityBlockingQueue 并**不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者**。 <br />因此使用的时候要特别注意,**生产者生产数据的速度绝对不能快于消费者消费数据的速度**,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是**公平锁**。 <br />**一句话总结: 支持优先级排序的无界阻塞队列。**
  15. <a name="NAuM8"></a>
  16. ##### 9.4.5 SynchronousQueue
  17. 一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。声明一个SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。 <br />**公平模式和非公平模式的区别: **<br />• 公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略; <br />• 非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者,而后一种模式, 如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。 <br />**一句话总结: 不存储元素的阻塞队列,也即单个元素的队列。**=
  18. <a name="iPp9W"></a>
  19. ##### 9.4.6 LinkedTransferQueue
  20. LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和transfer 方法。LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素 <br />为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。 <br />**一句话总结: 由链表组成的无界阻塞队列。**
  21. <a name="H4HCg"></a>
  22. ##### 9.4.7 LinkedBlockingDeque
  23. LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。 <br />对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况 <br />• 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异常 <br />• 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数 <br />**一句话总结: 由链表组成的双向阻塞队列**
  24. <a name="k4X0B"></a>
  25. ##### 9.5 小结
  26. **1. 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件 **<br />**满足,被挂起的线程又会自动被唤起 **<br />**2. 为什么需要 BlockingQueue? **在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了
  27. <a name="hn2oK"></a>
  28. ### 10、ThreadPool 线程池
  29. 线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。 <br />例子: 10 年前单核 CPU 电脑,假的多线程,像马戏团小丑玩多个球,CPU 需要来回切换。 现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。 <br />**线程池的优势: **线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。 <br />**它的主要特点为: **<br />• **降低资源消耗**: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。 <br />• **提高响应速度:** 当任务到达时,任务可以不需要等待线程创建就能立即执行。 <br />•** 提高线程的可管理性**: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。 <br />• **Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类**
  30. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25484710/1647931254804-e3f7039f-0a02-4b46-a90b-b9e8a5375324.png#clientId=ua6b7c778-0fcf-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=362&id=u8d51e196&margin=%5Bobject%20Object%5D&name=image.png&originHeight=427&originWidth=720&originalType=binary&ratio=1&rotation=0&showTitle=false&size=54898&status=done&style=none&taskId=ud071e54d-e6ba-4ddb-b339-cbd39c9e2fb&title=&width=610)
  31. <a name="eI77r"></a>
  32. #### 10.2 线程池参数说明
  33. 本次介绍 5 种类型的线程池
  34. <a name="HvgdL"></a>
  35. ##### 10.2.1 常用参数(重点)
  36. • corePoolSize 线程池的核心线程数 <br />• maximumPoolSize 能容纳的最大线程数 <br />• keepAliveTime 空闲线程存活时间 <br />• unit 存活的时间单位 <br />• workQueue 存放提交但未执行任务的队列 <br />• threadFactory 创建线程的工厂类 <br />• handler 等待队列满后的拒绝策略 <br />线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize -最大线程数 当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。 <br />总结起来,也就是一句话,**当提交的任务数大于(workQueue.size() +maximumPoolSize ),就会触发线程池的拒绝策略**。
  37. <a name="gfADz"></a>
  38. ##### 10.2.2 拒绝策略(重点)
  39. **CallerRunsPolicy**: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大 <br />**AbortPolicy**: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。 <br />**DiscardPolicy**: 直接丢弃,其他啥都没有 <br />**DiscardOldestPolicy**: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
  40. <a name="KZ6vF"></a>
  41. #### 10.3 线程池的种类与创建
  42. <a name="UhJ1a"></a>
  43. ##### 10.3.1 newCachedThreadPool(常用)
  44. **作用**:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程. <br />**特点**: <br />• 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE) <br />• 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟) <br />• 当线程池中,没有可用线程,会重新创建一个线程 <br />**创建方式**
  45. ```java
  46. /**
  47. * 可缓存线程池
  48. *
  49. * @return
  50. */
  51. public static ExecutorService newCachedThreadPool() {
  52. /**
  53. * corePoolSize 线程池的核心线程数
  54. * maximumPoolSize 能容纳的最大线程数
  55. * keepAliveTime 空闲线程存活时间
  56. * unit 存活的时间单位
  57. * workQueue 存放提交但未执行任务的队列
  58. * threadFactory 创建线程的工厂类:可以省略
  59. * handler 等待队列满后的拒绝策略:可以省略
  60. */
  61. return new ThreadPoolExecutor(0,
  62. Integer.MAX_VALUE,
  63. 60L,
  64. TimeUnit.SECONDS,
  65. new SynchronousQueue<>(),
  66. Executors.defaultThreadFactory(),
  67. new ThreadPoolExecutor.AbortPolicy());
  68. }

场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景

10.3.2 newFixedThreadPool(常用)

作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线
程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。特征:
• 线程池中的线程处于一定的量,可以很好的控制线程的并发量
• 线程可以重复被使用,在显示关闭之前,都将一直存在
• 超出一定量的线程被提交时候需在队列中等待
创建方式

  1. /**
  2. * 固定长度线程池
  3. * @return
  4. */
  5. public static ExecutorService newFixedThreadPool(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(10,
  16. 10,
  17. 0L,
  18. TimeUnit.SECONDS,
  19. new LinkedBlockingQueue<>(),
  20. Executors.defaultThreadFactory(),
  21. new ThreadPoolExecutor.AbortPolicy());
  22. }

场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严 格限制的场景

10.3.3 newSingleThreadExecutor(常用)

作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的
newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
特征: 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行
创建方式:

  1. /**
  2. * 单一线程池
  3. * @return
  4. */
  5. public static ExecutorService newSingleThreadExecutor(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(1, 1,
  16. 0L,
  17. TimeUnit.SECONDS,
  18. new LinkedBlockingQueue<>(),
  19. Executors.defaultThreadFactory(),
  20. new ThreadPoolExecutor.AbortPolicy());
  21. }

场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景

10.3.4 newScheduleThreadPool(了解)

作用: 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参数,最大线程数为整形的最大数的线程池
特征:
1)线程池中具有指定数量的线程,即便是空线程也将保留
2)可定时或者延迟执行线程活动

  1. public static ScheduledExecutorService newScheduledThreadPool(int
  2. corePoolSize,
  3. ThreadFactory threadFactory) {
  4. return new ScheduledThreadPoolExecutor(corePoolSize,
  5. threadFactory);
  6. }

场景: 适用于需要多个后台线程执行周期任务的场景

10.3.5 newWorkStealingPool

jdk1.8 提供的线程池,底层使用的是 ForkJoinPool 实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执行任务
创建方式

  1. public static ExecutorService newWorkStealingPool(int parallelism) {
  2. /**
  3. * parallelism:并行级别,通常默认为 JVM 可用的处理器个数
  4. * factory:用于创建 ForkJoinPool 中使用的线程。
  5. * handler:用于处理工作线程未处理的异常,默认为 null
  6. * asyncMode:用于控制 WorkQueue 的工作模式:队列---反队列
  7. */
  8. return new ForkJoinPool(parallelism,
  9. ForkJoinPool.defaultForkJoinWorkerThreadFactory, null,
  10. true);
  11. }

场景: 适用于大耗时,可并行执行的场景
Demo

  1. package cn.wen.pool;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class ThreadPoolDemo {
  5. public static void main(String[] args) {
  6. // 获取线程池
  7. ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
  8. // 一池一线程
  9. ExecutorService threadPool2 = Executors.newFixedThreadPool(1);
  10. // 10个顾客请求
  11. // 可扩容线程 根据实际情况添加线程
  12. ExecutorService threadPool3 = Executors.newCachedThreadPool();
  13. try {
  14. for (int i = 1; i <= 10; i++) {
  15. // 执行
  16. threadPool1.execute(() -> {
  17. System.out.println(Thread.currentThread().getName()+" 办理业务!");
  18. });
  19. }
  20. }catch (Exception e){
  21. e.printStackTrace();
  22. }finally {
  23. threadPool1.shutdown();
  24. }
  25. }
  26. }

10.5 线程池底层工作原理(重要)

image.png
1. 在创建了线程池后,线程池中的线程数为零
2. 当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核程 立刻运行这个任务;
2.4 如 果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程 池会启动饱和拒绝策略来执行。
3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
4.2所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
image.png

10.6 注意事项(重要)

  1. 项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE,容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参
    数,自定义线程池
    2. 创建线程池推荐适用 ThreadPoolExecutor 及其 7 个参数手动创建
    o corePoolSize 线程池的核心线程数
    o maximumPoolSize 能容纳的最大线程数
    o keepAliveTime 空闲线程存活时间
    o unit 存活的时间单位
    o workQueue 存放提交但未执行任务的队列
    o threadFactory 创建线程的工厂类
    o handler 等待队列满后的拒绝策略
    3. 为什么不允许适用不允许 Executors.的方式手动创建线程池,如下图
    image.png ```java package cn.wen.pool;

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo2 { public static void main(String[] args) { // 阿里巴巴手册不建议使用Executors来创建线程 // 建议自定义线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 2L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() );

  1. try {
  2. for (int i = 1; i <= 10; i++) {
  3. // 执行
  4. threadPool.execute(() -> {
  5. System.out.println(Thread.currentThread().getName() + " 办理业务!");
  6. });
  7. }
  8. } catch (Exception e) {
  9. e.printStackTrace();
  10. } finally {
  11. threadPool.shutdown();
  12. }
  13. }

} // 最多容纳8个线程 max+blocking

  1. <a name="YEcWy"></a>
  2. ### ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25484710/1647934103237-d363a23f-89de-4030-b0ef-92b599122f32.png#clientId=ua6b7c778-0fcf-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=339&id=u1d9226fe&margin=%5Bobject%20Object%5D&name=image.png&originHeight=429&originWidth=821&originalType=binary&ratio=1&rotation=0&showTitle=false&size=71237&status=done&style=none&taskId=ue6ea19ed-42bb-4a2d-9d9b-fcb9a4b8b7d&title=&width=649.5)11、Fork/Join
  3. <a name="MciLY"></a>
  4. #### 11.1 Fork/Join 框架简介
  5. Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join 框架要完成两件事情:
  6. > **Fork:把一个复杂任务进行分拆,大事化小 **
  7. > **Join:把分拆任务的结果进行合并**
  8. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25484710/1647934233487-b29df365-a708-41de-9e40-5e70ae7ed2d8.png#clientId=ua6b7c778-0fcf-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=383&id=uc3563ba4&margin=%5Bobject%20Object%5D&name=image.png&originHeight=457&originWidth=735&originalType=binary&ratio=1&rotation=0&showTitle=false&size=41507&status=done&style=none&taskId=u463bd9c0-c0e0-42dc-92ac-aeec58ab255&title=&width=615.2000122070312)<br />1. **任务分割**:首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割 <br />2. **执行任务并合并结果**:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。在 Java 的 Fork/Join 框架中,使用两个类完成上述操作• **ForkJoinTask**:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集 成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类: <br /> a.RecursiveAction:用于没有返回结果的任务 <br /> b.RecursiveTask:用于有返回结果的任务 <br />• **ForkJoinPool**:ForkJoinTask 需要通过 ForkJoinPool 来执行 <br />• **RecursiveTask**: 继承后可以实现递归(自己调自己)调用的任务 <br />**Fork/Join 框架的实现原理 **<br />ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成, <br />ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoi
  9. <a name="DKduZ"></a>
  10. #### 11.2 Fork 方法
  11. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25484710/1647934347760-3ae95b2a-23a6-44a4-969d-61f613a92f26.png#clientId=ua6b7c778-0fcf-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=497&id=ube9279cb&margin=%5Bobject%20Object%5D&name=image.png&originHeight=515&originWidth=658&originalType=binary&ratio=1&rotation=0&showTitle=false&size=52356&status=done&style=none&taskId=u5f27bc76-5722-4434-bdd3-01375909d91&title=&width=635)<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25484710/1647934361433-0480c77b-9dc1-4553-8711-9152bcb35948.png#clientId=ua6b7c778-0fcf-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=368&id=ud4e42cdc&margin=%5Bobject%20Object%5D&name=image.png&originHeight=398&originWidth=666&originalType=binary&ratio=1&rotation=0&showTitle=false&size=46131&status=done&style=none&taskId=u7037e1b5-edf2-42c1-8343-3713b3eefd5&title=&width=615)<br />**Fork 方法的实现原理: **当我们调用 ForkJoinTask 的 fork 方法时,程序会把任务放在 ForkJoinWorkerThread 的 pushTask 的 **workQueue **中,异步地执行这个任务,然后立即返回结果
  12. ```java
  13. public final ForkJoinTask<V> fork() {
  14. Thread t;
  15. if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
  16. ((ForkJoinWorkerThread)t).workQueue.push(this);
  17. else
  18. ForkJoinPool.common.externalPush(this);
  19. return this;
  20. }

pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用
ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下:

  1. final void push(ForkJoinTask<?> task) {
  2. ForkJoinTask<?>[] a; ForkJoinPool p;
  3. int b = base, s = top, n;
  4. if ((a = array) != null) { // ignore if queue removed
  5. int m = a.length - 1; // fenced write for task visibility
  6. U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
  7. U.putOrderedInt(this, QTOP, s + 1);
  8. if ((n = s - b) <= 1) {
  9. if ((p = pool) != null) p.signalWork(p.workQueues, this);//执行
  10. }
  11. else if (n >= m)
  12. growArray();
  13. }
  14. }

11.3 join 方法

Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看ForkJoinTask 的 join 方法的实现,代码如下:

  1. public final V join() {
  2. int s;
  3. if ((s = doJoin() & DONE_MASK) != NORMAL)
  4. reportException(s);
  5. return getRawResult();
  6. }

它首先调用 doJoin 方法,通过 doJoin()方法得到当前任务的状态来判断返回 什么结果,任务状态有 4 种:
已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)
• 如果任务状态是已完成,则直接返回任务结果。
• 如果任务状态是被取消,则直接抛出 CancellationException
• 如果任务状态是抛出异常,则直接抛出对应的异常
让我们分析一下 doJoin 方法的实现
1. 首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
2. 如果没有执行完,则从任务数组里取出任务并执行。
3. 如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

11.4 Fork/Join 框架的异常处理

ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常。getException 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException。如果任务没有完成或者没有抛出异常则返回 null。
Demo

  1. package cn.wen.forkjoin;
  2. import java.awt.print.Pageable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.ForkJoinPool;
  5. import java.util.concurrent.ForkJoinTask;
  6. import java.util.concurrent.RecursiveTask;
  7. class MyTask extends RecursiveTask<Integer> {
  8. // 拆分不超过10
  9. private static final Integer VALUE = 10;
  10. private int begin; //拆分开始值
  11. private int end; // 拆分的结束值
  12. private int result; // 返回结果
  13. // 创建有参数gzao
  14. public MyTask(int begin,int end){
  15. this.begin = begin;
  16. this.end = end;
  17. }
  18. @Override
  19. protected Integer compute() {
  20. // 判断两个相加是否大于10
  21. if ((end - begin) <= 10){
  22. // 相加操作
  23. for (int i = begin; i <= end; i++) {
  24. result += i;
  25. }
  26. }else {
  27. // 拆分
  28. int middle = (begin+end)/2;
  29. //拆分左边
  30. MyTask task1 = new MyTask(begin,middle);
  31. // 右边
  32. MyTask task2 = new MyTask(middle+1,end);
  33. // 调用
  34. task1.fork();
  35. task2.fork();
  36. //合并
  37. result = task1.join() + task2.join();
  38. }
  39. return result;
  40. }
  41. }
  42. public class ForkJoinDemo {
  43. public static void main(String[] args) throws ExecutionException, InterruptedException {
  44. MyTask task = new MyTask(0, 100);
  45. // 创建分支合并池
  46. ForkJoinPool forkJoinPool = new ForkJoinPool();
  47. ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(task);
  48. // 获取最终答案
  49. Integer result = forkJoinTask.get();
  50. System.out.println(result);
  51. // 关闭池对象
  52. forkJoinPool.shutdown();
  53. }
  54. }

12、CompletableFuture

12.1 CompletableFuture 简介

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类

12.2 Future 与 CompletableFuture

Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 的主要缺点如下:
1)不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
2)不支持进一步的非阻塞调用
通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能
3)不支持链式调用
对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。
4)不支持多个 Future 合并比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,
执行某些函数,是没法通过 Future 实现的。
5)不支持异常处理
Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。

12.3 CompletableFuture 入门

12.3.1 使用 CompletableFuture

场景:主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会
阻塞,最后我们在一个子线程中使其终止。

  1. /**
  2. * 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们
  3. 在一个子线程中使其终止
  4. * @param args
  5. */
  6. public static void main(String[] args) throws Exception{
  7. CompletableFuture<String> future = new CompletableFuture<>();
  8. new Thread(() -> {
  9. try{
  10. System.out.println(Thread.currentThread().getName() + "子线程开始干活");
  11. //子线程睡 5 秒
  12. Thread.sleep(5000);
  13. //在子线程中完成主线程
  14. future.complete("success");
  15. }catch (Exception e){
  16. e.printStackTrace();
  17. }
  18. }, "A").start();
  19. //主线程调用 get 方法阻塞
  20. System.out.println("主线程调用 get 方法获取结果为: " + future.get());
  21. System.out.println("主线程完成,阻塞结束!!!!!!");
  22. }

12.3.2 没有返回值的异步任务
  1. /**
  2. * 没有返回值的异步任务
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. //运行一个没有返回值的异步任务
  8. CompletableFuture<Void> future = CompletableFuture.runAsync(() -
  9. > {
  10. try {
  11. System.out.println("子线程启动干活");
  12. Thread.sleep(5000);
  13. System.out.println("子线程完成");
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }
  17. });
  18. //主线程阻塞
  19. future.get();
  20. System.out.println("主线程结束");
  21. }

12.3.3 有返回值的异步任务
  1. /**
  2. * 没有返回值的异步任务
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. //运行一个有返回值的异步任务
  8. CompletableFuture<String> future =
  9. CompletableFuture.supplyAsync(() -> {
  10. try {
  11. System.out.println("子线程开始任务");
  12. Thread.sleep(5000);
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. return "子线程完成了!";
  17. });
  18. //主线程阻塞
  19. String s = future.get();
  20. System.out.println("主线程结束, 子线程的结果为:" + s);
  21. }

12.3.4 线程依赖

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行
化。

  1. private static Integer num = 10;
  2. /**
  3. * 先对一个数加 10,然后取平方
  4. * @param args
  5. */
  6. public static void main(String[] args) throws Exception{
  7. System.out.println("主线程开始");
  8. CompletableFuture<Integer> future =
  9. CompletableFuture.supplyAsync(() -> {
  10. try {
  11. System.out.println("加 10 任务开始");
  12. num += 10;
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. return num;
  17. }).thenApply(integer -> {
  18. return num * num;
  19. });
  20. Integer integer = future.get();
  21. System.out.println("主线程结束, 子线程的结果为:" + integer);
  22. }

12.3.5 消费处理结果

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture.supplyAsync(() -> {
  4. try {
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. }
  10. return num;
  11. }).thenApply(integer -> {
  12. return num * num;
  13. }).thenAccept(new Consumer<Integer>() {
  14. @Override
  15. public void accept(Integer integer) {
  16. System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" +
  17. integer);
  18. }
  19. });
  20. }

12.3.6 异常处理

exceptionally 异常处理,出现异常时触发

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  4. int i= 1/0;
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. return num;
  8. }).exceptionally(ex -> {
  9. System.out.println(ex.getMessage());
  10. return -1;
  11. });
  12. System.out.println(future.get());
  13. }

handle 类似于 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  4. System.out.println("加 10 任务开始");
  5. num += 10;
  6. return num;
  7. }).handle((i,ex) ->{
  8. System.out.println("进入 handle 方法");
  9. if(ex != null){
  10. System.out.println("发生了异常,内容为:" + ex.getMessage());
  11. return -1;
  12. }else{
  13. System.out.println("正常完成,内容为: " + i);
  14. return i;
  15. }
  16. });
  17. System.out.println(future.get());
  18. }

12.3.7 结果合并

thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. //第一步加 10
  4. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. return num;
  8. });
  9. //合并
  10. CompletableFuture<Integer> future1 = future.thenCompose(i ->
  11. //再来一个 CompletableFuture
  12. CompletableFuture.supplyAsync(() -> {
  13. return i + 1;
  14. }));
  15. System.out.println(future.get());
  16. System.out.println(future1.get());
  17. }

thenCombine 合并两个没有依赖关系的 CompletableFutures 任务

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  4. System.out.println("加 10 任务开始");
  5. num += 10;
  6. return num;
  7. });
  8. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  9. System.out.println("乘以 10 任务开始");
  10. num = num * 10;
  11. return num;
  12. });
  13. //合并两个结果
  14. CompletableFuture<Object> future = job1.thenCombine(job2, new
  15. BiFunction<Integer, Integer, List<Integer>>() {
  16. @Override
  17. public List<Integer> apply(Integer a, Integer b) {
  18. List<Integer> list = new ArrayList<>();
  19. list.add(a);
  20. list.add(b);
  21. return list;
  22. }
  23. }
  24. });
  25. System.out.println("合并结果为:" + future.get());
  26. }

合并多个任务的结果 allOf 与 anyOf
allOf: 一系列独立的 future 任务,等其所有的任务执行完后做一些事情

  1. /**
  2. * 先对一个数加 10,然后取平方
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. List<CompletableFuture> list = new ArrayList<>();
  8. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  9. System.out.println("加 10 任务开始");
  10. num += 10;
  11. return num;
  12. });
  13. list.add(job1);
  14. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  15. System.out.println("乘以 10 任务开始");
  16. num = num * 10;
  17. return num;
  18. });
  19. list.add(job2);
  20. CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
  21. System.out.println("减以 10 任务开始");
  22. num = num * 10;
  23. return num;
  24. });
  25. list.add(job3);
  26. CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
  27. System.out.println("除以 10 任务开始");
  28. num = num * 10;
  29. return num;
  30. });
  31. list.add(job4);
  32. //多任务合并
  33. List<Integer> collect =
  34. list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
  35. System.out.println(collect);
  36. }

anyOf: 只要在多个 future 里面有一个返回,整个任务就可以结束,而不需要等到每一个
future 结束

  1. /**
  2. * 先对一个数加 10,然后取平方
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. CompletableFuture<Integer>[] futures = new CompletableFuture[4];
  8. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  9. try{
  10. Thread.sleep(5000);
  11. System.out.println("加 10 任务开始");
  12. num += 10;
  13. return num;
  14. }catch (Exception e){
  15. return 0;
  16. }
  17. });
  18. futures[0] = job1;
  19. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  20. try{
  21. Thread.sleep(2000);
  22. System.out.println("乘以 10 任务开始");
  23. num = num * 10;
  24. return num;
  25. }catch (Exception e){
  26. return 1;
  27. }
  28. });
  29. futures[1] = job2;
  30. CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
  31. try{
  32. Thread.sleep(3000);
  33. System.out.println("减以 10 任务开始");
  34. num = num * 10;
  35. return num;
  36. }catch (Exception e){
  37. return 2;
  38. }
  39. });
  40. futures[2] = job3;
  41. CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
  42. try{
  43. Thread.sleep(4000);
  44. System.out.println("除以 10 任务开始");
  45. num = num * 10;
  46. return num;
  47. }catch (Exception e){
  48. return 3;
  49. }
  50. });
  51. futures[3] = job4;
  52. CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
  53. System.out.println(future.get());
  54. }

13、Volatile

13.1 volatile是什么

volatile在java语言中是一个关键字,用于修饰变量。被volatile修饰的变量后,表示这个变量在不同线程中是共享,编译器与运行时都会注意到这个变量是共享的,因此不会对该变量进行重排序。上面这句话可能不好理解,但是存在两个关键,共享和重排序。
变量的共享:
先来看一个被举烂了的例子:

  1. public class VolatileTest {
  2. boolean isStop = false;
  3. public void test() {
  4. Thread t1 = new Thread() {
  5. @Override
  6. public void run() {
  7. isStop = true;
  8. }
  9. };
  10. Thread t2 = new Thread() {
  11. @Override
  12. public void run() {
  13. while (!isStop) {
  14. }
  15. }
  16. };
  17. t2.start();
  18. t1.start();
  19. }
  20. public static void main(String args[]) throws InterruptedException {
  21. new VolatileTest().test();
  22. }
  23. }

上面的代码是一种典型用法,检查某个标记(isStop)的状态判断是否退出循环。但是上面的代码有可能会结束,也可能永远不会结束。因为每一个线程都拥有自己的工作内存,当一个线程读取变量的时候,会把变量在自己内存中拷贝一份。之后访问该变量的时候都通过访问线程的工作内存,如果修改该变量,则将工作内存中的变量修改,然后再更新到主存上。这种机制让程序可以更快的运行,然而也会遇到像上述例子这样的情况。
存在一种情况,isStop变量被分别拷贝到t1、t2两个线程中,此时isStop为false。t2开始循环,t1修改本地isStop变量称为true,并将isStop=true回写到主存,但是isStop已经在t2线程中拷贝过一份,t2循环时候读取的是t2 工作内存中的isStop变量,而这个isStop始终是false,程序死循环。我们称t2对t1更新isStop变量的行为是不可见的。
如果isStop变量通过volatile进行修饰,t2修改isStop变量后,会立即将变量回写到主存中,并将t1里的isStop失效。t1发现自己变量失效后,会重新去主存中访问isStop变量,而此时的isStop变量已经变成true。循环退出。

volatile boolean isStop = false;

13.2 volatile如何使用

volatile关键字一般用于标记变量的修饰,类似上述例子。《Java并发编程实战》中说,volatile只保证可见性,而加锁机制既可以确保可见性又可以确保原子性。当且仅当满足以下条件下,才应该使用volatile变量:
1、对变量的写入操作不依赖变量的当前值,或者确保只有单个线程变更变量的值。
2、该变量不会于其他状态一起纳入不变性条件中
3、在访问变量的时候不需要加锁。

逐一分析:
第一条说明volatile不能作为多线程中的计数器,计数器的count++操作,分为三步,第一步先读取count的数值,第二步count+1,第三步将count+1的结果写入count。volatile不能保证操作的原子性。上述的三步操作中,如果有其他线程对count进行操作,就可能导致数据出错。

  1. public class VolatileTest {
  2. private volatile int lower = 0;
  3. private volatile int upper = 5;
  4. public int getLower() {
  5. return lower;
  6. }
  7. public int getUpper() {
  8. return upper;
  9. }
  10. public void setLower(int lower) {
  11. if (lower > upper) {
  12. return;
  13. }
  14. this.lower = lower;
  15. }
  16. public void setUpper(int upper) {
  17. if (upper < lower) {
  18. return;
  19. }
  20. this.upper = upper;
  21. }
  22. }

述程序中,lower初始为0,upper初始为5,并且upper和lower都用volatile修饰。我们期望不管怎么修改upper或者lower,都能保证upper>lower恒成立。然而如果同时有两个线程,t1调用setLower,t2调用setUpper,两线程同时执行的时候。有可能会产生upper<lower这种不期望的结果。

  1. public void test() {
  2. Thread t1 = new Thread() {
  3. @Override
  4. public void run() {
  5. try {
  6. Thread.sleep(10);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. setLower(4);
  11. }
  12. };
  13. Thread t2 = new Thread() {
  14. @Override
  15. public void run() {
  16. try {
  17. Thread.sleep(10);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. setUpper(3);
  22. }
  23. };
  24. t1.start();
  25. t2.start();
  26. while (t1.isAlive() || t2.isAlive()) {
  27. }
  28. System.out.println("(low:" + getLower() + ",upper:" + getUpper() + ")");
  29. }
  30. public static void main(String args[]) throws InterruptedException {
  31. for (int i = 0; i < 100; i++) {
  32. VolatileTest volaitil = new VolatileTest();
  33. volaitil.test();
  34. }
  35. }

image.png
此时程序一直正常运行,但是出现的结果却是我们不想要的。

第三条:当访问一个变量需要加锁时,一般认为这个变量需要保证原子性和可见性,而volatile关键字只能保证变量的可见性,无法保证原子性。

最后贴个volatile的常见例子,在单例模式双重检查中的使用:

  1. public class Singleton {
  2. private static volatile Singleton instance=null;
  3. private Singleton(){
  4. }
  5. public static Singleton getInstance(){
  6. if(instance==null){
  7. synchronized(Singleton.class){
  8. if(instance==null){
  9. instance=new Singleton();
  10. }
  11. }
  12. }
  13. return instance;
  14. }
  15. }

new Singleton()分为三步,1、分配内存空间,2、初始化对象,3、设置instance指向被分配的地址。然而指令的重新排序,可能优化指令为1、3、2的顺序。如果是单个线程访问,不会有任何问题。但是如果两个线程同时获取getInstance,其中一个线程执行完1和3步骤,此时其他的线程可以获取到instance的地址,在进行if(instance==null)时,判断出来的结果为false,导致其他线程直接获取到了一个未进行初始化的instance,这可能导致程序的出错。所以用volatile修饰instance,禁止指令的重排序,保证程序能正常运行。(Bug很难出现,没能模拟出来)。
然而,《java并发编程实战中》中有对DCL的描述如下:”DCL的这种使用方法已经被广泛废弃了——促使该模式出现的驱动力(无竞争同步的执行速度很慢,以及JVM启动很慢)已经不复存在了,因而它不是一种高效的优化措施。延迟初始化占位类模式能带来同样的优势,并且更容易理解。”,其实我个小码畜的角度来看,服务端的单例更多时候做延迟初始化并没有很大意义,延迟初始化一般用来针对高开销的操作,并且被延迟初始化的对象都是不需要马上使用到的。然而,服务端的单例在大部分的时候,被设计为单例的类大部分都会被系统很快访问到。本篇文章只是讨论volatile,并不针对设计模式进行讨论,因此后续有时间,再补上替代上述单例的写法。