1 基础理论

1.1 进程、线程、管程

  • 进程:系统中运行的一个应用程序就是一个进程,每一个进程都有它自己的内存空间和系统资源
  • 线程:也被称为轻量级进程,在同一个进程内基本会有1一个或多个线程,是大多数操作系统进行调度的基本单元。
  • 管程:Monitor(监视器),也就是我们平时说的。Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。

    1.2 用户线程与守护线程

  • 用户线程:是系统的工作线程,它会完成这个程序需要完成的业务操作。

  • 守护线程:是一种特殊的线程,为其他线程服务的,在后台默默地完成一些系统性的服务,比如垃圾回收线程。

用户线程是默认的,守护线程是伴随用户线程生成的,如果用户线程结束,那么守护线程也会结束掉。
Java中可以手动设置守护线程:(设置要在start之前)

  1. Thread a = new Thread(() -> {
  2. }, "a");
  3. // 将a设置为守护线程
  4. a.setDaemon(true);
  5. a.start();

2 基本关系

Future 定义了线程基本的使用方法。cancel``isCancel``get``isDone
Rannable 是一个函数式接口,只提供一个抽象的run()方法。
RunnableFuture继承了上述两个接口,RunnableFuture的run()方法没有返回值。
Callable接口提供的call()方法有返回值,会抛异常。
FutureTask是RunnableFuture的具体实现,同时在FutureTask采用了构造注入的方式,在构造方法中引入了Callable,支持了有返回的线程创建。
image.png
Thread 接受 Rannable 来创建线程。

  1. public class JucTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. FutureTask<String> a = new FutureTask<>(new MyThread());
  4. Thread t1 = new Thread(a, "a");
  5. t1.start();
  6. System.out.println(a.get());
  7. }
  8. }
  9. class MyThread implements Callable<String> {
  10. @Override
  11. public String call() throws Exception {
  12. return "callable";
  13. }
  14. }

3 FutureTask

FutureTask实现了RunnableFuture,同时也支持有返回。

  1. get()方法会造成阻塞。
  2. 如果想监控线程完成,可以轮询isDone()方法。但是同时会造成CPU资源损耗。

简而言之,FutureTask获取线程运行结果的方式不够优雅。

4 CompletableFuture

Java8引入的CompletableFuture,他是Future的增强版,减少了阻塞和轮询。可以传入回调对象,当异步任务完成或发生异常时,自动调用回调对象的回调放方法。
image.png

4.1 四个静态方法

无返回 Executor可以指定线程池,没有Executor使用默认线程池ForkJoinPool
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
有返回
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

  1. // 自定义线程池,默认采用ForkJoinPool时,CompletableFuture被视为守护线程,主线程结束,子线程也会被停掉。
  2. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  3. try {
  4. CompletableFuture.supplyAsync(()->{
  5. // 业务逻辑
  6. int i = ThreadLocalRandom.current().nextInt(10);
  7. return i;
  8. }, threadPool).whenComplete((v, e) -> {
  9. // 执行完成后的业务
  10. }).exceptionally(e -> {
  11. // 抛出异常后的处理
  12. e.printStackTrace();
  13. return null;
  14. });
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. } finally {
  18. // 最后关闭线程池
  19. threadPool.shutdown();
  20. }

4.2 实战代码

比价:

  1. public class ComparePrice {
  2. static List<NetMall> list = Arrays.asList(
  3. new NetMall("jd"),
  4. new NetMall("tm"),
  5. new NetMall("dd")
  6. );
  7. // 串行执行
  8. public static List<String> getPrice(List<NetMall> list, String productName) {
  9. return list.stream()
  10. .map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calculatePrice(productName)))
  11. .collect(Collectors.toList());
  12. }
  13. // 使用CompletableFuture并行执行
  14. public static List<String> getPriceCompletableFuture(List<NetMall> list, String productName) {
  15. return list.stream()
  16. .map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calculatePrice(productName))))
  17. .collect(Collectors.toList())
  18. .stream()
  19. .map(CompletableFuture::join)
  20. .collect(Collectors.toList());
  21. }
  22. public static void main(String[] args) {
  23. long l = System.currentTimeMillis();
  24. List<String> mysql = getPriceCompletableFuture(list, "mysql");
  25. for (String a : mysql) {
  26. System.out.println(a);
  27. }
  28. System.out.println("耗时: " + (System.currentTimeMillis() - l));
  29. }
  30. }
  31. class NetMall{
  32. private String netMallName;
  33. public String getNetMallName() {
  34. return netMallName;
  35. }
  36. public NetMall(String netMallName){
  37. this.netMallName = netMallName;
  38. }
  39. public double calculatePrice(String productName) {
  40. try {
  41. TimeUnit.SECONDS.sleep(1);
  42. } catch(InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
  46. }
  47. }

4.3 常用方法

  1. 获得结果和触发计算
    1. T get()
    2. T get(long timeout, TimeUnit unit):超过时间后不要了
    3. T join():不需要抛异常
    4. T getNow(T valueIfAbsent):如果拿到就用,拿不到用默认值valueIfAbsent
    5. boolean complete(T value):类似于getNow。返回是否打断的标志。如果没有执行完,打断线程返回true,并赋值为value
  2. 对计算结果进行处理

A执行完,执行B,B继续对结果进行加工处理

  1. thenApply(Function<? super T,? extends U> fn): 有异常停止
  2. handle(BiFunction<? super T, Throwable, ? extends U> fn):有异常可以继续运行
    1. 对计算结果进行消费
  3. thenRun(Runnable action):A执行完,执行B,B不需要A的结果
  4. thenAccept(Consumer<? super T> action):A执行完,执行B,B需要A的结果,B无返回值
    1. 对计算速度进行选用
  5. <U> CompletableFuture<U> **applyToEither**(CompletionStage<? extends T> other, Function<? super T, U> fn)
    1. 对计算结果进行合并
  6. <U,V> CompletableFuture<V> **thenCombine**(<br /> CompletionStage<? extends U> other,<br /> BiFunction<? super T,? super U,? extends V> fn)

    4.4 扩展

    Java8的函数式接口:
  1. Runnable —> void run();
  2. Function —>R apply(T t);
  3. Consumer —> void accept(T t)(BiConsumer —> void accept(T t, U u);
  4. Supply —> T get()

    5 synchronized

    静态同步方法加的是类锁
    普通同步方法锁的是对象上所有带synchronized的方法
    静态同步方法和普通同步方法之间没有竞争。

从JVM层面解释的话可以这么说:
类加载器加载class文件,初始化生成一个class在方法区,只有一份。静态同步方法(static synchronized)锁的是这个Class对象,而class实例化的对象存储在堆中,可以有多个,普通同步方法所的是一个个实例对象。

5.1 反编译观察

反编译字节码命令 :javap -c [xxx].class/javap -v [xxx].class

5.1.1 同步代码块

  1. 进入同步代码块之前,会先有一步monitor enter。
  2. 执行完,会进行monitor exit。
  3. 为防止死锁,还会有一次异常执行的monitor exit。

所以,一般情况下,monitor enter对应两个monitor exit。在代码中手动处理异常时,一个monitor enter可以对应一个monitor exit。

5.1.2 普通同步方法

  1. 同步方法会有一个flag值,ACC_SYNCHRONIZED来标记这个是一个同步方法。
  2. 如果设置了,线程执行前,先持有monitor锁,在执行方法。
  3. 方法完成后,不论是否正常执行,都会释放monitor。

    5.1.3 静态同步方法

    静态同步方法比普通同步方法在flag上多了一个ACC_STATIC。其他是一样的。

    5.2 底层分析

    monitor采用ObjectMonitor实现。
    ObjectMonitor.cpp中的ObjectMonitor.hpp结构中有一个owner字段,指向了当前monitor的持有线程。
    因为Java所有的类继承了Object,所以,所有的对象都可以获取锁。

    6 锁

    6.1公平锁与非公平锁

    非公平锁 减少cpu上下文切换的损耗。

    6.2 可重入锁

    每个锁对象都有一个锁计数器_count,一个指向持有该锁线程的指针_woner,该线程获取锁的次数_recursions。
    当执行monitorenter时,如果锁目标对象的计数器为0,那么说明它没有被其他线程所持有,Java虚拟机会将该锁持有的线程设置为当前线程,并且将其计数器加1。
    在目标锁对象的计数器不为0时,如果锁的持有线程还是当前线程,那么Java虚拟机就将计数器加1,否则不是该线程则需要等待,直至持有线程释放该锁。
    当执行monitorexit时,Java虚拟机则将锁的对象计数器减1,计数器为0则代表锁被释放。

    6.2.1 隐式锁synchronized

    6.2.2 显式锁Lock

    lock必须跟着unlock,否则造成死锁。

    6.3 死锁

    死锁排查

  4. jps -l查看进程信息

  5. jstack [进程号]

    7 LockSupport与线程中断

    7.1 中断

    7.1.1 三个中断方法

    中断是一种协商机制。
    三个中断方法:

  6. public void interrupt()

    1. 实例方法,仅仅是设置线程的中断状态为true,发起一个协商不会立刻停止线程。
    2. 如果当前线程处于被阻塞状态,那么别的线程调用当前线程对象的interrupt()方法时,会抛出InterruptException异常。
    3. 在JDK8中如果中断的是不活动的线程,则不会设为true。(也就是说没有影响)
  7. public static boolean interrupted()

判断线程是否被中断,并且清除当前中断状态。

  1. 返回当前线程的中断状态,测试当前前程是否已经被中断。
  2. 将当前线程的中断状态清零,并重新设置为false。
    1. public boolean isInterrupted()

实例方法,判断当前线程是否被中断。

7.1.2 其他的中断方法

使用volatile

添加关键字volatile,使用变量可见。

  1. static volatile boolean isStop = false;
  2. public static void main(String[] args) {
  3. new Thread(() -> {
  4. while (true) {
  5. if (isStop) {
  6. System.out.println(Thread.currentThread().getName() + "\t 程序停止");
  7. break;
  8. }
  9. System.out.println("hello volatile");
  10. }
  11. }, "t1").start();
  12. try {
  13. TimeUnit.MILLISECONDS.sleep(20);
  14. } catch(InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. new Thread(() -> {
  18. isStop = true;
  19. }, "t2").start();
  20. }

输出:

  1. ......
  2. hello volatile
  3. hello volatile
  4. hello volatile
  5. hello volatile
  6. t1 程序停止

Atomic原子类

  1. static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
  2. public static void main(String[] args) {
  3. new Thread(() -> {
  4. while (true) {
  5. if (atomicBoolean.get()) {
  6. System.out.println(Thread.currentThread().getName() + "\t 程序停止");
  7. break;
  8. }
  9. System.out.println("hello volatile");
  10. }
  11. }, "t1").start();
  12. try {
  13. TimeUnit.MILLISECONDS.sleep(20);
  14. } catch(InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. new Thread(() -> {
  18. atomicBoolean.set(true);
  19. }, "t2").start();
  20. }