Reactive 是什么?
在说 WebFlux 之前,我们先看一下 Reactive 是什么
public static void main(String[] args) {// Mono: 0-1 个元素// Flux: 0-N 个元素String[] strs = {"1", "2", "3"};Subscriber<Integer> subscriber = new Subscriber<Integer>() {Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(Integer integer) {System.out.println("接收到数据: " + integer);try{TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完,调用 request 再请求一个数据this.subscription.request(1);// 或者已经达到目标,后面不在接收数据// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常throwable.printStackTrace();// 不再接收数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("全部数据处理完了");}};// JDK8 的 stream 操作Flux.fromArray(strs).map(Integer::parseInt)// 最终操作,就是 JDK9 的响应式流.subscribe(subscriber);}接收到数据: 1接收到数据: 2接收到数据: 3全部数据处理完了
我们可以看出来 Reacotor 其实就是 JDK8 的 **Stream 流操作 + JDK 9 的 Reacotre Stream 响应式流的整合
Mono
Mono **是一个返回 0:1 的关系,返回 0 个或 1 个数据**
@RestControllerpublic class TestController {@GetMapping("/1")private String get1() {return "some string";}@GetMapping("/2")private Mono<String> get2() {return Mono.just("some string");}}
get1:是传统的访问模式,get2:是 webFlux 的访问模式,两者有什么区别呢,通过浏览器访问,发现两者并没有什么区别,都是返回字符串“some string”
我们稍微调整一下让代码增加一点耗时操作
@GetMapping("/1")private String get1() {log.info("get1 start");String result = doSomething();log.info("get1 end");return result;}@GetMapping("/2")private Mono<String> get2() {log.info("get2 start");Mono<String> result = Mono.fromSupplier(this::doSomething);log.info("get2 end");return result;}public String doSomething() {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}return "some string";}
我们添加了日志打印,继续来看一下执行时间,我们可以看出 get1 花费了5秒的时间,而 get2 几乎没有花费时间,这就大大节约了容器的线程等待时间
Flux
Flux **是一个 1:N 关系,返回 1 个或 N 个数据,注意: `produces = “text/event-stream”**` 才可以真确返回流数据
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)private Flux<String> get3() {log.info("get3 start");Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "flux data - " + i;}));log.info("get3 end");return result;}
效果如下所示:
SSE 技术
我们知道 Flux 可以返回多次数据,但是 HTTP 协议是基于一问一答的形式,那么,Flux 是如何做到多次返回的呢?
其实他使用的就是 H5 的 SSE(Server-Send Event)技术
服务端
注意:实现 SSE,必须符合其格式要求
- 设置 Content-type: text/event-stream
- 数据返回格式:data: + 返回的数据 + 2个回车符
/*** 实现 SSE (Server-Send Event)*/@WebServlet(name = "SSE", urlPatterns = "/sse")public class SSE extends HttpServlet {@Overrideprotected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {response.setContentType("text/event-stream");response.setCharacterEncoding("utf-8");for (int i = 0; i < 5; i++) {// 指定事件标识(非必须),如果指定事件标识,那么前端就必须使用 addEventListener 来接收数据// 格式 event: + 标识 + 1个回车response.getWriter().write("event:flag\n");// 格式 data: + 数据 + 2个回车response.getWriter().write("data:" + i + "\n\n");response.getWriter().flush();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}}
客户端
我们继续编写客户端代码,看看客户端是如何接收这些数据的
<script>let sse = new EventSource("sse");// 无事件标识,接受数据sse.onmessage = function (e) {console.log("message", e.data);};// 使用时间标识,接受数据sse.addEventListener("flag", function (e) {console.log("event", e.data);// SSE 会自动重连,所以需要判断if (e.data === "4") {sse.close();}});</script>
前端效果如下:
