阻塞命令式编程
原有的tomcat业务处理是业务线程池中的一个线程处理,request和response都是在这个线程进行业务处理,执行我们编写代码命令。后面的命令需要等到前面的命令产生的数据输入。阻塞式顺序执行。中间的操作如果调用别的服务,或者新开线程处理,也需要阻塞等待数据返回。最后客户端等待所有命令执行完毕,获得执行结果返回结束阻塞。
使用Mq异步解耦的方法
A服务不阻塞等待B服务,而是发送Mq让B服务异步消费,自己直接返回客户端。但是如果客户端需要B服务处理的结果,那么通知客户端成为问题,所以之前都是异步处理不用实时给客户返回的任务。不适合客户端需要实时等待处理结果的业务
如果实在要使用:
- 那么 http 协议想要获取B服务的处理结果,只有通过轮询,长轮询的方法去向服务器取结果。
- 轮询:ajax定时调用后端开放的查询api
- 长轮询:服务器端接收请求,hold住请求,知道有数据才返回,感受上就相当于服务器向客户端推送。
- 异步回调的协议:
- websocket,ws://192.168.5.6/xxoo
ServerSentEvent轮询
实现的结果,服务器给客户端推消息
<script>
var sse = new EventSource("/url")
//事件返回
sse.onmessage = function(data){
data.data//可以取到数据
}
</script>
@requestMapping(value ="/sse",produces="text/event-stream;charset="utf-8")
public object xxoo(){
return new data();
}
@RestController
@RequestMapping(path = "/sse")
public class SseRest {
private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@GetMapping(path = "/subscribe")
public SseEmitter subscribe(String id) {
// 超时时间设置为1小时
SseEmitter sseEmitter = new SseEmitter(3600000L);
sseCache.put(id, sseEmitter);
// 超时回调 触发
sseEmitter.onTimeout(() -> sseCache.remove(id));
// 结束之后的回调触发
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
@GetMapping(path = "/push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
// 发送消息
sseEmitter.send(content);
}
return "over";
}
@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
// 执行完毕,断开连接
sseEmitter.complete();
sseCache.remove(id);
}
return "over";
}
@GetMapping(path = "/push-all")
public String pushAll(String content) throws IOException {
for (String s : sseCache.keySet()) {
SseEmitter sseEmitter = sseCache.get(s);
if (sseEmitter != null) {
// 发送消息
sseEmitter.send(content);
}
}
return "over";
}
}
异步回调和事件(消息)通知的区别
异步回调
A线程开一个新线程B处理业务方法C,并注册了一个回调函数D,A继续执行或执行结束,该回调函数D最后是在另外一个B线程执行。本质上C和D之间还是阻塞的,我们并不能用这种回调的方式解决阻塞问题。
事件通知
不同的线程通过订阅和发布事件通道中的事件,来执行具体的业务逻辑。使用了观察者模式
tomcat 下的响应式编程如何实现
业务线程池不处理具体的servlet业务。通过事件通道,基于事件通知由新建的业务线程池进行处理,出于容错机制,我们可以资源隔离,不同的线程池都不阻塞等待,处理不同的业务。
响应式系统的主流框架
project reactor
- netty
- servlet3.1
- spring webflux
- spring data reactive repositories
什么样的服务是可以异步MQ处理
- 边界明确:和主业务不是一个服务。
- 不需要实时向客户端返回结果。
微服务内的响应式web
响应式web,业务线程收到请求,发布事件,直接返回,不再阻塞。
微服务内需要达到响应式,需要解决以下两个问题服务调用阻塞
通过事件/消息驱动,异步的发布和订阅事件的方法reactor解耦数据库连接阻塞
支持reactor,可以主动推消息的数据库,MongoDB等
servlet3.0/3.1
3.0 我们先通过request.startAsync()获取到该请求对应的AsyncContext,然后调用AsyncContext的start()方法进行异步处理,处理完毕后需要调用complete()方法告知Servlet容器。
3.1 IO 也是非阻塞的,添加读写监听器。readListener onAllDataRead 。
webFlux的使用
引入webFlux包,使用的web容器就变成了netty了。
package com.mashibing.admin;
import java.util.Random;
import java.util.stream.IntStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.WebSession;
import com.mashibing.admin.pojo.Person;
import com.mashibing.admin.service.PersonService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// 注解式
// 函数式
@RestController
@RequestMapping("/person")
public class PersonController {
@Autowired
PersonService personSrv;
@GetMapping("")
Mono<Object> get(String name){
System.out.println("线程 get" + Thread.currentThread().getName());
System.out.println("---1");
// 异步
Mono<Object> mono = Mono.create(sink -> {
// 组装数据序列
System.out.println("线程 create" + Thread.currentThread().getName());
sink.success(personSrv.getPerson());
})
.doOnSubscribe(sub -> {
// 1 订阅
System.out.println("xxx");
})
.doOnNext(data -> {
// 得到数据
System.out.println("data:" + data);
})
.doOnSuccess(onSuccess -> {
// 整体完成
System.out.println("onSuccess");
});
System.out.println("---2");
// SpringMvc 值 在这个环节准备好
// 得到一个包装 数据序列 -> 包含特征 -> 容器 拿到这个序列 -> 执行序列里的方法
// Ajax a() -> b(c()) ->
// 1, 写回调接口 , 让b调
// 2, 直接传方法过去
// 看起来 像是异步,实质上,阻塞的过程 在容器内部
return mono;
}
@GetMapping("xxoo")
// ServerHttpRequest webFlux 中特有
// 拓展思维,SpringCloud Gateway
Mono<Object> get2(ServerHttpRequest request,String name
,
WebSession session
){
//System.out.println("name:" + name);
if(StringUtils.isEmpty(session.getAttribute("code"))) {
System.out.println("我要set了~");
session.getAttributes().put("code", 250);
}
System.out.println("code = " + session.getAttribute("code"));
return Mono.just("么么哒");
}
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> sse(){
// 1. 封装对象
Flux<String> flux = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
try {
Thread.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "xxoo" + i;
}))
.doOnSubscribe(sub -> {
System.out.println("sub 了");
})
.doOnComplete(() -> {
System.out.println("doOnComplete");
})
.doOnNext(data -> {
System.out.println("有data了~" + data);
})
;
// 2. 对象 连带里面的方法 给了容器
return flux;
}
}