汪文君——多线程与架构设计

Xms 是指设定程序启动时占用内存大小。一般来讲,大点,程序会启动的快一点,但是也可能会导致机器暂时间变慢。
Xmx 是指设定程序运行期间最大可占用的内存大小。如果程序运行需要占用更多的内存,超出了这个设置值,就会抛出OutOfMemory异常。
Xss 是指设定每个线程的堆栈大小。这个就要依据程序,看一个线程大约需要占用多少内存,可能会有多少线程同时运行等。
以上三个参数的设置都是默认以Byte为单位的,也可以在数字后面添加[k/K]或者[m/M]来表示KB或者MB。而且,超过机器本身的内存大小也是不可以的,否则就等着机器变慢而不是程序变慢了。
并发和并行:
image.png
并发是两个队列交替使用一台咖啡机,并行是两个队列同时使用两台咖啡机,如果串行,一个队列使用一台咖啡机,那么哪怕前面那个人便秘了去厕所呆半天,后面的人也只能死等着他回来才能去接咖啡,这效率无疑是最低的。
并发和并行都可以是很多个线程,就看这些线程能不能同时被(多个)cpu执行,如果可以就说明是并行,而并发是多个线程被(一个)cpu 轮流切换着执行。

一. 多线程基础

1. 快速认识线程

1.1描述

进程:对计算机来说 每一个任务就是一个进程(Process),在每个进程内至少要有一个线程(Thread)在运行。
线程:是程序执行的一个路径,每个线程都有自己的局部变量表、程序计数器(指向正在执行的指令指针)以及各自的生命周期。
现代操作系统中一般不止一个线程在运行,当启动了一个Java虚拟机(JVM)时,从操作系统开始就会创建一个新的进程(JVM进程),JVM进程中会派生或创建很多线程。

1.2 尝试并行运行

