Reactive 是什么?
在说 WebFlux 之前,我们先看一下 Reactive 是什么
public static void main(String[] args) {
// Mono: 0-1 个元素
// Flux: 0-N 个元素
String[] strs = {"1", "2", "3"};
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer integer) {
System.out.println("接收到数据: " + integer);
try{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完,调用 request 再请求一个数据
this.subscription.request(1);
// 或者已经达到目标,后面不在接收数据
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常
throwable.printStackTrace();
// 不再接收数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("全部数据处理完了");
}
};
// JDK8 的 stream 操作
Flux.fromArray(strs).map(Integer::parseInt)
// 最终操作,就是 JDK9 的响应式流
.subscribe(subscriber);
}
接收到数据: 1
接收到数据: 2
接收到数据: 3
全部数据处理完了
我们可以看出来 Reacotor 其实就是 JDK8 的 **Stream 流操作 + JDK 9 的 Reacotre Stream 响应式流的整合
Mono
Mono **是一个返回 0:1 的关系,返回 0 个或 1 个数据**
@RestController
public class TestController {
@GetMapping("/1")
private String get1() {
return "some string";
}
@GetMapping("/2")
private Mono<String> get2() {
return Mono.just("some string");
}
}
get1:是传统的访问模式,get2:是 webFlux 的访问模式,两者有什么区别呢,通过浏览器访问,发现两者并没有什么区别,都是返回字符串“some string”
我们稍微调整一下让代码增加一点耗时操作
@GetMapping("/1")
private String get1() {
log.info("get1 start");
String result = doSomething();
log.info("get1 end");
return result;
}
@GetMapping("/2")
private Mono<String> get2() {
log.info("get2 start");
Mono<String> result = Mono.fromSupplier(this::doSomething);
log.info("get2 end");
return result;
}
public String doSomething() {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "some string";
}
我们添加了日志打印,继续来看一下执行时间,我们可以看出 get1 花费了5秒的时间,而 get2 几乎没有花费时间,这就大大节约了容器的线程等待时间
Flux
Flux **是一个 1:N 关系,返回 1 个或 N 个数据,注意: `produces = “text/event-stream”**` 才可以真确返回流数据
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> get3() {
log.info("get3 start");
Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "flux data - " + i;
}));
log.info("get3 end");
return result;
}
效果如下所示:
SSE 技术
我们知道 Flux 可以返回多次数据,但是 HTTP 协议是基于一问一答的形式,那么,Flux 是如何做到多次返回的呢?
其实他使用的就是 H5 的 SSE(Server-Send Event)技术
服务端
注意:实现 SSE,必须符合其格式要求
- 设置 Content-type: text/event-stream
- 数据返回格式:data: + 返回的数据 + 2个回车符
/**
* 实现 SSE (Server-Send Event)
*/
@WebServlet(name = "SSE", urlPatterns = "/sse")
public class SSE extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
for (int i = 0; i < 5; i++) {
// 指定事件标识(非必须),如果指定事件标识,那么前端就必须使用 addEventListener 来接收数据
// 格式 event: + 标识 + 1个回车
response.getWriter().write("event:flag\n");
// 格式 data: + 数据 + 2个回车
response.getWriter().write("data:" + i + "\n\n");
response.getWriter().flush();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
客户端
我们继续编写客户端代码,看看客户端是如何接收这些数据的
<script>
let sse = new EventSource("sse");
// 无事件标识,接受数据
sse.onmessage = function (e) {
console.log("message", e.data);
};
// 使用时间标识,接受数据
sse.addEventListener("flag", function (e) {
console.log("event", e.data);
// SSE 会自动重连,所以需要判断
if (e.data === "4") {
sse.close();
}
});
</script>
前端效果如下: