说明:本篇笔记详细的整理与归纳并发编程相关的内容,深入源码。
学习于公开课程-> 尚硅谷2022版JUC并发编程(对标阿里P6-P7)课程。
线程基础知识复习
创建线程的四种方式
源码观察 start 一个线程
Java 底层是 C++实现的。我们常说的new Thread(()->{}).start()
,其中的start()
方法源码如下:
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
可知其实更底层的方法是**_private native void _**start0();
在 OpenJDK 中的类与 JNI 本地调用一般是一一对应的,Thread.java 对应的就是 Thread.c 文件。start0
其实就是 JVM_StartThread。此时查看源代码可以看到在 jvm.h 中找到了声明,jvm.cpp 中有实现。
而在jvm.cpp
中的2883行附近有线程启动的实现方式:
对应的jvm.cpp
大概464行附近的代码:
可以看出,java 中创建的线程在底层是让操作系统来分配线程的。
Java 多线程相关概念
一把锁:
Synchronize,后面会具体介绍。
两个并:
- 并发(concurrent):在同一个实体上的多个事件,是在一台处理器上“同时”处理多个任务,也即同一时刻其实是只有一个事情在发生。
- 并行(parallel):实在不同实体上的多个事件,实在多台处理器上同时处理多个任务,也即同一时刻,不止一个事情在发生,互相独立互不影响。
并发 VS 并行:
三个程:
- 进程:简单地说,在系统中运行的一个应用程序就是进程,每一个进程都有它自己的内存空间和系统资源。
- 线程:也被称为轻量级进程,在同一个进程内会有1个或多个线程,是大多数操作系统进行时序调度的基本单元。
- 管程:Monitor(监视器),也就是平时我们多说的锁。
Monitor 其实就是一种同步机制,它的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。
JVM 中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个 Monitor 对象。Monitor 对象会和 Java 对象一同创建并销毁,它底层是由 C++来实现的。
public static void main(String[] args) {
Object o = new Object();
new Thread(() -> {
synchronized (o) {
// some codes here ...
}
}, "t1").start();
}
用户线程和守护线程
用户线程(User Thread):是系统的工作线程,它会完成这个程序需要完成的业务操作。
守护线程(Daemon Thread):是一种特殊的线程为其它线程服务的,在后台默默完成一些系统性的服务,比如垃圾回收线程就是典型的守护线程。守护线程作为一个服务线程,没有服务对象就没有必要继续运行了,如果用户线程全部结束,以为着程序需要完成的业务操作已经结束,系统可以退出了。所以当系统只剩下守护线程的时候,java 虚拟机会自动退出。
一般情况下,不做特别说明配置,默认都是守护线程。
怎么判断线程是否是守护线程呢?其实在 Thread.java
类中存在方法:
可知,使用setDaemon()
方法可以设置一个线程是否为守护线程。
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
System.out.println("我是:" + Thread.currentThread().getName());
while (true) {
}
}, Thread.currentThread().isDaemon() ? "守护线程" : "用户线程");
t.setDaemon(true); // 由于设置了守护线程,所以Main线程结束后,守护线程自动结束,而不是死循环
t.start();
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "\t 主线程---end");
}
总结:
如果用户线程全部结束意味着程序需要完成的业务操作已经结束了,守护线程随着 JVM 一同结束工作。setDaemon(true)
方法必须在start()
之前调用,否则抛出 java.lang.IllegalThreadStateException 异常。
CompletableFuture
Future 接口
Future 是 JDK1.5 新增的接口,它提供了一种异步并行计算的功能。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过 Future 把这个任务放到异步线程中执行。主线程继续处理其他任务,过一会儿再通过 Future 获取计算结果。
最常见实现类 FutureTask
Future 接口(实现类为FutureTask
)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
特点:多线程 + 有返回值 + 异步任务
package vip.zhenzicheng.concurrent_program.thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @author zhenzicheng
* @date 2022-06-03 17:31
*/
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new MyThread());
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(futureTask.get());
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("----------- come in call() ");
return "hello Callable!";
}
}
Future 编码实战的优缺点
优点:Future + 线程池异步多线程任务配合,能显著提高程序的执行效率。
package vip.zhenzicheng.concurrent_program.thread;
import java.util.concurrent.*;
/**
* 异步任务 + 线程池
*
* @author zhenzicheng
* @date 2022-06-03 17:53
*/
public class FutureTaskThreadPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
long start = System.currentTimeMillis();
FutureTask<String> task1 = new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return "task1 over";
});
FutureTask<String> task2 = new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(400);
return "task2 over";
});
FutureTask<String> task3 = new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(300);
return "task3 over";
});
threadPool.submit(task1);
threadPool.submit(task2);
threadPool.submit(task3);
// task1.get();
// task2.get();
// task3.get();
long end = System.currentTimeMillis();
System.out.println("cost:" + (end - start) / 1000.0);
threadPool.shutdown();
}
}
缺点:
get()
方法很容易导致阻塞,一般建议放在主线程的后面,同步阻塞的等待返回结果,如果结果没有计算完毕则阻塞。
一种不优雅的解决方式是在调用get()
方法时指定两个参数,一个是最长等待时间,一个是时间单位。例如get(3, TimeUnit.SECONDS)
即最长等待3s,超出时间如果get()
方法的返回值还没有计算完毕,则抛出java.util.concurrent.TimeoutException
异常。
- 可以通过
isDone()
方法轮询查看是否已经处理完毕,但轮询的方式会无谓的消耗 CPU 资源,也不见得能及时的得到计算返回结果。如果想要异步获取结果,通常都会以轮询的方式获取结果,尽量不要阻塞,可这样的处理依旧很不优雅。
结论:Future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
处理复杂任务
对于简单的业务场景使用 Future 完全 OK,但当我们想要有如下功能:
- 回调通知:应对 Future 的完成时间,完成了可以主动通知我,也即回调方式,这种方式远比不断的使用
isDone()
方法轮训来的优雅。不过回调很多也会变为“回调地狱”。 - 创建异步任务:Future + 线程池。
- 多个任务前后依赖可以组合处理:比如现在我们想要将多个异步任务的计算结果组合起来,后一个异步任务的计算结果依赖于前一个异步任务的值。将多个异步计算合成一个异步计算,这几个异步计算互相独立。
- 对计算速度选最快:当 Future 集合中某个任务最快结束时,返回结果,返回第一名处理结果
由上面的功能需求可以发现,Future 对于简单的异步任务需求还可以满足,但大多不够优雅且功能也不尽人意。这是就引入了 CompletableFuture 以声明式的方式优雅的处理这些需求,Future 能做的和不能做的,CompletableFuture 都能做。
CompletableFuture 对 Future 的改进
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗不必要的 CPU 资源,因此 JDK1.8 中提出了 CompletableFuture。
从源码入手,查看CompletableFuture.java
的类关系图,可以发现它实现了两个接口,Future 在上面已经提到了,另一个接口是对原有功能的增强。
:::success
CompletionStage:代表异步计算过程中的某一阶段,一个阶段完成以后可能会触发另一个阶段,有些类似 Linux 系统的管道分隔符传参数。
:::
- CompletionStage 代表异步计算过程中的某一阶段,一个阶段完成以后可能会触发另一个阶段。
- 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。
比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println());
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。 :::success CompletableFuture :::
在 Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编排的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
- 它可能代表一个明确完成的 Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现了 Future 和 CompletionStage 接口。
核心的四个静态方法创建线程
runAsync 无返回值
| public static _CompletableFuturerunAsync(Runnable _runnable ) | | —- | | public static _CompletableFuturerunAsync(Runnable _runnable , Executor executor) |
public static void noReturnFunc() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t --------come in");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(future.get());
}
supplyAsync 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) |
---|
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) |
public static void returnFunc() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello supplyAsync";
});
System.out.println(completableFuture.get());
}
关于 Executor executor 参数说明
当使用没有指定 Executor 的方法,会默认使用 ForkJoinPool.commonPool()
作为它的线程池执行异步代码。
如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码。
通用 Code 演示,减少阻塞和轮询
从 Java8开始引入了 CompletableFuture,它是 Future 的增强版,减少阻塞和轮询。
可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
package vip.zhenzicheng.concurrent_program.thread.sgg;
import java.util.concurrent.*;
/**
* @author zhenzicheng
* @date 2022-06-10 15:36
*/
public class CompletableFutureUseDemo {
static ExecutorService threadPool = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "----come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (result % 2 == 0) {
int i = 10 / 0;
}
System.out.println("----1s后出结果:" + result);
return result;
}, threadPool).whenComplete((result, exception) -> {
if (exception == null) {
System.out.println("----计算完成,更新变量:" + result);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
// 主线程不要立刻结束,否则 CompletableFuture 默认使用的线程池会立即关闭,类似于守护线程。
// try {
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
threadPool.shutdown(); // 会将剩余任务执行完后优雅关闭,不接受新任务
}
}
电商网站的比价需求
主流的函数式编程
函数式编程包含了:Lambda + Stream + Chain + Java8函数式编程等。
:::success
Runnable:无输入无输出
:::
:::success
Function
代码实现
需求:实现一个比价需求,同一款产品搜索出同款产品在各大电商平台的售价,并且搜索出同一个产品在同一个电商平台下,各个入驻卖家售价是多少。
输出返回:出来结果希望是同款产品在不同地方的价格清单列表,返回List<String>
package vip.zhenzicheng.concurrent_program.thread.sgg;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 电商比价demo
*
* @author zhenzicheng
* @date 2022-06-10 19:18
*/
public class CompletableFutureMallDemo {
static List<NetMall> list = new ArrayList<>();
static {
list.addAll(Arrays.asList(
new NetMall("jd"),
new NetMall("dangdang"),
new NetMall("taobao"),
new NetMall("pdd"),
new NetMall("taote"),
new NetMall("meituandianshagn"),
new NetMall("huaweishagncheng")
));
}
public static List<String> getPrice(List<NetMall> list, String productName) {
return list.stream()
.map(netMall -> String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
return list.stream()
.map(netMall ->
CompletableFuture.supplyAsync(
() -> String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s -> s.join())
.collect(Collectors.toList());
}
public static void main(String[] args) {
// 常规方法
long s = System.currentTimeMillis();
// getPrice(list, "IPhone 13 Pro Max");
getPriceByCompletableFuture(list, "IPhone 13 Pro Max");
long e = System.currentTimeMillis();
System.out.println("----costTime:" + (e - s) + "毫秒");
}
}
@AllArgsConstructor
@Data
class NetMall {
private String netMallName;
public double calcPrice(String productName) {
try {
TimeUnit.MILLISECONDS.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
CompletableFuture 常用方法
1. 获得结果和触发计算
获取结果 | public T **get()** |
阻塞直到等到返回值,并需要显示声明抛出异常。 |
---|---|---|
public T **get(long timeout, TimeUnit unit)** |
等待指定时间,超时抛出异常。 | |
public T **join()** |
类似于get() ,区别是不需要显示声明抛出异常,但当运行时出现异常会抛出。 |
|
public T **getNow(T valueIfAbsent)** |
立即获取计算结果不阻塞,如果没有计算完成则返回指定的 valueIfAbsent,如果计算完成正常返回结果。 | |
主动触发计算 | public boolean complete(T value) | 是否打断get() ,即如果没有计算完成就打断并返回给定的 value,如果已经计算完成直接获取计算结果。 |
2. 对计算结果进行处理
thenApply(Function fn) |
对计算结果存在依赖关系,这两个线程串行化。当出现异常时不能继续执行串行后的内容。 |
---|---|
handle(BiFunction fn) |
与 thenApply 方法类似,不同的是传入参数变为两个,一个是上一步的结果一个是异常。可知即使调用多次handle 方法的串行过程中出现了异常,则当前出现问题的步骤不会继续执行,但是整体串行后的方法还是会继续执行。可以理解为被try-catch 。 |
package vip.zhenzicheng.concurrent_program.thread.sgg;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author zhenzicheng
* @date 2022-06-21 19:12
*/
public class CompletableFutureAPI2Demo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("111");
return 1;
}, threadPool).handle((f, e) -> {
int i = 10 / 0;
System.out.println("222");
return f + 2;
}).handle((f, e) -> {
System.out.println("333");
return f + 3;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("计算结果: " + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "----主线程正在执行其他任务");
threadPool.shutdown();
}
}
异常处理:exceptionally
相当于 try-catch
,而使用handle
+whenComplete
可以实现类似于try-finally
的功能。
3. 对计算结果进行消费
thenAccept()
方法对结果进行消费,接收任务的处理结果,消费后无返回值。
关于任务顺序执行的对比补充:
CompletableFuture 和线程池说明:
- 没有传入自定义线程池,都使用默认 ForkJoinPool;
- 当执行第一个任务时,传入了一个自定义线程池
- 调用
thenRun
方法执行第二个任务时,则第二个任务和第一个任务是公用一个线程池。 - 调用
thenRunAsync
执行第二个任务时,则第一个任务使用的是自定义传入的线程池,第二个任务使用的是 ForkJoin 线程池。
- 调用
- 有可能处理太快,根据系统优化切换原则,直接使用 main 线程处理。
当然其它例如:thenAccept
、thenAcceptAysnc
、thenApply
、thenApplyAsync
等,它们之间的区别同上。
源码分析 JDK11:
可知当调用thenRunAsync
后会使用默认的线程池,即只要 CPU 核心数大于1(支持并行)就会使用 ForkJoinPool。
4. 对计算速度进行选用
方法applyToEither
会返回计算更快结束的任务。
package vip.zhenzicheng.concurrent_program.thread.sgg;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* @author zhenzicheng
* @date 2022-06-21 20:04
*/
public class CompletableFutureFastDemo {
public static void main(String[] args) {
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
System.out.println("A come in");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playA";
});
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
System.out.println("B come in");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playB";
});
CompletableFuture<String> result = futureA.applyToEither(futureB, f -> f + " is winner");
System.out.println(Thread.currentThread().getName() + "\t----" + result.join());
}
}
5. 对计算结果进行合并
两个 CompletionStage 任务都完成后,最终将两个任务的结果一起交给thenCombine
来处理,先完成的等待,等待其它分支任务结束。
package vip.zhenzicheng.concurrent_program.thread.sgg;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* 合并结果
*
* @author zhenzicheng
* @date 2022-06-21 20:19
*/
public class CompletableFutureCombineDemo {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> result = future1.thenCombine(future2, (result1, result2) -> {
System.out.println("----开始合并");
return result1 + result2;
});
System.out.println(result.join());
}
}
Java锁
乐观锁与悲观锁
悲观锁
认为自己在使用数据的时候一定有其它线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被其它线程修改。 synchronized 关键字与 Lock 的实现都是悲观锁
- 适用于写多的场景,先加锁可以保证写操作时数据正确
-
乐观锁
认为自己在使用数据时不会有其它线程修改数据或资源,不加锁。 在 Java 中通过使用无锁编程来实现,只是在更新的时候去判断,之前有没有别的线程更新了这个数据。 如果没有被更新则将自己修改的数据写入 如果已经被其它线程更新,则根据不同的实现方式执行不同的操作,比如放弃修改、重试争抢锁等等。
判断规则
- 版本号机制 Version
- CAS 算法,Java原子类中的递增操作就是通过 CAS 自旋实现
适用场景
读多场景,不加锁的特点能够使其读操作性能大幅提升 ```java public class SimpleLockDemo {
ReentrantLock lock = new ReentrantLock(); private AtomicInteger atomicInteger = new AtomicInteger();
// 保证多个线程使用同一个lock对象前提下的悲观锁 public void m2() { lock.lock(); try {
// 业务逻辑
} finally {
lock.unlock();
} }
// 悲观锁 public synchronized void m1() { // 业务逻辑 }
// 乐观锁调用方式 public void m3() { atomicInteger.incrementAndGet(); }
}
<a name="s02jB"></a>
## 8种情况演示锁运行案例
```java
public class Lock8Demo {
/**
* 加锁核心:线程操作资源类
* <pre>
* 1. 标准t1,t2两个线程访问,先打印发邮件
* 2. sendEmail增加sleep,还是先发邮件,sleep不会释放锁
* 3. 对于普通hello方法(无锁),先打印hello
* 4. 有两个资源类,分别调用两个资源类的两个不同加锁方法,先打印sendSMS,因为该方法不等待
* 5. 将sendEmail、sendSMS都改为静态同步方法,调用同一个资源类的两个加锁方法,还是先打印sendEmail
* 6. 在5基础上,分别调用两个资源类的不同的同步静态方法,还是先打印sendEmail
* 7. 一个静态同步方法,一个普通同步方法,同一个资源类,还是先打印sendSMS
* 8. 在7基础上,有两个资源类,还是先打印sendSMS
* <br/>
* 总结:
* 1-2 -> 一个对象里面如果有多个synchronized方法,某一个时刻一个同步方法被一个线程访问后就是对象锁,将整个对象(资源类)中所有同步方法都独占,但是普通方法可以访问。
* 也即某一个时刻只能有唯一线程去访问synchronized方法,锁当前this对象。
* 3-4 -> 普通方法与同步锁无关,换成两个对象后锁的是不同对象互不影响
* 5-6 -> 1) 对于普通同步方法,锁的是this当前实例对象,所有同步方法都是用同一把锁->this
* 2) 对于静态同步方法,锁的是当前Class对象,如Phone.class
* 3) 对于同步方法块,锁的是synchronized括号内的对象
* 例如:Object o = new Object();
* synchronized (o) {}
* 7-8 -> 当一个线程试图访问同步代码时必须先得到所,正常退出或者抛出异常时必须释放锁
* 所有的对象锁都是锁的this对象,就是new出来的具体对象,所有类锁锁的是Class对象
*/
public static void main(String[] args) {
Phone phone = new Phone(); // 创建资源类
Phone phone2 = new Phone();
new Thread(() -> {
phone.sendEmail();
}, "t1").start();
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
// phone.sendSMS();
// phone.hello();
phone2.sendSMS();
// phone2.sendEmail();
}, "t2").start();
}
}
// 资源类
class Phone {
public static synchronized void sendEmail() {
System.out.println("线程[" + Thread.currentThread().getName() + "] is coming");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----sendEmail");
}
public synchronized void sendSMS() {
System.out.println("----sendSMS");
}
// 普通方法
public void hello() {
System.out.println("----hello");
}
}
锁到底锁的是什么?
对象锁:如果当前对象被锁住,那么该对象中的同步方法同一时刻都只能有一个线程访问,但是普通方法可以被正常访问
类锁:锁的是 Class,该类的所有实例,同一时间只能有一个线程访问静态同步方法,普通方法和普通同步方法不受影响。
synchronized三种应用方式
- JDK 源码中的
notify()
使用synchronized加锁的3种方式
-c 对代码进行反编译(compile)
- -v(verbose)输出附加信息(包括行号、本地变量表,反汇编等详细信息)
synchronized 同步代码块
使用javap -c
反编译
发现管程进入了一次却退出了两次,是因为保证就算同步代码出现异常在 return 前也能够释放锁。
synchronized 底层是使用了monitorenter和monitorexit指令,一般情况下是一个 enter 两个 exit,如果自己在同步代码块手动抛出异常则只退出一次。synchronized普通同步方法
使用javap -v **.class
文件反编译
调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标识是否被设置。如果设置了,执行线程会先持有 monitor 锁,然后再执行方法,最后在方法完成(无论是正常完成还是异常退出)时释放 monitorsynchronized 静态同步方法
使用javap -v **.class
文件反编译
通过设置 ACC_STATIC、ACC_SYNCHRONIZED 两个访问标志位共同区分是否静态同步方法反编译 synchronized 锁的是什么?
为什么任何一个对象都可以成为一把锁?
什么是管程 monitor
管程(Monitors,也称为监视器)是一种程序结构,结构内的多个子程序(对象或模块)形成的多个工作线程互斥访问共享资源。这些共享资源一般是硬件设备或变量。对共享变量能够进行的所有操作集中在一个模块中。(把信号量及其操作原语“封装”在一个对象内部)管程实现了在一个时间点最多只有一个线程在执行管程的某个子程序。进程可以调用管程来实现进程级别的并发控制。
C++之 objectMonitor.hpp
在 HotSpot 虚拟机中,monitor 采用 ObjectMointor 实现
在 Java 中对应到 C++中代码文件:ObjectMonitor.java -> ObjectMonitor.cpp -> objectMonitor.hpp
在 Java 中 Object 类是所有对象的父类,而底层 JVM 中 Object 对应的类继承了 ObjectMonitor,所以每个对象都可以是一把锁。
核心参数说明
分析源码后可知,每个对象天生带有一个对象监视器(ObjectMonitor),每一个被锁住的对象都会和 Monitor 关联起来,也即任何一个对象都可以成为一把锁。获取锁在源码中的过程
指针指向 monitor 对象(也称为管程或监视器锁)的起始地址。每个对象都存在一个 monitor 与之关联,当一个 monitor 被某个线程持有后,它便处于锁定状态。在 Java 虚拟机(Hotspot)中,monitor 是由 ObjectMonitor 实现的,其主要数据结构如下(位于 Hotspot 虚拟机源码 ObjectMonitor.cpp 文件,C++实现)
_owner | 记录了当前是哪个线程所持有,指向了持有 ObjectMonitor 对象的线程 |
---|---|
_WaitSet | 存放处于 wait 状态的线程队列(无顺序) |
_EntryList | 存放处于等待锁(block)状态的线程队列 |
_recursions | 锁的重入次数 |
_count | 用来记录该线程获取锁的次数 |
公平和非公平锁
什么是公平/非公平锁?
公平锁
是指多个线程按照申请锁的顺序来获取锁,这里类似于排队买票,先来的人先买后来的人后买公平。Lock lock = new ReentrantLock(true); // 表示公平锁
非公平锁
是指多个线程获取锁的顺序并不是按照申请的顺序,有可能后申请的线程比先申请的线程优先获取锁,在高并发环境下,有可能造成优先级反转或饥饿(某个线程一直得不到锁)状态。Lock lock = new ReentrantLock(); // 默认是非公平锁
或Lock lock = new ReentrantLock(false);
为什么要有公平/非公平的设计?为什么默认是非公平?
恢复挂起的线程到真正锁的获取在 CPU 的角度看来时间差很明显。所以非公平锁能更充分的利用 CPU 的时间片,尽量减少 CPU 空闲状态时间。
使用多线程重点要考虑线程切换的开销,当采用非公平锁时,当1个线程请求锁获取同步状态再到释放同步状态后,刚释放锁的线程在此刻再次获取同步状态的概率变得非常大,所以减少了线程的开销。
应用场景?
如果为了更高的吞吐量,很显然公平锁是比较合适的,节省了很多线程之间切换时间。否则应该用公平锁,公平进行资源持有。
ReentrantLock 之构造方法
可以发现,默认创建的 ReentrantLock 底层就是用了非公平锁来实现的。并且出现了一个非常重要的类 AbstractQueuedSynchronizer。更多内容后面 AQS 章节会专门记录。
可重入锁(递归锁)
可重入锁:是指同一个线程在外层方法获取锁的时候,在进入线程内层的方法会自动获取锁(前提是锁对象是同一个),不会因为之前还没释放而阻塞。 Java 中 ReentrantLock 和 synchronized 都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
可重入锁种类
隐式锁(synchronized 关键字使用的锁):默认是可重入锁指的是可重复可递归调用的锁,在外层使用锁后内层仍然可以使用并且不会发生死锁。
显示锁(Lock):也有 ReentrantLock 这样的可重入锁。手动 Lock,所以需要加了几次锁就要释放几次锁,如果未释放锁在多线程会导致阻塞等待甚至死锁,单线程不影响。
synchronized 锁重入的实现机理
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。 当执行 monitorenter 时,如果目标锁对象的计数器为0,说明没有被其它线程所持有,Java 虚拟机将会该锁对象的持有线程设置为当前线程,并将其计数器+1。 在目标锁对象的计数器不为0的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机将计数器+1,否则需要等待直至有线程释放该锁。 当执行 monitorexit 时,Java 虚拟机则需将锁对象计数器-1。计数器为0代表锁已被释放。
原理在上面演示锁运行案例/管程C++源码中 ObjectMonitor.hpp 分析过
死锁及排查
什么是死锁?
指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因为争夺有限的资源而陷入死锁。
产生死锁主要原因
jps
/jps -l
打印java进程号(pid)和名称 -l打印时带上完整包名)jstack <pid>
类似于异常中打印堆栈信息,更加完整
图形化
- jconsole:连接要排查程序,在线程中点击排查死锁
写锁(独占锁)/ 读锁(共享锁)
自旋锁 SpinLock
无锁 -> 独占锁 -> 读写锁 -> 邮戳锁
无锁 -> 偏向锁 -> 轻量级锁 -> 重量锁
LockSupport 与线程中断
线程中断机制
什么是线程中断机制?
一个线程不应该由其它线程来强制中断或停止,而是应该由线程自己自行停止。所以 Thread.stop,Thread.suspend,Thread.resume 都已经被废弃。
在 Java 中没有办法立即停止一个线程,然而停止线程却显得尤为重要,比如要取消一个耗时操作。因此在 Java 中提供了一种用于停止线程的协商机制——中断,即中断标识协商机制。
中断只是一种协商机制,Java 没有给中断增加任何语法,中断的过程完全需要程序员自行实现。若要中断一个线程,需要手动调用线程的interrupt()
方法,该方法也仅仅是将线程对象的中断标识设置为 true。然后通过自己实现一段代码不断地检测当前线程的标志位,如果为 true,表示别的线程请求这条线程中断,这部分逻辑由我们自行实现。
每个线程对象都有一个中断标识位,用于表示线程是否被中断;通过调用对象的interrupt()
方法将该线程标识位设为 true,可以在其它线程中调用也可以在自己的线程中调用。
中断 API 三大方法
void interrupt(); | 中断此线程。 除非当前线程正在中断(始终允许), 否则将调用此线程的 checkAccess) 方法,这可能会导致抛出 SecurityException 。 如果该线程阻塞的调用 wait()) , wait(long)) ,或 wait(long, int)) 的方法 Object 类,或的 join()) , join(long)) , join(long, int)) , sleep(long)) ,或sleep(long, int)) ,这个类的方法,那么它的中断状态将被清除,并且将收到 InterruptedException 。 如果在 InterruptibleChannel上 的 I / O 操作中阻止该线程,则通道将关闭,线程的中断状态将被设置,线程将收到 ClosedByInterruptException 。 如果该线程在 Selector中 被阻塞,则线程的中断状态将被设置,它将立即从选择操作返回,可能具有非零值,就像调用选择器的 wakeup) 方法一样。 如果以前的条件都不成立,则将设置该线程的中断状态。 中断不活动的线程不会产生任何影响。 |
---|---|
static boolean interrupted(); | 静态方法,Thread.interrupted();判断线程是否被中断并清除当前中断状态。 做了两件事: 1. 返回当前线程的中断状态,测试当前线程是否已被中断。 1. 将当前线程的中断状态清零并重新设置为 false,即清除线程的中断中断状态 |
boolean isInterrupted(); | 实例方法,判断当前线程是否被中断(通过检查中断标志位) |
中断机制面试题3道
如何停止中断运行线程?
- 通过 volatile 变量实现
- 通过 AtomicBoolean
- 通过 Thread 自带中断 API 实例方法 ```java package vip.zhenzicheng.concurrent_program.thread.sgg.interrupt;
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean;
/**
- 线程中断demo *
- @author zhenzicheng
@date 2022-06-27 17:58 */ public class InterruptDemo {
static volatile boolean isStop = false; static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
public static void main(String[] args) { // m1_volatile(); // m2_atomicBoolean(); // m3_api(); }
private static void m3_api() { Thread t1 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + "\t isInterrupted() 被修改为 true,程序停止");
break;
}
System.out.println("t1 ----hello interrupt api");
}
}, “t1”); t1.start();
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// t2希望t1停止 // new Thread(() -> { // t1.interrupt(); // }, “t2”).start();
// t1 自己要停止 t1.interrupt(); }
private static void m2_atomicBoolean() { new Thread(() -> {
while (true) {
if (atomicBoolean.get()) {
System.out.println(Thread.currentThread().getName() + "\t atomicBoolean 被修改为 true,程序停止");
break;
}
System.out.println("t1 ----hello atomicBoolean");
}
}, “t1”).start();
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
atomicBoolean.set(true);
}, “t2”).start(); }
private static void m1_volatile() { new Thread(() -> {
while (true) {
if (isStop) {
System.out.println(Thread.currentThread().getName() + "\t isStop 被修改为 true,程序停止");
break;
}
System.out.println("t1 ----hello volatile");
}
}, “t1”).start();
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
isStop = true;
interrupt() 源码
interrupt()
会在调用interrupt0()
方法来设置标志位,源码可知是 native 方法。
isInterrupted() 源码
同样的,通过调用了 native 方法,并传递了一个 false 标识(标识不清除中断状态)来获取线程是否被中断。
当前线程的中断标识为 true,是不是线程就立刻停止?
不会立刻停止。虽然被标识为中断,但是线程还是执行到结束。
package vip.zhenzicheng.concurrent_program.thread.sgg.interrupt;
import java.util.concurrent.TimeUnit;
/**
* @author zhenzicheng
* @date 2022-06-27 19:26
*/
public class InterruptDemo2 {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 300; i++) {
System.out.println("----:" + i);
}
}, "t1");
t1.start();
System.out.println("t1线程默认中断标识:" + Thread.currentThread().isInterrupted());
try {
TimeUnit.MILLISECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.interrupt(); // true
System.out.println("t1线程调用 interrupt()后的中断标识:" + t1.isInterrupted());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t1线程调用 interrupt()后的中断标识:" + t1.isInterrupted());
}
}
第二个demo中演示了关于interrupt()
方法更深入的细节,调用阻塞中线程导致的 InterruptException 后清除标识为导致死循环。解决办法就是在 catch 块中再次将线程标识为中断状态。
package vip.zhenzicheng.concurrent_program.thread.sgg.interrupt;
import java.util.concurrent.TimeUnit;
/**
* @author zhenzicheng
* @date 2022-06-27 19:46
*/
public class InterruptDemo3 {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + "\t 中断标志位:" + Thread.currentThread().isInterrupted());
break;
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
System.out.println("---- hello InterruptDemo3");
}
}, "t1");
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
t1.interrupt();
}, "t2").start();
}
}
关于 Thread.interrupted() 谈谈理解
上文提到的2步操作,第一步返回中断状态,第二步清除中断状态。
与isInterrupted()
方法有什么区别呢?
由源码可知,其实底层调用的是同一个方法,只不过参数不同,静态方法会清空状态标志位而成员方法则不会。
LockSupport
java.util.concurrent.locks.LockSupport——用于创建锁和其他同步类的基本线程阻塞原语。
该类与使用它的每个线程关联一个许可证(在 Semaphore 类的意义上)。 如果许可证可用,将立即返回park ,并在此过程中消费; 否则可能会阻止。 如果尚未提供许可,则使用 unpark 获得许可。 (与 Semaphores 不同,许可证不会累积最多只有一个)可靠的使用需要使用 volatile(或原子)变量来控制何时停放或取消停放。 对于易失性变量访问保持对这些方法的调用的顺序,但不一定是非易失性变量访问。
方法 park 和 unpark 提供了阻止和解除阻塞线程的有效方法,这些线程没有遇到导致不推荐使用的方法 Thread.suspend 和 Thread.resume 无法用于此类目的的问题:一个线程调用 park 和另一个线程尝试 unpark 将保留活跃性,由于许可证。 此外,如果调用者的线程被中断,则会返回 park ,并且支持超时版本。 park 方法也可以在任何其他时间返回,“无理由”,因此通常必须在返回时重新检查条件的循环内调用。 在这个意义上, park 可以作为“忙碌等待”的优化,不会浪费太多时间旋转,但必须与 unpark 配对才能生效。
三种形式的 park 每个也支持 blocker 对象参数。 在线程被阻塞时记录此对象,以允许监视和诊断工具识别线程被阻止的原因。 (此类工具可以使用方法 getBlocker(Thread)) 访问阻止程序) 。)强烈建议使用这些表单而不是没有此参数的原始表单。 在锁实现中作为 blocker 提供的正常参数是 this 。
LockSupport 中最重要的两个方法park()
和unpark()
的作用分别是阻塞线程和解除阻塞线程。
| 阻塞 | LockSupport.park()
| 除非许可证可用,否则禁用当前线程以进行线程调度。
如果许可证可用,那么它被消耗并且呼叫立即返回; 否则当前线程因线程调度而被禁用,并且在发生以下三种情况之一之前处于休眠状态:
- 其他一些线程调用 unpark) ,当前线程作为目标; 要么
- 其他一些线程 interrupts) 当前线程; 要么
- 虚假的呼叫(即无缘无故)返回。
这种方法不报告是哪个线程导致该方法返回。 呼叫者应该首先重新检查导致线程停放的条件。 例如,呼叫者还可以确定返回时线程的中断状态。
底层源码:
|
| —- | —- | —- |
| | LockSupport.park(Object blocker)
| 方法含义与上面完全相同,只不过 blocker 代表负责此线程停放的同步对象。
底层源码:
依然是利用了 Unsafe.park()
|
| 唤醒 | LockSupport.unpark(Thread thread)
| 如果给定线程尚不可用,则为其提供许可。
如果该线程在 park 上被阻止,则它将解除阻止。 否则,它的下一次调用 park 保证不会阻止。
如果尚未启动给定线程,则不保证此操作完全没有任何效果。
如果参数为 null,则此操作无效。 |
线程等待唤醒机制
线程唤醒的3种方法
- 使用 Object 中的
wait()
方法让线程等待,使用 Object 中的notify()
方法或notifyAll()
方法唤醒线程。 - 使用 JUC 中 Condition 的
await()
方法让线程等待,使用signal()
方法唤醒线程 LockSupport 类可以阻塞当前线程以及唤醒指定被阻塞的线程。
Object -> wait() 与 notify()
使用 Object 的
wait()
与notify()
方法时需要注意,这两种方法都需要先持有某个对象的锁。/*使用同步代码块原生方式*/
private static void syncWaitNotify() {
Object o = new Object();
new Thread(() -> {
synchronized (o) {
System.out.println(Thread.currentThread().getName() + "\t ----come in");
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t ----被唤醒");
}
}, "t1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
synchronized (o) {
o.notify();
System.out.println(Thread.currentThread().getName() + "\t ----发出通知");
}
}, "t2").start();
}
如果将
wait()
和notify()
上的同步代码块去掉后,直接抛出 IllegalMonitorStateException
如果先notify()
后wait()
可知线程会永远阻塞在那里直到被唤醒。Condition -> await() 与 signal()
与上面
wait()
与notify()
类似,都需要先进行lock.lock()
方法先加锁,否则调用await()
或signal()
时也会抛出 IllegalMonitorStateException 异常。
还有需要先阻塞再唤醒,否则一直阻塞。/*使用Lock与Condition*/
private static void lockAwaitSignal() {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t ----come in");
condition.await();
System.out.println(Thread.currentThread().getName() + "\t ----被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
lock.lock();
try {
condition.signal();
System.out.println(Thread.currentThread().getName() + "\t ----发出通知");
} finally {
lock.unlock();
}
}, "t2").start();
}
LockSupoort -> park() 与 unpark()
private static void lockSupportParkUnpark() {
Thread t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3); // 先阻塞,让t2先unpark
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + System.currentTimeMillis());
LockSupport.park();
}, "t1");
t1.start();
new Thread(() -> {
LockSupport.unpark(t1);
System.out.println(Thread.currentThread().getName() + "\t" + System.currentTimeMillis() + "----unpark over");
}, "t2").start();
}
由代码可知,LockSupport 支持不在锁代码块里执行
park()
与unpark()
方法,并且可以任意顺序,即使先unpark()
后依旧可以通过park()
,类似于先给许可证等待验证的时候直接通过。
总结:
- 支持不在锁代码块中执行
park()
与unpark()
方法 - 可以先唤醒(给与通行证)再阻塞(检查通行证),不用一定按照先加锁再解锁顺序。
- 最多只有1个通行证,不会累积。