通过 【匿名内部类的方式创建线程,并重写其中的run方法】
run() :写入需 并行运行的逻辑
start(): 运行线程(开始执行run()中逻辑)

  1. package com.zoro.concurrent.chapter01;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * @author yx.jiang
  5. * @date 2021/7/27 10:26
  6. */
  7. public class TryConcurrency {
  8. public static void main(String[] args) {
  9. //看新闻
  10. new Thread() {
  11. @Override
  12. public void run() {
  13. browseNews();
  14. }
  15. }.start();
  16. //听歌
  17. enjoyMusic();
  18. }
  19. /**
  20. * 看新闻
  21. */
  22. public static void browseNews() {
  23. for (; ; ) {
  24. System.out.println("看新闻");
  25. sleep(1);
  26. }
  27. }
  28. /**
  29. * 听歌
  30. */
  31. public static void enjoyMusic() {
  32. for (; ; ) {
  33. System.out.println("听歌");
  34. sleep(1);
  35. }
  36. }
  37. /**
  38. * 睡眠
  39. *
  40. * @param seconds 时长
  41. */
  42. private static void sleep(int seconds) {
  43. try {
  44. TimeUnit.SECONDS.sleep(seconds);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }
  50. //输出如下:
  51. 听歌
  52. 看新闻
  53. 看新闻
  54. 听歌
  55. 看新闻
  56. 听歌
  57. 看新闻
  58. 听歌
  59. 看新闻
  60. 听歌
  61. ……

image.png
Java8可替换为①②:
===> new _Thread(() -> browseNews()).start();
===>_ _new _Thread(TryConcurrency::browseNews).start();

1.3 线程生命周期

5个主要阶段java.lang.Thread.State

  • NEW
  • RUNNABLE
  • RUNNING
  • BLOCKED
  • TERMINATED Java高并发编程详解 - 图3```java

    1. /**
    2. * Thread state for a thread which has not yet started.
    3. */
    4. NEW,
    5. /**
    6. * Thread state for a runnable thread. A thread in the runnable
    7. * state is executing in the Java virtual machine but it may
    8. * be waiting for other resources from the operating system
    9. * such as processor.
    10. */
    11. RUNNABLE,
    12. /**
    13. * Thread state for a thread blocked waiting for a monitor lock.
    14. * A thread in the blocked state is waiting for a monitor lock
    15. * to enter a synchronized block/method or
    16. * reenter a synchronized block/method after calling
    17. * {@link Object#wait() Object.wait}.
    18. */
    19. BLOCKED,
    20. /**
    21. * Thread state for a waiting thread.
    22. * A thread is in the waiting state due to calling one of the
    23. * following methods:
    24. * <ul>
    25. * <li>{@link Object#wait() Object.wait} with no timeout</li>
    26. * <li>{@link #join() Thread.join} with no timeout</li>
    27. * <li>{@link LockSupport#park() LockSupport.park}</li>
    28. * </ul>
    29. *
    30. * <p>A thread in the waiting state is waiting for another thread to
    31. * perform a particular action.
    32. *
    33. * For example, a thread that has called <tt>Object.wait()</tt>
    34. * on an object is waiting for another thread to call
    35. * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
    36. * that object. A thread that has called <tt>Thread.join()</tt>
    37. * is waiting for a specified thread to terminate.
    38. */
    39. WAITING,
    40. /**
    41. * Thread state for a waiting thread with a specified waiting time.
    42. * A thread is in the timed waiting state due to calling one of
    43. * the following methods with a specified positive waiting time:
    44. * <ul>
    45. * <li>{@link #sleep Thread.sleep}</li>
    46. * <li>{@link Object#wait(long) Object.wait} with timeout</li>
    47. * <li>{@link #join(long) Thread.join} with timeout</li>
    48. * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
    49. * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
    50. * </ul>
    51. */
    52. TIMED_WAITING,
    53. /**
    54. * Thread state for a terminated thread.
    55. * The thread has completed execution.
    56. */
    57. TERMINATED;

    ```

    1.3.1 NEW

    1.3.2 RUNNABLE

    1.3.3 RUNNING

    1.3.4 BLOCKED

    1.3.5 TERMINATED

    1.4 start() : 模板模式在Thread中的应用

    java.lang.Thread#start()

    Causes this thread to begin execution; the Java Virtual Machine calls the run method of this thread. 执行线程时,JVM会调用线程的run(),run()是被JNI方法start0()执行的。

  1. /**
  2. * Causes this thread to begin execution; the Java Virtual Machine
  3. * calls the <code>run</code> method of this thread.
  4. * <p>
  5. * The result is that two threads are running concurrently: the
  6. * current thread (which returns from the call to the
  7. * <code>start</code> method) and the other thread (which executes its
  8. * <code>run</code> method).
  9. * <p>
  10. * It is never legal to start a thread more than once.
  11. * In particular, a thread may not be restarted once it has completed
  12. * execution.
  13. *
  14. * @exception IllegalThreadStateException if the thread was already
  15. * started.
  16. * @see #run()
  17. * @see #stop()
  18. */
  19. public synchronized void start() {
  20. /**
  21. * This method is not invoked for the main method thread or "system"
  22. * group threads created/set up by the VM. Any new functionality added
  23. * to this method in the future may have to also be added to the VM.
  24. *
  25. * A zero status value corresponds to state "NEW".
  26. */
  27. if (threadStatus != 0)
  28. throw new IllegalThreadStateException();
  29. /* Notify the group that this thread is about to be started
  30. * so that it can be added to the group's list of threads
  31. * and the group's unstarted count can be decremented. */
  32. group.add(this);
  33. boolean started = false;
  34. try {
  35. start0();
  36. started = true;
  37. } finally {
  38. try {
  39. if (!started) {
  40. group.threadStartFailed(this);
  41. }
  42. } catch (Throwable ignore) {
  43. /* do nothing. If start0 threw a Throwable then
  44. it will be passed up the call stack */
  45. }
  46. }
  47. }
  48. private native void start0();

结论:

  • Thread被构造后的NEW状态,事实上threadStatus这个内部属性为0;
  • 不能两次启动Thread,否则会出现IllegalThreadStateException异常;
  • 线程启动后会被加入一个ThreadGroup中,TODO
  • 一个线程生命周期结束(TERMINATED),再次调用start方法是不允许的,即TERMINATED是不能再回到RUNNABLE/RUNNING状态的;

    1.4.1 模板模式

    严格来讲:创建线程仅有一种方式; 线程的执行单元是run()方法,实现线程执行单元有两种方法,即:继承Thread并重写run(),实现Runnable接口实现自己的业务逻辑;
    简例:
    ```java package com.zoro.concurrent.chapter01;

/**

  • 模板方法-例 *
  • @author yx.jiang
  • @date 2021/7/27 10:26 / public class TemplateMethod { /*

    • 打印 *
    • @param message 信息 */ public final void print(String message) { System.out.println(“##############”); wrapPrint(message); System.out.println(“##############”); }

      /**

    • 转换-打印 *
    • @param message 信息 */ protected void wrapPrint(String message) {

      }

      public static void main(String[] args) { new TemplateMethod() {

      1. @Override
      2. protected void wrapPrint(String message) {
      3. System.out.println("*" + message + "*");
      4. }

      }.print(“Hello Thread”);

      new TemplateMethod() {

      1. @Override
      2. protected void wrapPrint(String message) {
      3. System.out.println("-" + message + "-");
      4. }

      }.print(“Hello Thread”); } }

//输出

#

Hello Thread

#
#

-Hello Thread-

#
  1. <a name="qTI03"></a>
  2. ##### 售票(extends Thread)
  3. TODOindex的线程安全问题
  4. ```java
  5. package com.zoro.concurrent.chapter01;
  6. /**
  7. * 售票窗口-extends Thread
  8. *
  9. * @author yx.jiang
  10. * @date 2021/7/27 10:26
  11. */
  12. public class TicketWindow extends Thread {
  13. private final String name;
  14. private static final int MAX = 50;
  15. private static int index = 1;
  16. public TicketWindow(String name) {
  17. this.name = name;
  18. }
  19. @Override
  20. public void run() {
  21. while (index <= MAX) {
  22. System.out.println("柜台: " + name + ",当前号码是:" + (index++));
  23. }
  24. }
  25. /**
  26. * index存在线程安全问题
  27. *
  28. * {@code 共享资源很多、 共享资源要经过一系列复杂运算。。。}
  29. * 显然不可能用{@code static}一个个修饰,并且static修饰的变量生命周期很长【常量池、静态成员变量、全局变量——1.8后由 永久代——>堆】
  30. */
  31. public static void main(String[] args) {
  32. TicketWindow tw1 = new TicketWindow("一号");
  33. tw1.start();
  34. TicketWindow tw2 = new TicketWindow("二号");
  35. tw2.start();
  36. TicketWindow tw3 = new TicketWindow("三号");
  37. tw3.start();
  38. TicketWindow tw4 = new TicketWindow("四号");
  39. tw4.start();
  40. }
  41. }

1.4.2 策略模式

注:共享资源index仍存在线程安全问题

如何将线程的控制业务逻辑的运行彻底分开?

售票 implements Runnable

Thread类中的run是不能共享的,而Runnable很容易实现,同一Runnable构造不同的Thread实例;

  1. package com.zoro.concurrent.chapter01;
  2. /**
  3. * 售票窗口——implements Runnable
  4. *
  5. * @author yx.jiang
  6. * @date 2021/7/27 10:26
  7. */
  8. public class TicketWindowRunnable implements Runnable {
  9. private static final int MAX = 50;
  10. private static int index = 1;
  11. @Override
  12. public void run() {
  13. while (index <= MAX) {
  14. System.out.println(Thread.currentThread().getName() + " 的号码是:" + (index++));
  15. try {
  16. Thread.sleep(100);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. /**
  23. * index存在线程安全问题
  24. */
  25. public static void main(String[] args) {
  26. TicketWindowRunnable ticketWindowRunnable = new TicketWindowRunnable();
  27. Thread t1 = new Thread(ticketWindowRunnable, "1号");
  28. Thread t2 = new Thread(ticketWindowRunnable, "2号");
  29. Thread t3 = new Thread(ticketWindowRunnable, "3号");
  30. Thread t4 = new Thread(ticketWindowRunnable, "4号");
  31. t1.start();
  32. t2.start();
  33. t3.start();
  34. t4.start();
  35. }
  36. }

2. Thread构造函数

image.png
image.png

2.1 线程的名称

2.1.1 默认名称

com.zoro.concurrent.chapter01.ThreadConstructor#defaultThreadName

2.1.2 自定义名称

com.zoro.concurrent.chapter01.ThreadConstructor#costumeThreadName

  1. package com.zoro.concurrent.chapter01;
  2. import java.util.stream.IntStream;
  3. /**
  4. * Thread的构造方法
  5. *
  6. * @author yx.jiang
  7. * @date 2021/7/27 10:26
  8. */
  9. public class ThreadConstructor {
  10. private static final String PREFIX = "ALEX-";
  11. public static void main(String[] args) {
  12. // defaultThreadName();
  13. costumeThreadName();
  14. }
  15. /**
  16. * 自定义线程名
  17. */
  18. public static void costumeThreadName() {
  19. IntStream.range(0, 5).mapToObj(ThreadConstructor::createThread).forEach(Thread::start);
  20. /**
  21. * 输出结果如下:
  22. * ALEX-0
  23. * ALEX-1
  24. * ALEX-2
  25. * ALEX-3
  26. * ALEX-4
  27. */
  28. }
  29. /**
  30. * 创建线程
  31. * @param intName 数字
  32. * @return 自定义名称的线程
  33. */
  34. public static Thread createThread(int intName) {
  35. return new Thread(() -> System.out.println(Thread.currentThread().getName()), PREFIX + intName);
  36. }
  37. /**
  38. * 默认线程名
  39. */
  40. private static void defaultThreadName() {
  41. IntStream.range(0, 5).boxed()
  42. .map(i -> new Thread(
  43. () -> System.out.println(Thread.currentThread().getName()))
  44. )
  45. .forEach(Thread::start);
  46. /**
  47. * 输出结果如下:
  48. * Thread-0
  49. * Thread-1
  50. * Thread-4
  51. * Thread-3
  52. * Thread-2
  53. */
  54. }
  55. }

2.1.3 修改线程名称

在线程启动前通过setName()方法可进行修改

2.2 线程的父子关系

Thread的所有构造方法最终会去调用静态init()方法,可看出每一个新创建的线程都有一个父线程。

  1. private void init(ThreadGroup g, Runnable target, String name,
  2. long stackSize, AccessControlContext acc,
  3. boolean inheritThreadLocals) {
  4. if (name == null) {
  5. throw new NullPointerException("name cannot be null");
  6. }
  7. this.name = name;
  8. Thread parent = currentThread();
  9. SecurityManager security = System.getSecurityManager();
  10. ……
  11. }

线程最初状态为NEW ,没有执行start()之前,它只能算是Thread的实例,并不意味着一个新的线程被创建,因此currentThread()代表的将会是创建它的那个线程。

  • 一个线程的创建肯定是另一个线程完成的。
  • 被创建的线程的父线程是创建它的线程。

    2.3 Thread、ThreadGroup

  • main线程所在的ThreadGroup称为“main”;

  • 构造一个线程时未显示指定ThreadGroup,那么它将会和父线程同属于一个ThreadGroup;

除Group外,还会和父线程有同样的优先级、domain。

2.4 Thread、JVM虚拟机栈

2.4.1 Thread 与 Stacksize

栈内存通过-xss设置;一般地,stacksize越大,递归深度越深;stacksize越小,创建的线程数越多;

2.4.2 JVM内存结构

JVM

  1. 程序计数器线程私有
  2. java虚拟机栈线程私有
  3. 本地方法栈线程私有
  4. 堆内存
    1. 堆是GC的重要区域(含:新生代【Eden区,From Survivor区,To Survivor区】、老年代);
  5. 方法区
    1. 主要用于存储:已被虚拟机加载的类信息、常量、静态变量、即时编译器(JIT)编译后的代码等数据;
    2. 在HotSpot中,方法区还会被细分为持久代代码缓存区,代码缓存区用于存储编译后的本地代码(和硬件相关)以及JIT(Just In Time)编译器生成的代码。不同的JVM通常有不同的实现。
  6. Java8元空间
    1. 持久代被彻底删除,取而代之的是元空间。
    2. 元空间是堆内存的一部分,JVM为每个类加载器分配一块内存列表,进行线性分配,块的大小取决于类加载器的类型,sun/反射/代理对应的类加载器块会小一点,之前版本会单独卸载回收某个类,现在则是GC过程中发现某个类加载器已经具备回收的条件,则会将整个类加载器相关的元空间全部回收,以减少内存碎片,节省GC扫描和压缩的时间。

      2.4.3 Thread与虚拟机栈

      2.5 守护线程

      JVM中没有一个 非守护线程,则JVM的进程会退出。
      main线程是守护线程。

      3. Thread API

      3.1 sleep

      image.png

      3.1.1 介绍

      休眠时,不会放弃monitor锁的所有权。是个可中断(interrupt)方法

      3.1.2 TimeUnit代替Thread.sleep

      Thread.sleep能完成的TimeUnit同样能完成,而且更加清晰,对sleep提供了更好的封装。

      3.2 yield

      3.2.1 介绍

      提醒调度器我愿意放弃当前的cpu资源,如果CPU的资源不紧张,就会忽略这个提醒。

      3.2.2 yield和sleep

  • sleep会导致当前线程暂停指定的时间,没有CPU时间片的消耗。
  • yield只是堆CPU调度器的一个提示,如果CPU调度器没有忽略这个提示,它会导致线程上下文的切换。
  • sleep会导致线程短暂的block,会在给定的时间内释放CPU资源。
  • yield会使RUNNING状态的Thread进入RUNNABLE状态(如果CPU调度器没有忽略这个提示)。
  • sleep几乎百分之百的完成了指定时间的休眠,而yield的提示并不能一定担保。
  • 一个线程sleep另一个线程interrupt会捕获到中断信号,而yield则不会。

    3.3 设置线程优先级

    thread.setPriority(int) 取值1<= num <=10之间。

  • 对于root用户,它会hint操作系统你想要设置的优先级,否则它会被忽略。

  • 如果CPU比较忙,设置优先级可能会获取更多的CPU时间片,但是闲时优先级的高低几乎不会有任何作用。

main线程的优先级为5,线程start前设置方能生效,子线程的优先级不能大于父线程的优先级。

3.4 获取线程ID

thread.getId()

3.5 获取当前线程

Thread.currentThread()

3.6 设置线程上下文类加载器

TODO

3.7 interrupt

3.7.1 interrupt

3.7.2 isInterrupted

3.7.3 interrupted

3.7.4 interrupted注意事项

3.8 线程join

同样是个可中断(interrupt)方法。

3.8.1 join详解

join方法会使当前线程永远的等待下去,直到被另一个线程中断,或者join的线程执行结束;

  1. package com.zoro.concurrent.chapter01;
  2. import java.util.List;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.stream.Collectors;
  5. import java.util.stream.IntStream;
  6. /**
  7. * Thread-join
  8. *
  9. * @author yx.jiang
  10. * @date 2021/7/27 10:26
  11. */
  12. public class ThreadJoin {
  13. /**
  14. * ①join输出结果:
  15. * <p>thread1.join,thread2.join;main线程阻塞直至thread1、thread2执行完毕!</p>
  16. * ……
  17. * 1 # 8
  18. * 2 # 8
  19. * 2 # 9
  20. * 1 # 9
  21. * main # 0
  22. * main # 1
  23. * main # 2
  24. * ……
  25. *
  26. * ② 倘若注释掉 thread.join
  27. * 那么输出内容为:
  28. * <p>thread1,thread2,main交替输出</p>
  29. * ……
  30. * 1 # 4
  31. * 2 # 4
  32. * main # 4
  33. * 1 # 5
  34. * 2 # 5
  35. * main # 5
  36. * 1 # 6
  37. * 2 # 6
  38. * main # 6
  39. * ……
  40. */
  41. public static void main(String[] args) throws InterruptedException{
  42. List<Thread> threadList = IntStream.range(1, 3).mapToObj(ThreadJoin::createThread).collect(Collectors.toList());
  43. threadList.forEach(Thread::start);
  44. /*for (Thread thread : threadList) {
  45. thread.join();
  46. }*/
  47. for (int i = 0; i < 10; i++) {
  48. System.out.println(Thread.currentThread().getName() + " # " + i);
  49. shortSleep();
  50. }
  51. }
  52. /**
  53. * 创建线程
  54. * @param seq 线程名
  55. * @return
  56. */
  57. private static Thread createThread(int seq) {
  58. return new Thread(() -> {
  59. for (int i = 0; i < 10; i++) {
  60. System.out.println(Thread.currentThread().getName() + " # " + i);
  61. shortSleep();
  62. }
  63. }, String.valueOf(seq));
  64. }
  65. /**
  66. * 短暂睡眠
  67. */
  68. private static void shortSleep() {
  69. try {
  70. TimeUnit.SECONDS.sleep(1);
  71. } catch (InterruptedException e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. }

3.8.2 join结合实战

查询各个航班公司的航班信息【接口或调用方式不同】,可通过多线程‘并行’查询。

  1. package com.zoro.concurrent.chapter03;
  2. import java.util.ArrayList;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. import java.util.stream.Collectors;
  6. /**
  7. * 航班查询-例
  8. *
  9. * @author yx.jiang
  10. * @date 2021/7/27 10:26
  11. */
  12. public class FightQueryExample {
  13. /**
  14. * 合作的航空公司
  15. */
  16. public static List<String> fightCompany = Arrays.asList("CSA","CEA", "HNA");
  17. public static void main(String[] args) {
  18. List<String> results = search("北京", "上海");
  19. System.out.println("=======查询结果=======");
  20. results.forEach(System.out::println);
  21. }
  22. /**
  23. * 查询并汇总
  24. *
  25. * @param origin 出发地
  26. * @param destination 目的地
  27. * @return 各航空公司航班汇总数据
  28. */
  29. public static List<String> search(String origin, String destination) {
  30. final List<String> result = new ArrayList<>();
  31. //创建【各个航空公司】航班数据的 查询线程
  32. List<FightQueryTask> taskList = fightCompany.stream()
  33. .map(airLine -> createSearchTask(airLine, origin, destination))
  34. .collect(Collectors.toList());
  35. //启用线程
  36. taskList.forEach(FightQueryTask::start);
  37. taskList.forEach(task -> {
  38. try {
  39. task.join();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. });
  44. //在此之前,当前线程会被阻塞,获取每一个查询线程的结果,并加入到result中
  45. taskList.stream().map(FightQueryTask::get).forEach(result::addAll);
  46. return result;
  47. }
  48. public static FightQueryTask createSearchTask(String airline, String origin, String destination){
  49. return new FightQueryTask(airline, origin, destination);
  50. }
  51. }
  52. /**
  53. * 输出结果:
  54. * [ CSA ]-query 从 北京 到 上海
  55. * [ HNA ]-query 从 北京 到 上海
  56. * [ CEA ]-query 从 北京 到 上海
  57. * 航班:[ HNA ] 列表查询成功
  58. * 航班:[ CSA ] 列表查询成功
  59. * 航班:[ CEA ] 列表查询成功
  60. * =======查询结果=======
  61. * [ CSA ]-2
  62. * [ CEA ]-6
  63. * [ HNA ]-2
  64. */

3.9关闭线程

废弃的方法 stop()关闭时不会释放掉monitor的锁。
3.9.1 正常关闭

  1. 线程结束生命周期 正常结束
  2. 捕获中断信号关闭线程 ```java package com.zoro.concurrent.chapter03;

import java.util.concurrent.TimeUnit;

/**

  • 线程中断退出 *
  • @author yx.jiang
  • @date 2021/7/27 10:26 */ public class InterruptThreadExit {

    /**

    • 以下两种方式均会导致线程的正常结束,输出如下: *
    • 开始工作~
    • 系统即将中断
    • 工作完毕 */ public static void main(String[] args) throws InterruptedException { Thread thread = new Thread() {

      1. /*@Override
      2. public void run() {
      3. System.out.println("开始工作~");
      4. while (!isInterrupted()) {
      5. //working
      6. }
      7. System.out.println("工作完毕");
      8. }*/
      9. @Override
      10. public void run() {
      11. System.out.println("开始工作~");
      12. for (;;){
      13. try {
      14. TimeUnit.SECONDS.sleep(1);
      15. } catch (InterruptedException e) {
      16. break;
      17. }
      18. }
      19. System.out.println("工作完毕");
      20. }

      };

      thread.start(); TimeUnit.SECONDS.sleep(1); System.out.println(“系统即将中断”); thread.interrupt(); } } ```

  1. 使用volatile开关控制

由于线程的interrupt表示可能被擦除,或者逻辑单元中不会调用任何可中断方法,所以使用volatile修饰的开关flag关闭线程也是一种做法。

  1. package com.zoro.concurrent.chapter03;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * 线程中断退出
  5. *
  6. * @author yx.jiang
  7. * @date 2021/7/27 10:26
  8. */
  9. public class FlagThreadExit {
  10. static class MyTask extends Thread{
  11. /**
  12. * 已关闭
  13. */
  14. private volatile boolean closed = false;
  15. @Override
  16. public void run() {
  17. System.out.println("开始工作~");
  18. while (!closed && !isInterrupted()){
  19. //正在运行
  20. }
  21. System.out.println("工作完毕");
  22. }
  23. public void setClosed(boolean closed) {
  24. this.closed = closed;
  25. }
  26. }
  27. /**
  28. * 输出分别为:
  29. * ①中断-interrupt
  30. * 开始工作~
  31. * 系统即将中断
  32. * 已中断
  33. * 工作完毕
  34. *
  35. * ②关闭-closed
  36. * 开始工作~
  37. * 系统即将关闭
  38. * 系统已关闭
  39. * 工作完毕
  40. */
  41. public static void main(String[] args) throws InterruptedException {
  42. MyTask myTask = new MyTask();
  43. myTask.start();
  44. TimeUnit.SECONDS.sleep(1);
  45. System.out.println("系统即将中断");
  46. myTask.interrupt();
  47. System.out.println("已中断");
  48. // System.out.println("系统即将关闭");
  49. // myTask.setClosed(true);
  50. // System.out.println("系统已关闭");
  51. }
  52. }

3.9.2 异常退出
一个线程的执行单元(Thread/Runnable中的run())中不允许抛出checked异常,如在线程运行过程中需要捕获checked异常来判断是否有运行下去的必要,可以通过将checked异常封装为unchecked异常(RuntimeException)抛出进而结束线程的生命周期。
3.9.3 进程假死

线程虽然存在,但没有任何输出、不进行任何作业

阻塞、死锁等。
通过jconsole、jstack、jvisualvm等工具进行死锁的判断。

4. 线程安全与数据同步

4.1 数据同步

4.1.1 数据不一致问题

将号码最大值改为500。
上文例子中的线程安全问题

  1. 某个号码被略过
  2. 某个号码出现多次
  3. 号码超出最大值500

    4.2 synchronized关键字

    在jdk1.5前,要解决上述问题需使用synchronized关键字(提供了一种排他机制)。

    4.2.1 什么是synchronized

  • synchronized关键字提供了一种锁的机制,能够确保共享变量的互斥访问,从而防止数据不一致问题的出现。
  • synchronized关键字包括monitor enter 和 monitor exit两个JVM指令,它能够保证在任何时候任何线程执行到monitor enter成功之前都必须从主内存中获取数据,而不是从缓存中,在monitor exit运行成功之后,共享变量被更新后的值必须刷入主内存。
  • synchronized的指令严格遵守java happens-before 规则,一个monitor exit 指令之前必定要有一个monitor enter。

    4.2.2 synchronized的用法

    synchronized可以用于对代码块或方法进行修饰,而不能够用于对class以及变量进行修饰。
  1. 同步方法

[default|public|private|protected] synchronized [static] type method()。示例如下:
_public synchronized void _sync(){
……
}
_public static synchronized void _staticSync(){
……
}

  1. 同步代码块

private final _Object MUTEX = _new _Object();
_public void _sync(){
_synchronized
(MUTEX) {
……
}
}
改写叫号程序:
无论运行多少次,不再出现数据不一致的问题。

  1. package com.zoro.concurrent.chapter04;
  2. /**
  3. * 售票窗口——implements Runnable
  4. *
  5. * @author yx.jiang
  6. * @date 2021/7/27 10:26
  7. */
  8. public class TicketWindowRunnable implements Runnable {
  9. private static final int MAX = 50;
  10. private static int index = 1;
  11. private static final Object MUTEX = new Object();
  12. @Override
  13. public void run() {
  14. synchronized (MUTEX) {
  15. while (index <= MAX) {
  16. System.out.println(Thread.currentThread().getName() + " 的号码是:" + (index++));
  17. try {
  18. Thread.sleep(100);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. }
  25. /**
  26. * index存在线程安全问题
  27. */
  28. public static void main(String[] args) {
  29. TicketWindowRunnable ticketWindowRunnable = new TicketWindowRunnable();
  30. Thread t1 = new Thread(ticketWindowRunnable, "1号");
  31. Thread t2 = new Thread(ticketWindowRunnable, "2号");
  32. Thread t3 = new Thread(ticketWindowRunnable, "3号");
  33. Thread t4 = new Thread(ticketWindowRunnable, "4号");
  34. t1.start();
  35. t2.start();
  36. t3.start();
  37. t4.start();
  38. }
  39. }

4.3 深入synchronized关键字

4.3.1 线程堆栈分析

synchronization 关键字提供了一种互斥机制(在同一时刻只能有一个线程访问同步资源),准确的讲:是某线程获取了与mutex关联的monitor锁。(将synchronized(mutex)称为

  1. package com.zoro.concurrent.chapter04;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * mutex锁
  5. *
  6. * @author yx.jiang
  7. * @date 2021/7/27 10:26
  8. */
  9. public class Mutex {
  10. private static final Object MUTEX = new Object();
  11. /**
  12. * 使用资源
  13. */
  14. public void accessResource() {
  15. synchronized (MUTEX) {
  16. try {
  17. TimeUnit.MINUTES.sleep(10);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }
  23. public static void main(String[] args) {
  24. final Mutex mutex = new Mutex();
  25. for (int i = 0; i < 5; i++) {
  26. new Thread(mutex::accessResource).start();
  27. }
  28. }
  29. }

image.png
Thread-1 拥有资源
image.png
Thead-0、Thead-2、Thead-3、Thead-4 正在BLOCKED
使用jstack命令打印进程的堆栈信息:
jstack <pid> 如: jstack 32776 32776对应 jconsole 中的 pid
image.png
image.png
可以看到仅Thread-0拥有,而其他线程均在阻塞等待。

4.3.2 JVM指令分析

使用 jdk 命令 javap 对 Mutex class 进行反汇编,输出了大量的JVM指令,可以发现monitor enter 和 monitor exit是成对出现的(有时会出现一个monitor enter对应多个monitor exit),但是每一个monitor exit前必有对应的monitor enter。

  1. 在src目录下键入如下命令并运行:

javap -c com.zoro.concurrent.chapter04.Mutex
如若在 com.zoro.concurrent.chapter04 下 直接键入 javap -c Mutex

  1. 亦可通过idea插件jclasslib bytecode viewer查看

查看之前先编译build
image.png

  1. Compiled from "Mutex.java"
  2. public class com.zoro.concurrent.chapter04.Mutex {
  3. public com.zoro.concurrent.chapter04.Mutex();
  4. Code:
  5. 0: aload_0
  6. 1: invokespecial #1 // Method java/lang/Object."<init>":()V
  7. 4: return
  8. public void accessResource();
  9. Code:
  10. 0: getstatic #2 // Field MUTEX:Ljava/lang/Object;
  11. 3: dup
  12. 4: astore_1
  13. 5: monitorenter
  14. 6: getstatic #3 // Field java/util/concurrent/TimeUnit.MINUTES:Ljava/util/concurrent/TimeUnit;
  15. 9: ldc2_w #4 // long 10l
  16. 12: invokevirtual #6 // Method java/util/concurrent/TimeUnit.sleep:(J)V
  17. 15: goto 23
  18. 18: astore_2
  19. 19: aload_2
  20. 20: invokevirtual #8 // Method java/lang/InterruptedException.printStackTrace:()V
  21. 23: aload_1
  22. 24: monitorexit
  23. 25: goto 33
  24. 28: astore_3
  25. 29: aload_1
  26. 30: monitorexit
  27. 31: aload_3
  28. 32: athrow
  29. 33: return
  30. Exception table:
  31. from to target type
  32. 6 15 18 Class java/lang/InterruptedException
  33. 6 25 28 any
  34. 28 31 28 any
  35. public static void main(java.lang.String[]);
  36. Code:
  37. 0: new #9 // class com/zoro/concurrent/chapter04/Mutex
  38. 3: dup
  39. 4: invokespecial #10 // Method "<init>":()V
  40. 7: astore_1
  41. 8: iconst_0
  42. 9: istore_2
  43. 10: iload_2
  44. 11: iconst_5
  45. 12: if_icmpge 42
  46. 15: new #11 // class java/lang/Thread
  47. 18: dup
  48. 19: aload_1
  49. 20: dup
  50. 21: invokevirtual #12 // Method java/lang/Object.getClass:()Ljava/lang/Class;
  51. 24: pop
  52. 25: invokedynamic #13, 0 // InvokeDynamic #0:run:(Lcom/zoro/concurrent/chapter04/Mutex;)Ljava/lang/Runnable;
  53. 30: invokespecial #14 // Method java/lang/Thread."<init>":(Ljava/lang/Runnable;)V
  54. 33: invokevirtual #15 // Method java/lang/Thread.start:()V
  55. 36: iinc 2, 1
  56. 39: goto 10
  57. 42: return
  58. static {};
  59. Code:
  60. 0: new #16 // class java/lang/Object
  61. 3: dup
  62. 4: invokespecial #1 // Method java/lang/Object."<init>":()V
  63. 7: putstatic #2 // Field MUTEX:Ljava/lang/Object;
  64. 10: return
  65. }

选片段,着重分析:
①获取到MUTEX引用,然后执行②monitorenter JVM指令,休眠结束后goto至③monitorexit的位置(astore存储引用至本地变量表:aload从本地变量表加载引用;getstatic 从class中获得静态属性)

  1. public void accessResource();
  2. Code:
  3. 0: getstatic ①获取MUTEX
  4. 3: dup
  5. 4: astore_1
  6. 5: monitorenter ②执行monitorenter JVM指令
  7. 6: getstatic #3 // Field java/util/concurrent/TimeUnit.MINUTES:Ljava/util/concurrent/TimeUnit;
  8. 9: ldc2_w #4 // long 10l
  9. 12: invokevirtual #6 // Method java/util/concurrent/TimeUnit.sleep:(J)V
  10. 15: goto 23 ③跳转到23
  11. 18: astore_2
  12. 19: aload_2
  13. 20: invokevirtual #8 // Method java/lang/InterruptedException.printStackTrace:()V
  14. 23: aload_1
  15. 24: monitorexit ⑤执行monitorexit JVM指令
  16. 25: goto 33
  17. 28: astore_3
  18. 29: aload_1
  19. 30: monitorexit
  20. 31: aload_3
  21. 32: athrow
  22. 33: return
  1. Monitorenter

每个对象都与一个monitor相关联,一个monitor的lock的锁只能被一个线程在同一时间获得,在一个线程尝试获得与对象关联monitor的所有权会发生如下的几件事:

  1. 如果monitor的计数器为0,则意味着monitor的lock还没被获得,某个线程获得后立即对该计数器加一,自此该线程就是这个monitor的所有者了。
  2. 如果一个已经拥有该monitor所有权的线程重入,就会导致monitor的计数器再次累加,
  3. 如果monitor已经被其他线程所拥有,则其他线程尝试获取该monitor的所有权时,会被陷入阻塞状态直到monitor计数器变为0,才能再次尝试获取对monitor的所有权。
    1. Monitorexit

释放对monitor的所有权,想要释放对某个对象关联的monitor的所有权的前提是,你曾经获取了所有权。释放monitor所有权就是将monitor的计数器减一,如果计数器的结果为0,就意味着该线程不再拥有该monitor的所有权,也就是解锁。同时被该monitor block的线程将再次尝试获取对该monitor的所有权。

4.3.3 使用synchronized要注意的问题

  1. 与monitor关联的对象不能为空

    1. private final Object mutex = null;
    2. public void syncMethod() {
    3. synchronized (mutex){
    4. //TODO
    5. }
    6. }
  2. synchronized作用域太大

对整个线程的执行单元进行了synchronized同步,从而丧失了并发能力,synchronized应尽可能地只作用于共享资源(数据)的读写作用域。

  1. public class Task implements Runnable{
  2. @Override
  3. public synchronized void run() {
  4. //TODO
  5. }
  6. }
  1. 不同的synchronized企图锁相同的方法

构造了5个线程,也构造了5个Runnable实例,Runnable作为线程执行逻辑单元传递给Thread,synchronized根本互斥不了与之对应的作用域,线程之间进行monitor lock的争抢只能发生在与monitor关联的同一个引用上,下面的代码每一个线程争抢的monitor关联的引用都是彼此独立的,so不能起到互斥作用。

  1. public static class Task implements Runnable{
  2. private final Object MUTEX = new Object();
  3. @Override
  4. public synchronized void run() {
  5. //TODO
  6. synchronized (MUTEX) {
  7. //TODO
  8. }
  9. }
  10. }
  11. public static void main(String[] args) {
  12. for (int i = 0; i < 5; i++) {
  13. new Thread(Task::new).start();
  14. }
  15. }
  1. 多个锁的交叉导致死锁

    1. private final Object MUTEX_READ = new Object();
    2. private final Object MUTEX_WRITE = new Object();
    3. public void read () {
    4. synchronized (MUTEX_READ) {
    5. synchronized (MUTEX_WRITE) {
    6. //...
    7. }
    8. }
    9. }
    10. public void write () {
    11. synchronized (MUTEX_WRITE) {
    12. synchronized (MUTEX_READ) {
    13. //...
    14. }
    15. }
    16. }

    4.4 This Monitor 和 Class Monitor

    4.4.1 this monitor

    ```java package com.zoro.concurrent.chapter04;

import java.util.concurrent.TimeUnit;

/**

  • this monitor
  • synchronized 修饰同一个对象的不同方法 *
  • @author yx.jiang
  • @date 2021/7/27 10:26 */ public class ThisMonitor01 { public synchronized void method1() {

    1. System.out.println(Thread.currentThread().getName() + " 进入method1");
    2. try {
    3. TimeUnit.MINUTES.sleep(10);
    4. } catch (InterruptedException e) {
    5. e.printStackTrace();
    6. }

    }

    public synchronized void method2() {

    1. System.out.println(Thread.currentThread().getName() + " 进入method2");
    2. try {
    3. TimeUnit.MINUTES.sleep(10);
    4. } catch (InterruptedException e) {
    5. e.printStackTrace();
    6. }

    }

    public static void main(String[] args) {

    1. ThisMonitor01 thisMonitor01 = new ThisMonitor01();
    2. new Thread(thisMonitor01::method1, "T1").start();
    3. new Thread(thisMonitor01::method2, "T2").start();

    } }

//输出: T1 进入method1

  1. `jconsole` 查看pid和线程信息<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/708204/1627453298175-8c63f191-b6c7-45c7-ab01-15f6dd950e88.png#height=690&id=u56bdbee8&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1380&originWidth=2559&originalType=binary&ratio=1&size=162234&status=done&style=none&width=1279.5)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/708204/1627453324439-8a8ab39e-7077-40ef-9d90-e33bba15e10e.png#height=146&id=udeeb2075&margin=%5Bobject%20Object%5D&name=image.png&originHeight=292&originWidth=925&originalType=binary&ratio=1&size=31964&status=done&style=none&width=462.5)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/708204/1627453311939-7a3ca0da-38f2-4f63-9cd3-3db6cb3b3fb5.png#height=132&id=u49d845cb&margin=%5Bobject%20Object%5D&name=image.png&originHeight=264&originWidth=898&originalType=binary&ratio=1&size=25567&status=done&style=none&width=449)<br />`jstack pid` 分析线程的堆栈信息:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/708204/1627453576229-be17e20e-3ae4-438b-a373-78388a82f30b.png#height=246&id=ua2fafe5c&margin=%5Bobject%20Object%5D&name=image.png&originHeight=492&originWidth=1421&originalType=binary&ratio=1&size=103026&status=done&style=none&width=710.5)<br />修改为: **synchronized修饰method1; method2中synchronized同步代码块,锁的是this**
  2. ```java
  3. package com.zoro.concurrent.chapter04;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * this monitor
  7. * synchronized修饰method1; method2中synchronized同步代码块,锁的是this
  8. *
  9. * @author yx.jiang
  10. * @date 2021/7/27 10:26
  11. */
  12. public class ThisMonitor02 {
  13. public synchronized void method1() {
  14. System.out.println(Thread.currentThread().getName() + " 进入method1");
  15. try {
  16. TimeUnit.MINUTES.sleep(10);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. public synchronized void method2() {
  22. System.out.println(Thread.currentThread().getName() + " 进入method2");
  23. try {
  24. TimeUnit.MINUTES.sleep(10);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. public static void main(String[] args) {
  30. ThisMonitor02 thisMonitor01 = new ThisMonitor02();
  31. new Thread(thisMonitor01::method1, "T1").start();
  32. new Thread(thisMonitor01::method2, "T2").start();
  33. }
  34. }

效果一样.

4.4.2 class monitor

  1. package com.zoro.concurrent.chapter04;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * class monitor
  5. *
  6. * @author yx.jiang
  7. * @date 2021/7/27 10:26
  8. */
  9. public class ClassMonitor01 {
  10. public static synchronized void method1() {
  11. System.out.println(Thread.currentThread().getName() + " 进入method1 ");
  12. try {
  13. TimeUnit.MINUTES.sleep(10);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. public static synchronized void method2() {
  19. System.out.println(Thread.currentThread().getName() + " 进入method2 ");
  20. try {
  21. TimeUnit.MINUTES.sleep(10);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. public static void main(String[] args) {
  27. new Thread(ClassMonitor01::method1, "T1").start();
  28. new Thread(ClassMonitor01::method2, "T2").start();
  29. }
  30. }
  31. //输出
  32. T1 进入method1

image.png
image.png
image.png
更改method02为同步代码块,锁ClassMonitor02.class

  1. package com.zoro.concurrent.chapter04;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * class monitor
  5. *
  6. * @author yx.jiang
  7. * @date 2021/7/27 10:26
  8. */
  9. public class ClassMonitor02 {
  10. public static synchronized void method1() {
  11. System.out.println(Thread.currentThread().getName() + " 进入method1 ");
  12. try {
  13. TimeUnit.MINUTES.sleep(10);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. public static void method2() {
  19. synchronized (ClassMonitor02.class) {
  20. System.out.println(Thread.currentThread().getName() + " 进入method2 ");
  21. try {
  22. TimeUnit.MINUTES.sleep(10);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. public static void main(String[] args) {
  29. new Thread(ClassMonitor02::method1, "T1").start();
  30. new Thread(ClassMonitor02::method2, "T2").start();
  31. }
  32. }

堆栈信息, 结果同上.

4.5 程序死锁的原因及诊断

4.5.1 程序死锁

  1. 交叉锁可导致程序出现死锁

线程A持有资源R1的锁等待获取R2的锁,线程B持有R2的锁等待获取R1的锁.

  1. 内存不足

两个线程T1,T2,执行某个任务,T1获取10Mb内存,T2获取了20Mb内存,如果每个执行单元都需要30Mb的内存,但是剩余的可用内存刚好为20Mb,那么两个线程有可能都在等彼此能够释放内存资源

  1. 一问一答式的数据交换

一问一答,由于某种原因导致服务端错过了客户端的请求, 此时客户端和服务端都在等待双方发送数据

  1. 数据库锁

不论是表级别的还是行级别的锁, 如: 某个线程执行了for update语句退出了事务,其他线程在访问该数据库时都将陷入死锁

  1. 文件锁

某线程获取了文件锁意外退出,其他读取该文件的线程将会进入死锁直到系统释放文件句柄资源

  1. 死循环引起的死锁

查看线程堆栈信息不会发现死锁的现象,但是程序不工作,CPU占有率居高不下, 这种死锁又称为系统假死.
致命且难排查的死锁现象

4.5.2 程序死锁举例

  • 见4.5.1.1 和 4.3.3.4;
  • 多线程下hashmap导致的死循环;

    4.5.3 死锁诊断

  1. 交叉锁引起的死锁

jstack 或 jconsole 查看; 一般交叉锁引起的死锁线程都会进入BLOCKED状态, CPU资源占用不高, 容易借助工具发现
jstack-1 PID 会直接发现死锁的信息;

  1. 死循环引起的死锁(假死)

因为工作线程并未BLOCKED, 而是始终处于RUNNABLE状态, CPU居高不下, 甚至都不能正常运行你的命令
可使用jstack, jconsole, jvisualvm, jProfiler(收费)进行诊断.

5. 线程通信

5.1 同步阻塞和异步非阻塞

5.1.1 同步阻塞消息处理

例: 客户端提交Event至服务器, 服务器收到客户端请求之后开辟线程处理客户请求, 经过较复杂的业务计算后将结果返给客户端.

  • 同步Event提交, 客户端等待时间过长(提交Event时长 + 接受Event创建Thread时长 + 业务处理时长 + 返回结果时长) 会陷入阻塞, 导致二次提交Event耗时过长.
  • 由于客户端提交的Event数量不多, 导致系统同时受理业务数量有限, 也就是系统整体的吞吐量不高
  • 这种一个线程处理一个Event的方式, 会导致出现频繁的创建开启与销毁, 从而增加系统额外开销.
  • 在业务达到峰值的时候, 大量的业务处理线程阻塞会导致频繁的CPU上下文切换, 从而降低系统性能

    5.1.2 异步非阻塞消息处理

    客户端提交Event后会得到一个相应的工单并且立即返回, Event则会被放置在Event队列中, 服务端有若干个工作线程, 不断的从Event队列中获取任务并且进行异步处理, 最后将处理结果保存至另外一个结果集中,如果客户端想要获得处理结果, 则可凭借工单号再次查询

    5.2 单线程通信

    5.2.1 初识notify和wait

    EventQueue
    ```java package com.zoro.concurrent.chapter05;

import java.util.LinkedList;

/**

  • 【线程间通信】事件队列
  • 队列满 - 最多容纳多少Event,好比一个系统最多同时能够受理多少业务一样
  • 队列空 - 当所有的Event都被处理并且没有新的Event被提交的时候,此时队列是空的状态
  • 有Event但没有满 - 有新的event被提交,但是此时没有达到队列的上限 *
  • @author yx.jiang
  • @date 2021/7/27 10:26 */ public class EventQueue { private final int max;

    static class Event {

    }

    private final LinkedList eventQueue = new LinkedList<>();

    private static final int DEFAULT_MAX_EVENT = 10;

    public EventQueue() {

    1. this(DEFAULT_MAX_EVENT);

    }

    public EventQueue(int max) {

    1. this.max = max;

    }

    /**

    • 提交事件到队列尾 *
    • @param event 事件 */ public void offer(Event event) { synchronized (eventQueue) {

      1. if (eventQueue.size() >= max) {
      2. console("队列满了。");
      3. try {
      4. eventQueue.wait();
      5. } catch (InterruptedException e) {
      6. e.printStackTrace();
      7. }
      8. }
      9. console("事件已提交。");
      10. eventQueue.addLast(event);
      11. eventQueue.notify();

      } }

      /**

    • 从对头获取数据 *
    • @return event */ public Event take() { synchronized (eventQueue) {

      1. if (eventQueue.isEmpty()) {
      2. console("队列空了。");
      3. try {
      4. eventQueue.wait();
      5. } catch (InterruptedException e) {
      6. e.printStackTrace();
      7. }
      8. }
      9. Event event = eventQueue.removeFirst();
      10. this.eventQueue.notify();
      11. console("事件 " + event + " 已处理");
      12. return event;

      } }

      private void console(String message) { System.out.printf(“%s:%s \n”, Thread.currentThread().getName(), message); } }

  1. <a name="Lp3yn"></a>
  2. ##### EventClient
  3. ```java
  4. package com.zoro.concurrent.chapter05;
  5. /**
  6. * 事件客户端
  7. *
  8. * @author yx.jiang
  9. * @date 2021/7/27 10:26
  10. */
  11. public class EventClient {
  12. /**
  13. * 输出如下:
  14. * Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@6f497ac0 已处理
  15. * Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@2b474a9c 已处理
  16. * Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@3a867800 已处理
  17. * Consumer:队列空了。
  18. * Producer:事件已提交。
  19. * Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@4f032d53 已处理
  20. * Consumer:队列空了。
  21. * Producer:事件已提交。
  22. * Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@6bb8d109 已处理
  23. * Consumer:队列空了。
  24. * Producer:事件已提交。
  25. */
  26. public static void main(String[] args) {
  27. final EventQueue eventQueue = new EventQueue();
  28. new Thread(() -> {
  29. for (; ; ) {
  30. eventQueue.offer(new EventQueue.Event());
  31. }
  32. }, "Producer").start();
  33. new Thread(() -> {
  34. for (; ; ) {
  35. eventQueue.take();
  36. }
  37. }, "Consumer").start();
  38. }
  39. }

5.2.2 wait和notify详解

Object中的方法
image.png
image.png

  • Object的wait方法必须拥有该对象的monitor,也就是wait方法必须在同步方法中使用
  • 当前线程执行了该对象的wait方法后, 将会放弃对该monitor的所有权并且进入与该对象关联的wait set中, 也就是说一旦线程执行了某个object的wait方法之后, 它就会释放对该对象monitor的所有权, 其他线程也会有机会继续争抢该monitor的所有权

image.png

5.2.3 wait,notify注意事项

  • wait方法是可中断方法, [被打断后会收到中断异常interruptException, 同时interrupt标识也会被擦除]
  • 线程执行了某个对象的wait方法后,会加入与之对应的wait set 中, 每一个对象的monitor都有与之对应的wait set
  • 当线程进入wait set 之后, notify可将其唤醒, 也就是从wait set中弹出, 同时中断wait中的线程也会将其唤醒
  • 必须在同步方法中使用wait和notify方法,因为执行wait和notify的前提是必须持有同步方法的monitor的所有权

错误示例: 需在同步代码块中
private void _testWait () {
_try
{
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void _testNotify() {
_this
.notify();
}

  • 同步代码块的monitor必须与执行wait notify方法的对象一致,简单的说就是用哪个对象的monitor进行同步,就只能用哪个对象进行wait和notify操作.

错误示例: 应当用this
private final _Object MUTEX = _new _Object();
_private synchronized void _testWait() {
_try
{
MUTEX.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

_private synchronized void _testNotify() {
MUTEX.notify();
}

5.2.4 wait和sleep

  1. 都可以使线程进入阻塞状态
  2. 均是可中断方法, 被中断后都会收到中断异常
  3. wait是Object的方法, sleep是Thread的方法
  4. wait需要在同步代码块中执行, 而sleep不需要
  5. 线程在同步方法中执行sleep时,并不会释放monitor锁, 而wait方法则会释放monitor的锁
  6. sleep方法短暂休眠后会主动退出阻塞, 而wait(未指定wait时间)则需要被其他线程notify/中断后才能退出阻塞

    5.3 多线程间通信

    EventQueueEventClient 在多线程下会出现以下问题:

  7. LinkedList 为空时执行removeFirst方法

  8. LinkedList元素为10时执行addList方法

    5.3.1 改进:

    将临界值的判断if更改改为while, 将notify 更改为 notifyAll ```java package com.zoro.concurrent.chapter05;

import java.util.LinkedList;

/**

  • 【线程间通信】事件队列
  • 队列满 - 最多容纳多少Event,好比一个系统最多同时能够受理多少业务一样
  • 队列空 - 当所有的Event都被处理并且没有新的Event被提交的时候,此时队列是空的状态
  • 有Event但没有满 - 有新的event被提交,但是此时没有达到队列的上限 *
  • @author yx.jiang
  • @date 2021/7/27 10:26 */ public class EventQueueSafe { private final int max;

    static class Event {

    }

    private final LinkedList eventQueue = new LinkedList<>();

    private static final int DEFAULT_MAX_EVENT = 10;

    public EventQueueSafe() {

    1. this(DEFAULT_MAX_EVENT);

    }

    public EventQueueSafe(int max) {

    1. this.max = max;

    }

    /**

    • 提交事件到队列尾 *
    • @param event 事件 */ public void offer(Event event) { synchronized (eventQueue) {

      1. while (eventQueue.size() >= max) {
      2. console("队列满了。");
      3. try {
      4. eventQueue.wait();
      5. } catch (InterruptedException e) {
      6. e.printStackTrace();
      7. }
      8. }
      9. console("事件已提交。");
      10. eventQueue.addLast(event);
      11. eventQueue.notifyAll();

      } }

      /**

    • 从对头获取数据 *
    • @return event */ public Event take() { synchronized (eventQueue) {

      1. while (eventQueue.isEmpty()) {
      2. console("队列空了。");
      3. try {
      4. eventQueue.wait();
      5. } catch (InterruptedException e) {
      6. e.printStackTrace();
      7. }
      8. }
      9. Event event = eventQueue.removeFirst();
      10. this.eventQueue.notifyAll();
      11. console("事件 " + event + " 已处理");
      12. return event;

      } }

      private void console(String message) { System.out.printf(“%s:%s \n”, Thread.currentThread().getName(), message); } }

  1. <a name="YkSuU"></a>
  2. #### 5.3.2 线程休息室wait set
  3. <a name="AwQgt"></a>
  4. ### 5.4 自定义显式锁BooleanLock
  5. > BooleanLock类似java.utils包下的Lock
  6. <a name="Fa7Bn"></a>
  7. #### 5.4.1 synchronized 的缺陷
  8. 1. 无法控制阻塞时长
  9. 2. 阻塞不可被中断
  10. ```java
  11. package com.zoro.concurrent.chapter05;
  12. import java.util.concurrent.TimeUnit;
  13. /**
  14. * synchronized缺陷
  15. *
  16. * @author yx.jiang
  17. * @date 2021/7/27 10:26
  18. */
  19. public class SynchronizedDefect {
  20. //同步方法
  21. public synchronized void syncMethod() {
  22. try {
  23. TimeUnit.HOURS.sleep(1);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. /**
  29. * 1.
  30. * T1线程先进入同步方法(t1启动后主线程休眠了2ms), T2线程启动执行{@link SynchronizedDefect#syncMethod}时会进入阻塞,
  31. * T2何时能获取{@link SynchronizedDefect#syncMethod}的执行权, 取决于T1何时释放; 如果T2计划最多等待1min获得执行权, 否则就放弃
  32. * 这种方式是无法做到的, 即: 阻塞时长无法控制
  33. *
  34. * 2.
  35. * TT2因争抢某个monitor的锁而进入阻塞状态, 它是无法中断的, 虽然可以设置tt2线程的interrupt标识, 但是synchronized不像sleep和wait
  36. * 那样可以获得中断信号
  37. */
  38. public static void main(String[] args) throws InterruptedException {
  39. //----------------------1.阻塞时长无法控制-------------------------
  40. SynchronizedDefect defect = new SynchronizedDefect();
  41. Thread t1 = new Thread(defect::syncMethod, "T1");
  42. //确保t1启动
  43. t1.start();
  44. TimeUnit.MILLISECONDS.sleep(2);
  45. Thread t2 = new Thread(defect::syncMethod, "T2");
  46. t2.start();
  47. //-----------------------2.阻塞不可被中断-----------------------
  48. SynchronizedDefect defect2 = new SynchronizedDefect();
  49. Thread tt1 = new Thread(defect2::syncMethod, "T1");
  50. //确保tt1启动
  51. tt1.start();
  52. TimeUnit.MILLISECONDS.sleep(2);
  53. Thread tt2 = new Thread(defect2::syncMethod, "T2");
  54. tt2.start();
  55. //确保tt2启动
  56. TimeUnit.MILLISECONDS.sleep(2);
  57. tt2.interrupt();
  58. //true
  59. System.out.println(tt2.isInterrupted());
  60. //BLOCKED
  61. System.out.println(tt2.getState());
  62. }
  63. }

5.4.2 显示锁BooleanLock

1. Lock接口
  1. package com.zoro.concurrent.chapter05.lock;
  2. import java.util.List;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 锁
  6. *
  7. * @author yx.jiang
  8. * @date 2021/7/27 10:26
  9. */
  10. public interface Lock {
  11. /**
  12. * 永远阻塞,除非获取到了锁, 和synchronized类似, 但是该方法是可以被中断的
  13. * 中断时会抛出{@link InterruptedException}
  14. */
  15. void lock() throws InterruptedException;
  16. /**
  17. * 除了可以被中断外, 还增加了对应的超时功能
  18. *
  19. * @param mills 超时时长
  20. */
  21. void lock(long mills) throws InterruptedException, TimeoutException;
  22. /**
  23. * 释放锁
  24. */
  25. void unLock();
  26. /**
  27. * 获取当前有哪些线程被阻塞
  28. * @return 被阻塞的线程
  29. */
  30. List<Thread> getBlockedThreads();
  31. }

2. 实现BooleanLock
  1. package com.zoro.concurrent.chapter05.lock;
  2. import java.util.ArrayList;
  3. import java.util.Collection;
  4. import java.util.Collections;
  5. import java.util.List;
  6. import java.util.concurrent.TimeoutException;
  7. import static java.lang.System.currentTimeMillis;
  8. import static java.lang.Thread.currentThread;
  9. /**
  10. * 显式BooleanLock
  11. *
  12. * @author yx.jiang
  13. * @date 2021/7/27 10:26
  14. */
  15. public class BooleanLock implements Lock {
  16. /**
  17. * 当前拥有锁的线程
  18. */
  19. private Thread currentThread;
  20. /**
  21. * false: 当前该锁没有被任何线程获得或者已经释放
  22. * true: 该锁已被某个线程({@link BooleanLock#currentThread)获得
  23. */
  24. private boolean locked = false;
  25. /**
  26. * 存储 [在获取当前线程时被阻塞的线程]
  27. */
  28. private final List<Thread> blockedList = new ArrayList<>();
  29. @Override
  30. public void lock() throws InterruptedException {
  31. //使用同步代码块的方式进行同步
  32. synchronized (this) {
  33. //如果当前锁已经被某个线程获得, 则该线程将加入阻塞队列, 并且使当前线程wait释放对this monitor的所有权
  34. while (locked) {
  35. blockedList.add(currentThread());
  36. this.wait();
  37. }
  38. //如果当前锁没有被其他线程获得, 则该线程将尝试从阻塞队列中删除自己
  39. // (注: 如果当前线程从未进如阻塞队列, 删除方法无任何影响; 如果当前线程是从wait set中被唤醒的, 则需要从阻塞队列中将自己删除)
  40. blockedList.remove(currentThread());
  41. //指定locked开关为true
  42. this.locked = true;
  43. //记录获取锁的线程
  44. this.currentThread = currentThread();
  45. }
  46. }
  47. @Override
  48. public void lock(long mills) throws InterruptedException, TimeoutException {
  49. synchronized (this) {
  50. //mills不合法,默认调用lock()方法, 亦可抛出异常
  51. if (mills <= 0) {
  52. this.lock();
  53. } else {
  54. long remainingMills = mills;
  55. long endMills = currentTimeMillis() + remainingMills;
  56. while (locked) {
  57. //如果remainingMills<=0, 意味着当前线程被其他线程唤醒或者在指定的wait时间到了之后还没获得锁, 这时抛出超时异常
  58. if (remainingMills <= 0) {
  59. throw new TimeoutException("经" + mills + "ms未能获取锁");
  60. }
  61. if (!blockedList.contains(currentThread())) {
  62. blockedList.add(currentThread);
  63. }
  64. //等待remainingMills毫秒后,该值起初是由其他线程传入的, 但在多次wait过程中会重新计算
  65. this.wait(remainingMills);
  66. //重新计算remainingMills[剩余时间]
  67. remainingMills = endMills - currentTimeMillis();
  68. }
  69. //获得该锁, 并且从block列表中删除当前线程, 将locked的状态改为true并指定获得锁的线程就是当前线程
  70. blockedList.remove(currentThread());
  71. this.locked = true;
  72. this.currentThread = currentThread();
  73. }
  74. }
  75. }
  76. @Override
  77. public void unLock() {
  78. synchronized (this) {
  79. //判断当前线程是否为获取锁的那个线程, 止呕加了锁的线程才能解锁
  80. if (currentThread == currentThread()) {
  81. //设置锁的状态为false
  82. this.locked = false;
  83. //通知其他在wait set 中的线程, 可以再次尝试获取锁了, 亦可使用 this.notify();
  84. this.notifyAll();
  85. }
  86. }
  87. }
  88. @Override
  89. public List<Thread> getBlockedThreads() {
  90. return Collections.unmodifiableList(blockedList);
  91. }
  92. }

fixed: 某个线程被中断, 他将有可能还存在于blockList中:
image.png

  1. package com.zoro.concurrent.chapter05.lock;
  2. import java.util.ArrayList;
  3. import java.util.Collections;
  4. import java.util.List;
  5. import java.util.Optional;
  6. import java.util.concurrent.TimeoutException;
  7. import static java.lang.System.currentTimeMillis;
  8. import static java.lang.Thread.currentThread;
  9. /**
  10. * 显式BooleanLock
  11. * <p>fixed: 某个线程被中断, 他将有可能还存在于blockList中 </p>
  12. *
  13. * @author yx.jiang
  14. * @date 2021/7/27 10:26
  15. */
  16. public class BooleanLockFixed implements Lock {
  17. /**
  18. * 当前拥有锁的线程
  19. */
  20. private Thread currentThread;
  21. /**
  22. * false: 当前该锁没有被任何线程获得或者已经释放
  23. * true: 该锁已被某个线程({@link BooleanLockFixed#currentThread)获得
  24. */
  25. private boolean locked = false;
  26. /**
  27. * 存储 [在获取当前线程时被阻塞的线程]
  28. */
  29. private final List<Thread> blockedList = new ArrayList<>();
  30. @Override
  31. public void lock() throws InterruptedException {
  32. //使用同步代码块的方式进行同步
  33. synchronized (this) {
  34. //如果当前锁已经被某个线程获得, 则该线程将加入阻塞队列, 并且使当前线程wait释放对this monitor的所有权
  35. while (locked) {
  36. //暂存当前线程
  37. final Thread tempThread = currentThread();
  38. try {
  39. blockedList.add(tempThread);
  40. this.wait();
  41. } catch (InterruptedException e) {
  42. //⚠如果当前线程在wait时被中断, 则从blockedList中将其删除, 避免内存泄露
  43. blockedList.remove(tempThread);
  44. //继续抛出中断异常
  45. throw e;
  46. }
  47. }
  48. //如果当前锁没有被其他线程获得, 则该线程将尝试从阻塞队列中删除自己
  49. // (注: 如果当前线程从未进如阻塞队列, 删除方法无任何影响; 如果当前线程是从wait set中被唤醒的, 则需要从阻塞队列中将自己删除)
  50. blockedList.remove(currentThread());
  51. //指定locked开关为true
  52. this.locked = true;
  53. //记录获取锁的线程
  54. this.currentThread = currentThread();
  55. }
  56. }
  57. @Override
  58. public void lock(long mills) throws InterruptedException, TimeoutException {
  59. synchronized (this) {
  60. //mills不合法,默认调用lock()方法, 亦可抛出异常
  61. if (mills <= 0) {
  62. this.lock();
  63. } else {
  64. long remainingMills = mills;
  65. long endMills = currentTimeMillis() + remainingMills;
  66. while (locked) {
  67. final Thread tempThread = currentThread();
  68. try {
  69. //如果remainingMills<=0, 意味着当前线程被其他线程唤醒或者在指定的wait时间到了之后还没获得锁, 这时抛出超时异常
  70. if (remainingMills <= 0) {
  71. throw new TimeoutException("经" + mills + "ms未能获取锁");
  72. }
  73. if (!blockedList.contains(tempThread)) {
  74. blockedList.add(currentThread);
  75. }
  76. //等待remainingMills毫秒后,该值起初是由其他线程传入的, 但在多次wait过程中会重新计算
  77. this.wait(remainingMills);
  78. //重新计算remainingMills[剩余时间]
  79. remainingMills = endMills - currentTimeMillis();
  80. } catch (InterruptedException e) {
  81. blockedList.remove(tempThread);
  82. throw e;
  83. }
  84. }
  85. //获得该锁, 并且从block列表中删除当前线程, 将locked的状态改为true并指定获得锁的线程就是当前线程
  86. blockedList.remove(currentThread());
  87. this.locked = true;
  88. this.currentThread = currentThread();
  89. }
  90. }
  91. }
  92. @Override
  93. public void unLock() {
  94. synchronized (this) {
  95. //判断当前线程是否为获取锁的那个线程, 止呕加了锁的线程才能解锁
  96. if (currentThread == currentThread()) {
  97. //设置锁的状态为false
  98. this.locked = false;
  99. Optional.of(currentThread().getName() + " 释放锁.").ifPresent(System.out::println);
  100. //通知其他在wait set 中的线程, 可以再次尝试获取锁了, 亦可使用 this.notify();
  101. this.notifyAll();
  102. }
  103. }
  104. }
  105. @Override
  106. public List<Thread> getBlockedThreads() {
  107. return Collections.unmodifiableList(blockedList);
  108. }
  109. }

3. 使用BooleanLock
  1. 多个线程通过lock()方法争抢锁
  2. 可中断被阻塞的线程
  3. 阻塞的线程可超时 ```java package com.zoro.concurrent.chapter05.lock;

import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.IntStream;

import static java.lang.Thread.currentThread; import static java.util.concurrent.ThreadLocalRandom.current;

/**

  • lock-test *
  • @author yx.jiang
  • @date 2021/7/27 10:26 / public class BooleanLockTest { /*

    • 定义{@link BooleanLock} */ private final Lock lock = new BooleanLock();

      /**

    • 使用try..finally语句块确保lock每次都能被正确释放 */ public void syncMethod() { try {

      1. //加锁
      2. lock.lock();
      3. int randomInt = current().nextInt(10);
      4. System.out.println(currentThread() + "获得锁.");
      5. TimeUnit.SECONDS.sleep(randomInt);

      } catch (InterruptedException e) {

      1. e.printStackTrace();

      } finally {

      1. //释放锁
      2. lock.unLock();

      } }

      public void syncMethodTimeoutable() { try {

      1. //加锁
      2. lock.lock(1000);
      3. int randomInt = current().nextInt(10);
      4. System.out.println(currentThread() + "获得锁.");
      5. TimeUnit.SECONDS.sleep(randomInt);

      } catch (InterruptedException | TimeoutException e) {

      1. e.printStackTrace();

      } finally {

      1. //释放锁
      2. lock.unLock();

      } }

      public static void main(String[] args) { BooleanLockTest booleanLockTest = new BooleanLockTest(); // multiThreadGrapeTest(booleanLockTest); // interruptBlockedThreadTest(booleanLockTest); lockTimeoutReleaseTest(booleanLockTest); }

      /**

    • 阻塞线程可超时
    • 输出如下:
    • Thread[Thread-0,5,main]获得锁.
    • java.util.concurrent.TimeoutException: 经1000ms未能获取锁
    • at com.zoro.concurrent.chapter05.lock.BooleanLock.lock(BooleanLock.java:65)
    • at com.zoro.concurrent.chapter05.lock.BooleanLockTest.syncMethodTimeoutable(BooleanLockTest.java:43)
    • at java.lang.Thread.run(Thread.java:748)
    • Thread-0 释放锁. *
    • @param booleanLockTest 测试 */ private static void lockTimeoutReleaseTest(BooleanLockTest booleanLockTest) { try {

      1. new Thread(booleanLockTest::syncMethodTimeoutable).start();
      2. TimeUnit.MILLISECONDS.sleep(2);
      3. Thread t2 = new Thread(booleanLockTest::syncMethodTimeoutable, "T2");
      4. t2.start();
      5. TimeUnit.MILLISECONDS.sleep(10);

      } catch (InterruptedException e) {

      1. e.printStackTrace();

      } }

      /**

    • <2>可中断被阻塞的线程
    • 输出如下:
    • Thread[T1,5,main]获得锁.
    • java.lang.InterruptedException
    • at java.lang.Object.wait(Native Method)
    • at java.lang.Object.wait(Object.java:502)
    • at com.zoro.concurrent.chapter05.lock.BooleanLock.lock(BooleanLock.java:40)
    • at com.zoro.concurrent.chapter05.lock.BooleanLockTest.syncMethod(BooleanLockTest.java:27)
    • at java.lang.Thread.run(Thread.java:748) *
    • @param booleanLockTest 测试 */ private static void interruptBlockedThreadTest(BooleanLockTest booleanLockTest) { try {

      1. new Thread(booleanLockTest::syncMethod, "T1").start();
      2. TimeUnit.MILLISECONDS.sleep(2);
      3. Thread t2 = new Thread(booleanLockTest::syncMethod, "T2");
      4. t2.start();
      5. TimeUnit.MILLISECONDS.sleep(2);
      6. t2.interrupt();

      } catch (InterruptedException e) {

      1. e.printStackTrace();

      } }

      /**

    • <1>多线程通过lock()方法争抢锁
    • 输出如下:
    • Thread[Thread-0,5,main]获得锁.
    • Thread-0 释放锁.
    • Thread[Thread-9,5,main]获得锁.
    • Thread-9 释放锁.
    • Thread[Thread-1,5,main]获得锁.
    • Thread-1 释放锁.
    • Thread[Thread-8,5,main]获得锁.
    • Thread-8 释放锁.
    • Thread[Thread-2,5,main]获得锁.
    • Thread-2 释放锁.
    • Thread[Thread-7,5,main]获得锁.
    • Thread-7 释放锁.
    • Thread[Thread-3,5,main]获得锁.
    • Thread-3 释放锁.
    • Thread[Thread-6,5,main]获得锁.
    • Thread-6 释放锁.
    • Thread[Thread-5,5,main]获得锁.
    • Thread-5 释放锁.
    • Thread[Thread-4,5,main]获得锁.
    • Thread-4 释放锁. *
    • @param booleanLockTest 测试 */ private static void multiThreadGrapeTest(BooleanLockTest booleanLockTest) { IntStream.range(0, 10)
      1. .mapToObj(i -> new Thread(booleanLockTest::syncMethod))
      2. .forEach(Thread::start);
      } }

```