1 基础理论
1.1 进程、线程、管程
- 进程:系统中运行的一个应用程序就是一个进程,每一个进程都有它自己的内存空间和系统资源
- 线程:也被称为轻量级进程,在同一个进程内基本会有1一个或多个线程,是大多数操作系统进行调度的基本单元。
管程:Monitor(监视器),也就是我们平时说的锁。Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。
1.2 用户线程与守护线程
用户线程:是系统的工作线程,它会完成这个程序需要完成的业务操作。
- 守护线程:是一种特殊的线程,为其他线程服务的,在后台默默地完成一些系统性的服务,比如垃圾回收线程。
用户线程是默认的,守护线程是伴随用户线程生成的,如果用户线程结束,那么守护线程也会结束掉。
Java中可以手动设置守护线程:(设置要在start之前)
Thread a = new Thread(() -> {
}, "a");
// 将a设置为守护线程
a.setDaemon(true);
a.start();
2 基本关系
Future 定义了线程基本的使用方法。cancel``isCancel``get``isDone
Rannable 是一个函数式接口,只提供一个抽象的run()
方法。
RunnableFuture继承了上述两个接口,RunnableFuture的run()
方法没有返回值。
Callable接口提供的call()
方法有返回值,会抛异常。
FutureTask是RunnableFuture的具体实现,同时在FutureTask采用了构造注入的方式,在构造方法中引入了Callable,支持了有返回的线程创建。
Thread 接受 Rannable 来创建线程。
public class JucTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> a = new FutureTask<>(new MyThread());
Thread t1 = new Thread(a, "a");
t1.start();
System.out.println(a.get());
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
return "callable";
}
}
3 FutureTask
FutureTask实现了RunnableFuture,同时也支持有返回。
get()
方法会造成阻塞。- 如果想监控线程完成,可以轮询
isDone()
方法。但是同时会造成CPU资源损耗。
简而言之,FutureTask获取线程运行结果的方式不够优雅。
4 CompletableFuture
Java8引入的CompletableFuture,他是Future的增强版,减少了阻塞和轮询。可以传入回调对象,当异步任务完成或发生异常时,自动调用回调对象的回调放方法。
4.1 四个静态方法
无返回 Executor可以指定线程池,没有Executor使用默认线程池ForkJoinPoolpublic static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
有返回public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
// 自定义线程池,默认采用ForkJoinPool时,CompletableFuture被视为守护线程,主线程结束,子线程也会被停掉。
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {
CompletableFuture.supplyAsync(()->{
// 业务逻辑
int i = ThreadLocalRandom.current().nextInt(10);
return i;
}, threadPool).whenComplete((v, e) -> {
// 执行完成后的业务
}).exceptionally(e -> {
// 抛出异常后的处理
e.printStackTrace();
return null;
});
} catch (Exception e) {
e.printStackTrace();
} finally {
// 最后关闭线程池
threadPool.shutdown();
}
4.2 实战代码
比价:
public class ComparePrice {
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("tm"),
new NetMall("dd")
);
// 串行执行
public static List<String> getPrice(List<NetMall> list, String productName) {
return list.stream()
.map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calculatePrice(productName)))
.collect(Collectors.toList());
}
// 使用CompletableFuture并行执行
public static List<String> getPriceCompletableFuture(List<NetMall> list, String productName) {
return list.stream()
.map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calculatePrice(productName))))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static void main(String[] args) {
long l = System.currentTimeMillis();
List<String> mysql = getPriceCompletableFuture(list, "mysql");
for (String a : mysql) {
System.out.println(a);
}
System.out.println("耗时: " + (System.currentTimeMillis() - l));
}
}
class NetMall{
private String netMallName;
public String getNetMallName() {
return netMallName;
}
public NetMall(String netMallName){
this.netMallName = netMallName;
}
public double calculatePrice(String productName) {
try {
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
4.3 常用方法
- 获得结果和触发计算
T get()
T get(long timeout, TimeUnit unit)
:超过时间后不要了T join()
:不需要抛异常T getNow(T valueIfAbsent)
:如果拿到就用,拿不到用默认值valueIfAbsentboolean complete(T value)
:类似于getNow。返回是否打断的标志。如果没有执行完,打断线程返回true,并赋值为value
- 对计算结果进行处理
A执行完,执行B,B继续对结果进行加工处理
thenApply(Function<? super T,? extends U> fn)
: 有异常停止handle(BiFunction<? super T, Throwable, ? extends U> fn)
:有异常可以继续运行- 对计算结果进行消费
thenRun(Runnable action)
:A执行完,执行B,B不需要A的结果thenAccept(Consumer<? super T> action)
:A执行完,执行B,B需要A的结果,B无返回值- 对计算速度进行选用
<U> CompletableFuture<U> **applyToEither**(CompletionStage<? extends T> other, Function<? super T, U> fn)
- 对计算结果进行合并
<U,V> CompletableFuture<V> **thenCombine**(<br /> CompletionStage<? extends U> other,<br /> BiFunction<? super T,? super U,? extends V> fn)
4.4 扩展
Java8的函数式接口:
- Runnable —>
void run();
- Function —>
R apply(T t);
- Consumer —>
void accept(T t)
(BiConsumer —>void accept(T t, U u);
) - Supply —>
T get()
5 synchronized
静态同步方法加的是类锁
普通同步方法锁的是对象上所有带synchronized的方法
静态同步方法和普通同步方法之间没有竞争。
从JVM层面解释的话可以这么说:
类加载器加载class文件,初始化生成一个class在方法区,只有一份。静态同步方法(static synchronized
)锁的是这个Class对象,而class实例化的对象存储在堆中,可以有多个,普通同步方法所的是一个个实例对象。
5.1 反编译观察
反编译字节码命令 :javap -c [xxx].class
/javap -v [xxx].class
5.1.1 同步代码块
- 进入同步代码块之前,会先有一步monitor enter。
- 执行完,会进行monitor exit。
- 为防止死锁,还会有一次异常执行的monitor exit。
所以,一般情况下,monitor enter对应两个monitor exit。在代码中手动处理异常时,一个monitor enter可以对应一个monitor exit。
5.1.2 普通同步方法
- 同步方法会有一个flag值,ACC_SYNCHRONIZED来标记这个是一个同步方法。
- 如果设置了,线程执行前,先持有monitor锁,在执行方法。
-
5.1.3 静态同步方法
静态同步方法比普通同步方法在flag上多了一个ACC_STATIC。其他是一样的。
5.2 底层分析
monitor采用ObjectMonitor实现。
ObjectMonitor.cpp中的ObjectMonitor.hpp结构中有一个owner字段,指向了当前monitor的持有线程。
因为Java所有的类继承了Object,所以,所有的对象都可以获取锁。6 锁
6.1公平锁与非公平锁
6.2 可重入锁
每个锁对象都有一个锁计数器_count,一个指向持有该锁线程的指针_woner,该线程获取锁的次数_recursions。
当执行monitorenter时,如果锁目标对象的计数器为0,那么说明它没有被其他线程所持有,Java虚拟机会将该锁持有的线程设置为当前线程,并且将其计数器加1。
在目标锁对象的计数器不为0时,如果锁的持有线程还是当前线程,那么Java虚拟机就将计数器加1,否则不是该线程则需要等待,直至持有线程释放该锁。
当执行monitorexit时,Java虚拟机则将锁的对象计数器减1,计数器为0则代表锁被释放。6.2.1 隐式锁synchronized
6.2.2 显式锁Lock
6.3 死锁
死锁排查
jps -l
查看进程信息-
7 LockSupport与线程中断
7.1 中断
7.1.1 三个中断方法
中断是一种协商机制。
三个中断方法: public void interrupt()
- 实例方法,仅仅是设置线程的中断状态为true,发起一个协商不会立刻停止线程。
- 如果当前线程处于被阻塞状态,那么别的线程调用当前线程对象的
interrupt()
方法时,会抛出InterruptException异常。 - 在JDK8中如果中断的是不活动的线程,则不会设为true。(也就是说没有影响)
public static boolean interrupted()
判断线程是否被中断,并且清除当前中断状态。
- 返回当前线程的中断状态,测试当前前程是否已经被中断。
- 将当前线程的中断状态清零,并重新设置为false。
public boolean isInterrupted()
7.1.2 其他的中断方法
使用volatile
添加关键字volatile,使用变量可见。
static volatile boolean isStop = false;
public static void main(String[] args) {
new Thread(() -> {
while (true) {
if (isStop) {
System.out.println(Thread.currentThread().getName() + "\t 程序停止");
break;
}
System.out.println("hello volatile");
}
}, "t1").start();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch(InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
isStop = true;
}, "t2").start();
}
输出:
......
hello volatile
hello volatile
hello volatile
hello volatile
t1 程序停止
Atomic原子类
static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
public static void main(String[] args) {
new Thread(() -> {
while (true) {
if (atomicBoolean.get()) {
System.out.println(Thread.currentThread().getName() + "\t 程序停止");
break;
}
System.out.println("hello volatile");
}
}, "t1").start();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch(InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
atomicBoolean.set(true);
}, "t2").start();
}