1 传统的多线程通信
synchronized的通信锁机制
synchronized+lockObj.wait()+lockObj.notify()
概览
lockObj.wait()方法
在当前线程中调用方法:lockObj.wait()会使当前线程进入等待(某对象)状态 ,令当前线程挂起并放弃CPU、同步资源并等待,使别的线程可访问并修改共享资源,而当前线程排队等候其他线程调用notify()或notifyAll()方法唤醒,唤醒后等待重新获得对监视器的所有权后才能继续执行。
调用方法的必要条件:当前线程必须具有对该对象的监控权(加锁),调用此方法后,当前线程将释放对象监控权 ,然后进入等待,在当前线程被notify后,要重新获得监控权,然后从断点处继续代码的执行。
lockObj.notify()/notifyAll()方法
在当前线程中调用方法: 共享资源.notify()。
功能:唤醒等待该对象监控权的一个/所有线程。 调用方法的必要条件:当前线程必须具有对该对象的监控权(加锁)。本线程如果还有代码要执行,则会等本线程交出同步监视器后再唤醒其他线程。The awakened thread will not be able to proceed until the current thread relinquishes the lock on this object.
notify():唤醒正在排队等待同步资源的线程中优先级最高者结束等待。
notifyAll():唤醒正在排队等待资源的所有线程结束等待。
wait()、notify()、notifyAll()这三个方法只有在synchronized方法或synchronized代码块中才能使用,否则会报java.lang.IllegalMonitorStateException
异常。因为这三个方法必须有锁对象调用,而任意对象都可以作为synchronized的同步锁,因此这三个方法只能在Object类中声明。
代码
class Ecoco10ApplicationTests {
private Object object = new Object();
@Test
void contextLoads() throws InterruptedException {
Thread thread = new Thread() {
@SneakyThrows
@Override
public void run() {
synchronized (object) {
System.out.println("线程1" + System.currentTimeMillis());
int i = 0;
while (i <= 10) {
if (i == 2)
object.wait();//不能使用this.wait();而是同步资源对象调用wait()方法以阻塞在其身上的线程。
System.out.println("线程A" + i);
i++;
}
object.notify();//通知唤醒其他线程:在当前线程释放了同步监视器之后。
}
}
};
Thread thread2 = new Thread() {
@SneakyThrows
@Override
public void run() {
synchronized (object) {
int j = 0;
while (j <= 10) {
System.out.println(" 线程B" + j);
if (j == 5)
object.notify();//不会立马就会唤醒其他线程,而是会继续执行本线程,直到交出同步监视器。
if (j == 8)
object.wait();
j++;
}
object.notify();
}
}
};
thread.start();
thread2.start();
Thread.sleep(50000);
}
}
//打印输出
线程A0
线程A1
线程B0
线程B1
线程B2
线程B3
线程B4
线程B5
线程B6
线程B7
线程B8
线程A2
线程A3
线程A4
线程A5
线程A6
线程A7
线程A8
线程A9
线程A10
线程B9
线程B10
使用Lock的通信锁机制
lock+ await+ signal来实现。可参考lock一节或生产者和消费者问题一节
线程中断机制
中断机制一些概念
- 一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。所以,
Thread.stop、Thread.suspend、Thread. resume都已经被废弃了。 - 在Java中没有办法立即停止一条线程,然而停止线程却显得尤为重要,如取消一个耗时操作。因此,Java提供了一种用于停止线程的机制—中断协商机制。中断只是一种协商机制,Java没有给中断增加任何语法,中断的过程完全需要程序员自己实现。
- 若要中断一个线程,我们需要自己手动调用该线程的
interrupt()
方法,该方法也仅仅是将线程对象的中断标识设为true。需要时刻谨记的:中断只是一种协同机制,调用interrupt()和interrupted()
修改的只是中断标识位而已,而不是立即stop打断。 - 每个线程对象中都有一个标识,用于标识线程是否被中断;该标识位为true表示中断,为false表示未中断;通过调用线程对象的
interrupt()
方法将线程的标识位设为true;该方法可以在别的线程中调用,也可以在自己的线程中调用。
参考文档:什么是中断
三种方式实现优雅的线程中断
- 通过一个volatile变量实现
- 通过AtomicBoolean
- 通过Thread类自带的中断Interrupt机制AP方法实现,即
isInterrupted()和interrupt()
方法 ```java package com.fly.ecoco10;
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean;
public class InterruptDemo { static volatile boolean isStop = false; static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
public static void main(String[] args) {
useByVolatile();//通过一个volatile变量实现
useByAtomicBoolean();//通过AtomicBoolean
useByInterrupt();//通过Interrupt中断机制
}
/**
* 方式一:通过一个volatile变量实现
*/
public static void useByVolatile() {
new Thread(() -> {
while (true) {
if (isStop) {
System.out.println("-----isStop = true,程序结束。");
break;
}
System.out.println("------hello isStop");
}
}, "t1").start();
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
isStop = true;
}, "t2").start();
}
/**
* 方式二:通过AtomicBoolean
*/
public static void useByAtomicBoolean() {
new Thread(() -> {
while (true) {
if (atomicBoolean.get()) {
System.out.println("-----atomicBoolean.get() = true,程序结束。");
break;
}
System.out.println("------hello atomicBoolean");
}
}, "t1").start();
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
atomicBoolean.set(true);
}, "t2").start();
}
/**
* 方式三:通过Interrupt中断机制来实现
*/
public static void useByInterrupt() {
Thread t1 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("-----isInterrupted() = true,程序结束。");
break;
}
System.out.println("------hello Interrupt");
}
}, "t1");
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
t1.interrupt();//修改t1线程的中断标志位为true
}, "t2").start();
}
}
**关于Interrupt中断机制的方法说明**
> **实例方法:void interrupt( )**
- `interrupt( )`仅仅是设置线程的中断状态未true,不会停止线程,真正决定是否终止线程必须由线程内部自己决定。
- 着重说明:线程如果因为调用了wait()、join()、sleep()方法而被阻塞时候,此时调用了实例的interupt()方法,这时候就会抛出InterruptedException,并会将中断标识位清空(即设置为false)。解决:在catch里面再调用一次interrupt()方法以将中断状态置true。
```java
public class JUCTest {
public static void main(String[] args) {
Thread thread = new Thread() {
@Override
public void run() {
super.run();
while (true) {
//这里不会中断,因为线程处于Sleep的时候调用了interrupt,此时会报错,并将中转状态置为空,即默认的不中断,所以if判断失败
if (Thread.currentThread().isInterrupted()) {
System.out.println("中断输出");
break;
}
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("111");
}
}
};
thread.start();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread.interrupt();
}
}
//演示说明,会一直输出,而不会停止
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.efly.gulimall.gulimallproduct.JUCTest$1.run(JUCTest.java:20)
111
111
111
111
111
//解决方案
public class JUCTest {
public static void main(String[] args) {
Thread thread = new Thread() {
@Override
public void run() {
super.run();
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("中断输出");
break;
}
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
//只需在这里加一句即可
Thread.currentThread().interrupt();
e.printStackTrace();
}
System.out.println("111");
}
}
};
thread.start();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread.interrupt();
}
}
//结果演示
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.efly.gulimall.gulimallproduct.JUCTest$1.run(JUCTest.java:22)
111
中断输出
实例方法:boolean isInterrupted( )
判断当前线程是否被中断(通过检查中断标识位) 实例方法。
静态方法:static boolean interrupted( )
判断线程是否被中断,并清除当前中断状态,这个方法做了两件事:
- 返回当前线程的中断状态
- 将当前线程的中断状态设为false,类似于取消中断标识位 ```java System.out.println(Thread.currentThread().getName()+”—-“+Thread.interrupted()); System.out.println(Thread.currentThread().getName()+”—-“+Thread.interrupted()); System.out.println(“111111”); Thread.currentThread().interrupt();///——false—-> true System.out.println(“222222”); System.out.println(Thread.currentThread().getName()+”—-“+Thread.interrupted()); System.out.println(Thread.currentThread().getName()+”—-“+Thread.interrupted());
//输出结果 main—-false//先输出中断标识位即默认值为false,并将中断标识为设置为false。类似于取消中断 main—-false// 111111 222222 main—-true//实例方法:将中断标识位改为true main—-false//静态方法:将中断标识位改为false
<a name="gYEDv"></a>
# 2 LockSupport
> **概述说明**
参考文档:[LockSupport的使用](https://blog.csdn.net/TZ845195485/article/details/118404659)
1. 通过`park()`和`unpark(thread)`方法来实现阻塞当前线程和唤醒指定线程的操作。
1. LockSupport是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法。
1. 归根结底,LockSupport调用的Unsafe中的native代码。
1. 官方对于LockSupport的描述及核心两个方法的源码解读
```java
Basic thread blocking primitives for creating locks and other synchronization classes.This class associates, with each thread that uses it, a permit (in the sense of the Semaphore class). A call to park will return immediately if the permit is available, consuming it in the process; otherwise it may block. A call to unpark makes the permit available, if it was not already available. (Unlike with Semaphores though, permits do not accumulate. There is at most one.)
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语,LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit),permit只有两个值1和零,默认是零.可以把许可看成是一种(0,1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。
////源码解读////
//java.util.concurrent.locks.LockSupport类
package java.util.concurrent.locks;
import sun.misc.Unsafe;
public class LockSupport {
public static void park() {
UNSAFE.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
}
//sun.misc.Unsafe类
package sun.misc;
public final class Unsafe {
public native void unpark(Object var1);
public native void park(boolean var1, long var2);
}
使用LockSupport解决的痛点
- LockSupport不用持有锁块,不用加锁,程序性能好
- 先后顺序:不容易导致卡死(因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞)
使用:park()和unpark(threadObj)
park():先判断permit是否为1,如果是的话则不阻塞,直接继续执行,并将permit置为0。否则一直阻塞,直到被唤醒(其他线程调用了unpark)或被中断。由于permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时, park方法才会被唤醒然后会将permit再次设置为0并返回。If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one ofthree things happens:Some other thread invokes unpark with the current thread as the target; or Some other thread interrupts the current thread; or The call spuriously (that is, for no
reason) returns.
小案例
package com.fly.ecoco10;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportDemo {
public static void main(String[] args) {
System.out.println("before当前时间:" + System.currentTimeMillis());
LockSupport.unpark(Thread.currentThread());//如果先调用unpark则会将permit置为1,此时再调用park()就不会阻塞了
LockSupport.park();//判断permit是否O,如果是的话会阻塞直到别的线程将该线程的permit置为1,然后消费permit并解除阻塞从而继续执行
System.out.println("after当前时间:" + System.currentTimeMillis());
}
}
//效果
before当前时间:1644562613283
after当前时间:1644562613284
unpark(threadObj):唤醒处于阻塞状态的指定线程,调用unpark(threadObj)
方法后,就会将thread线程的许可permit设置成1(注意多次调用unpark方法,不会累加,pemit值还是1),其会自动唤醒theadObj线程,即之前阻塞中的LockSupport.park()方法会立即返回。
小案例
public class LockSupportDemo {
public static void main(String[] args) throws InterruptedException{
//线程1
Thread thread1 = new Thread(){
@SneakyThrows
@Override
public void run() {
System.out.println("进入线程1,当前时间:" + System.currentTimeMillis());
LockSupport.park();
System.out.println("线程1第一次被unpark,当前时间:" + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(20);//休眠20毫秒
LockSupport.park();
System.out.println("线程1第二次被unpark了,当前时间:" + System.currentTimeMillis());
}
};
thread1.start();
TimeUnit.MILLISECONDS.sleep(20);//主线程休眠20毫秒
//线程2
Thread thread2 = new Thread() {
@SneakyThrows
@Override
public void run() {
super.run();
//调用unpark(threadObj)方法后,就会将threadObj线程的许可permit设置成1。threadObj从而自动解除锁定并消耗一个permit
//注意多次调用unpark方法,不会累加,permit值还是1。
LockSupport.unpark(thread1);
TimeUnit.MILLISECONDS.sleep(10);//休眠10毫秒
LockSupport.unpark(thread1);
}
};
thread2.start();
}
}
//结果
进入线程1,当前时间:1644563471703
线程1第一次被unpark,当前时间:1644563471723
线程1第二次被unpark了,当前时间:1644563471743
图解说明
面试题
Q:为什么可以先唤醒线程后阻塞线程?
A:因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。
Q:为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?
A:因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证,证不够,不能放行。
3 阻塞队列BlockingQueue
概念
原有的问题
在同步机制基础板块我们已经看到了形成Java并发程序设计基础的底层构建块。然而,对于实际编程来说,应该尽可能远离底层结构。使用由并发处理的专业人士实现的较高层次的结构要方便得多、要安全得多。
为什么要用阻塞队列
对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。 使用队列,可以安全的从一个线程向另一个线程传递数据 。 例如, 考虑银行转账程序, 转账线程将转账指令对象插入一个队列中 ,而不是直接访问银行对象。 另一个线程从队列中取出指令执行转账 。 只有该线程可以访问该银行对象的内部。 因此不需要同步(当然, 线程安全的队列类的实现者不能不考虑锁和条件, 但是 , 那是他们的问题而不是我们使用者的问题 )。在生产者-消费者的设计模式中,生产者线程向队列插人元素,消费者线程则取出它们。
简而言之:阻塞队列的好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮我们一手包办了。而在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
参考文档:阻塞队列,另对于阻塞队列的使用可以和RabbitMQ结合学习。
使用
BlockingQueue接口及其实现类
java.util.concurrent.BlockingQueue
阻塞队列是属于一个接口,底下有七个实现类,其中着重要掌握的是ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue。
- ArrayBlockQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列(有界,但是界限非常大,相当于无界,可以当成无界使用)
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列(生产一个,消费一个,不存储元素,不消费不生产)
- PriorityBlockQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
BlockingQueue的具体方法
阻塞队列方法分为3 类,其取决于当队列满或空时它们的响应方式。
- 如果将队列当作线程管理工具来使用即阻塞与唤醒, 将要用到put和take方法。
- 当试图向满的队列中添加或从空的队列中移出元素时,add 、remove和element操作抛出异常 。
- 当然 , 在一个多线程程序中 , 队列会在任何时候空或满 , 因此, 一定要使用offer 、 poll和peek方法作为替代。 这些方法如果不能完成任务, 只是给出一个错误提示而不会抛出异常 。
| 方法分类 | 方法类型 | | | 说明 |
| —- | —- | —- | —- | —- |
| | 插入 | 移除 | 检查 | |
| 安全推荐组 | offer(e) | poll() | peak() | 我们使用offer(e)方法,添加元素时候,如果阻塞队列满了后,会返回false,否者返回true
同时在取poll()的时候,如果队列已空,那么会返回null
peak():拿第一个元素但不移除,如果为空,则返回null | | 超时退出组 | offer(e,time,unit) | poll(time,unit) | | 当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出。
使用offer插入的时候,可以指定时间,如果2秒还没有插入,那么就放弃插入,同理poll方法 | | 阻塞唤醒组
| put(e) | take() | / | 当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出
当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用 | | 抛出异常组 | add(e) | remove() | element() | 当阻塞队列满时:在往队列中add插入元素会抛出 IIIegalStateException:Queue full
当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException
element():Retrieves, but does not remove, the head of this queue,如果队列为空,则会抛异常:NoSuchElementException |
offer和poll方法的使用案例,通用该案例我们可以显而易见的发现,在使用阻塞队列后,在多线程通信里面,阻塞队列封装好了一套解决方案。
package com.fly.ecoco10;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo3 {
static ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
public static void main(String[] args) {
new Thread() {
@Override
public void run() {
super.run();
try {
System.out.println("进入:" + System.currentTimeMillis());
//1秒=1000毫秒(MILLISECONDS);1毫秒=1000微秒(MICROSECONDS);1微秒=1000纳秒(NANOSECONDS)
//System.currentTimeMillis()统计的是milliseconds,即毫秒。
Object object = arrayBlockingQueue.poll(2, TimeUnit.SECONDS);//等待2秒,如果还没有元素则自动解除阻塞
System.out.println("进入:" + System.currentTimeMillis());
System.out.println("队列头节点:" + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
//效果
进入:1644479566544
结束:1644479568544
队列头节点:null
//我们针对上面的方法做下改造:加一个子线程往队列里面添加数据
public class BlockingQueueDemo3 {
static ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
public static void main(String[] args) {
new Thread() {
@Override
public void run() {
super.run();
try {
System.out.println(System.currentTimeMillis());
//1秒=1000毫秒(MILLISECONDS);1毫秒=1000微秒(MICROSECONDS);1微秒=1000纳秒(NANOSECONDS)
//System.currentTimeMillis()统计的是milliseconds,即毫秒。
//等待200毫秒,如果还没有元素则
Object object = arrayBlockingQueue.poll(2, TimeUnit.SECONDS);//间隔2秒钟
System.out.println(System.currentTimeMillis());
System.out.println("队列头节点:" + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
//主线程休息10毫妙确保上面的线程先进入并阻塞
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
//往队列里面添加数据
arrayBlockingQueue.offer(1,2,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
//改造后的数据效果:已经拿到了其他线程插入队列的值(间隔在45毫秒)
进入:1644479786442
结束:1644479786487
队列头节点:1
SynchronousQueue类的使用
SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储的BlockingQueue,每一个put操作必须等待一个take操作,否者不能继续添加元素。
下面我们测试SynchronousQueue添加元素的过程:一个生产者线程,一个消费者线程,我们从最后的运行结果可以看出,每次生产者线程向阻塞队列添加元素后就会阻塞等待消费者线程,消费着线程消费后就会处于挂起状态,等待生产者在生产数据,从而周而复始,形成 一存一取的状态。
package com.fly.ecoco10;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
static BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
public static void main(String[] args) {
//生产的线程分别put了A、B、C这三个字段
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put A 生产时间:" + System.currentTimeMillis());
blockingQueue.put("A");
System.out.println(Thread.currentThread().getName() + " put B 生产时间:" + System.currentTimeMillis());
blockingQueue.put("B");
System.out.println(Thread.currentThread().getName() + " put C 生产时间:" + System.currentTimeMillis());
blockingQueue.put("C");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "prodThread").start();
//消费线程使用take,消费阻塞队列中的内容,并且每次消费前,都等待5秒
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(10);
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + " take A 消费时间:" + System.currentTimeMillis());
System.out.println("");
TimeUnit.MILLISECONDS.sleep(10);
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + " take B 消费时间:" + System.currentTimeMillis());
System.out.println("");
TimeUnit.MILLISECONDS.sleep(10);
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + " take B 消费时间:" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "conThread").start();
}
}
//运行结果
prodThread put A 生产时间:1644484873150
conThread take A 消费时间:1644484873160
prodThread put B 生产时间:1644484873160
conThread take B 消费时间:1644484873170
prodThread put C 生产时间:1644484873170
conThread take B 消费时间:1644484873180
//我们从最后的运行结果可以看出,每次生产者线程向阻塞队列添加元素后就会阻塞等待消费者线程,消费着线程消费后就会处于挂起状态,等待生产者在生产数据。
//从而周而复始,形成 一存一取的状态。
应用:生产者消费者
4 CompletableFuture异步编排
概述
Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。
虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了addListener等多个扩展方法;Google guava也提供了通用的扩展Future;Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。
作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
核心概述CompleteableFuture(参考文档:线程池加异步编排)
需求:主线程需要拿到子线程的执行结果(回调)、多个线程间的串行和并行。
1、使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
2、从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
3、CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。
4、多个线程可以组合可以串行、并行调用。
使用:创建异步对象(runAsyn/supplyAsyn)
CompletableFuture 提供了四个静态方法来创建一个异步操作。
/*
无Executor:没有指定Executor,则会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
有Executor:则使用指定的线程池运行。
runAsync:方法不支持返回值。
supplyAsync:方法支持返回值。
*/
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
计算完成时的回调方法
包括:whenComplete/exceptionally/handle
whenComplete/exceptionally
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。注:方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。
主要是下面的方法:
/*
whenComplete可以处理正常和异常的计算结果,
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务,串行执行
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行,异步执行
BiConsumer<? super T,? super Throwable>可以定义处理业务
*/
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,?
super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,?
super Throwable> action,
Executor executor);
//exceptionally处理异常情况
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
案例
@Autowired
private ThreadPoolExecutor executor;
@RequestMapping("/update")
public void update() throws Throwable {
CompletableFuture future = CompletableFuture.supplyAsync((Supplier<Object>) () -> {
//打印当前的线程名
System.out.println(Thread.currentThread().getName());
try {
//线程先休眠3秒中
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//supplyAsync可以有返回值
return 1024;
}, executor).whenCompleteAsync((o, throwable) -> {
//拿到上面线程执行的返回值
System.out.println("-------o=" + o.toString());
//打印输出线程名(如果使用的是Async则线程名可能不一致)
System.out.println(Thread.currentThread().getName());
//System.out.println("-------throwable=" + throwable);
}, executor).exceptionally(throwable -> {
//exceptionally:捕获异常
System.out.println("throwable=" + throwable);
return 6666;
});
//非阻塞方法
System.out.println("Main");
//阻塞方法
System.out.println(future.get());
}
执行结果
handle
handle 是执行任务完成时对结果的处理。handle 是在任务完成后再执行,还可以处理异常的任务。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,
Executor executor);
案例
@RequestMapping("/update2")
public void update2() throws InterruptedException, ExecutionException {
CompletableFuture future = CompletableFuture.supplyAsync((Supplier<Object>) () -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
}, executor).handleAsync(new BiFunction<Object, Throwable, Object>() {
@Override
public Object apply(Object o, Throwable throwable) {
//拿到返回值和异常
System.out.println(Thread.currentThread().getName());
return o;
}
@Override
public <V> BiFunction<Object, Throwable, V> andThen(Function<? super Object, ? extends V> after) {
System.out.println(Thread.currentThread().getName());
return null;
}
},executor);
}
handle 方法和whenComplete方法的区别
总体上,whenComplete方法和handle方法是类似的。
区别主要在于接收的参数:whenComplete接收的是BiConsumer,handler接收的是BiFunction。
顾名思义,BiConsumer是直接消费的,而BiFunction是有返回值的,
BiConsumer没有返回值,而BiFunction是有的;
线程串行化方法
包括:thenApply/thenAccept/thenRun。
注:带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。
/*
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
*/
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
Executor executor)
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
/*
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果
*/
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
/*
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,
只是处理完任务后,执行thenRun的后续操作
*/
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
代码演示
@RequestMapping("/update3")
public void update3() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
System.out.println(Thread.currentThread().getName() + "\t completableFuture");
//int i = 10 / 0;
return 1024;
}
}).thenApply(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer o) {
System.out.println("thenApply方法,上次返回结果:" + o);
return o * 2;
}
}).whenComplete(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer o, Throwable throwable) {
System.out.println("-------o=" + o);
System.out.println("-------throwable=" + throwable);
}
}).exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) {
System.out.println("throwable=" + throwable);
return 6666;
}
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer integer, Throwable throwable) {
System.out.println("handle o=" + integer);
System.out.println("handle throwable=" + throwable);
return 8888;
}
});
System.out.println(future.get());
}
线程并行化方法
包括:both/either/allof/anyof
并行两任务必须都完成(both/combine)
两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。
案例
@RequestMapping("/update4")
public void update4() {
CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getId() +"&&"+ System.currentTimeMillis());
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}, executor).thenCombine(CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getId() +"&&"+ System.currentTimeMillis());
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}, executor), (t, u) -> {
return t + u;
}).whenComplete((t, u) -> {
System.out.println(t);
System.out.println(Thread.currentThread().getId() +"&&"+ System.currentTimeMillis());
});
}
//注意:thenCombine是并行运行,其结果可知,supplyAsync和thenCombine的参数1是同时开始的,
whenComplete是会等前面的CompletableFuture运行完成后再运行
98&&1629795033202
99&&1629795033202
helloworld
98&&1629795038202
打印结果:hello world! CompletableFuture
并行两任务其中一个完成即可(either)
当两个任务中,任意一个future任务完成的时候,执行任务
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
案例
@RequestMapping("/update3")
public void update3() throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).applyToEither(CompletableFuture.completedFuture("World"), (t) -> {
return t;
}).whenComplete((t, u) -> {
System.out.println(t);
});
}
//两个任务谁先处理完谁先执行either方法,最后输出结果
World
并行多任务全部或其一
包括方法:allOf/anyOf
//allOf:等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
//anyOf:只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
案例
@RequestMapping("/update3")
public void update3() throws ExecutionException, InterruptedException {
//1.0 待执行的任务列表
CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getId() + "&&" + System.currentTimeMillis());
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> stringCompletableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getId() + "&&" + System.currentTimeMillis());
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});
CompletableFuture<String> stringCompletableFuture3 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getId() + "&&" + System.currentTimeMillis());
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "nice";
});
List<CompletableFuture<String>> completableFutures = Arrays.asList(stringCompletableFuture1, stringCompletableFuture2, stringCompletableFuture3);
final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[]{}));
//轮询得到返回值
allCompleted.thenRun(() -> {
completableFutures.stream().forEach(future -> {
try {
System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
});
}
结果:
54&&1629796351570
48&&1629796351570
55&&1629796351570
get future at:1629796356570, result:hello
get future at:1629796356570, result:world
get future at:1629796356570, result:nice
一些注意点
- 在使用异步编排CompletableFuture对象设置值要注意数据是否是在ThreadLocal里,如果是在主线程的ThreadLocal,则不能直接使用,要先读值,再赋值。
//1.0 主线程
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
//2.0 子线程:把旧RequestAttributes放到新线程的RequestContextHolder中,该RequestContextHolder对象数据即在当前Thread下
RequestContextHolder.setRequestAttributes(attributes);
// 远程查询所有的收获地址列表
List<MemberAddressVo> address;
}, threadPoolExecutor);
整合案例与总结
串行化案例
需求:三个CompletableFuture,第一个CompletableFuture根据手机号查询Id,第二个CompletableFuture根据Id查询校区,第三个CompletableFuture根据校区查询该校区排名前10报名的学生。串行化代码如下:
@RequestMapping("/update5")
public void update5() {
//1.0 第一个任务
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return search1();
}
}, executor);
//2.0 第二个任务//不能这样定义,因为如果这样的话future2会自动创建并执行
//CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(new Supplier<Object>() {
// @Override
// public Object get() {
// return search2();
// }
//}, executor);
//2.0 第二个任务
String returnValue = future1.thenApplyAsync(new Function<Integer, String>() {
@Override
public String apply(Integer userId) {
return search2(userId);
}
}, executor).exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
return null;
}
}).get();
System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis()+"&&"+returnValue);
}
private Integer search1() {
try {
System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
//查询时间默认5秒钟
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
return 1;
}
}
private String search2(Integer userId) {
try {
System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
//查询时间默认5秒钟
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
return "gz";
}
}
结果:
pool-2-thread-1&&1629799019179
//子线程:时隔5秒拿到上一个线程的执行返回值
pool-2-thread-2&&1629799024180
//主线程:时隔5秒后拿到上一个线程的执行返回值
http-nio-8003-exec-1&&1629799029180&&gz
需要注意的一点:要得到子线程的返回值,还是得用get方法。
串行+并行化案例
先串行去查询商品的,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作。
@RequestMapping("/update5")
public Student update5() throws ExecutionException, InterruptedException {
Student student = new Student();
//1.0 第一个任务(第一个任务先执行,内部需要执行5分钟)
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer integer = search1();
student.setUserId(integer);
return integer;
}
}, executor);
//2.0 第二个任务(第二个任务和第一个任务可以并行执行,内部也需要执行5分钟)
CompletableFuture<Void> future2 = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
String areaSchool = search2();
student.setAreaSchool(areaSchool);
}
}, executor);
//3.0 第三个任务(需要依靠第一个任务返回的数据,其内部执行也需要5秒)
CompletableFuture<Void> future3 = future1.thenAcceptAsync(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
integer = integer + 10;
student.setAge(integer);
}
});
//4.0 使用allOf达到必须所有任务执行完的效果
CompletableFuture<Void> futureAll = CompletableFuture.allOf(future2, future3);
System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis() + "&&Student" + student);
//阻塞主进程,等待子进程全部执行完毕!
futureAll.get();
System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis() + "&&Student" + student);
return student;
}
//搜索子方法
private Integer search1() {
try {
System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
//查询时间默认5秒钟
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
return 1;
}
}
private String search2() {
try {
System.out.println(Thread.currentThread().getName() + "&&" + System.currentTimeMillis());
//查询时间默认5秒钟
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
return "gz";
}
}
结果(为了对比直观,这里把currentTimeMillis的前几位都截掉了)
pool-2-thread-1&&385015
pool-2-thread-2&&385015
http-nio-8003-exec-1&&385016&&StudentStudent(userId=null, areaSchool=null, age=null)
ForkJoinPool.commonPool-worker-4&&395015
http-nio-8003-exec-1&&395015&&StudentStudent(userId=1, areaSchool=gz, age=11)
总结
CompletableFuture的优点
异步任务结束时,会自动回调某个对象的方法(whenComplete/handle)
异步任务出错时,会自动回调某个对象的方法(exceptionally)
主线程设置好回调后,不再关心异步任务的执行
多个CompletableFuture可以串行执行(thenApply/thenAccept/thenRun)
多个CompletableFuture可以并行执行(both/either/allof/anyof)