Reactive 是什么?

在说 WebFlux 之前,我们先看一下 Reactive 是什么

  1. public static void main(String[] args) {
  2. // Mono: 0-1 个元素
  3. // Flux: 0-N 个元素
  4. String[] strs = {"1", "2", "3"};
  5. Subscriber<Integer> subscriber = new Subscriber<Integer>() {
  6. Subscription subscription;
  7. @Override
  8. public void onSubscribe(Subscription subscription) {
  9. this.subscription = subscription;
  10. this.subscription.request(1);
  11. }
  12. @Override
  13. public void onNext(Integer integer) {
  14. System.out.println("接收到数据: " + integer);
  15. try{
  16. TimeUnit.SECONDS.sleep(3);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. // 处理完,调用 request 再请求一个数据
  21. this.subscription.request(1);
  22. // 或者已经达到目标,后面不在接收数据
  23. // this.subscription.cancel();
  24. }
  25. @Override
  26. public void onError(Throwable throwable) {
  27. // 出现了异常
  28. throwable.printStackTrace();
  29. // 不再接收数据了
  30. this.subscription.cancel();
  31. }
  32. @Override
  33. public void onComplete() {
  34. // 全部数据处理完了(发布者关闭了)
  35. System.out.println("全部数据处理完了");
  36. }
  37. };
  38. // JDK8 的 stream 操作
  39. Flux.fromArray(strs).map(Integer::parseInt)
  40. // 最终操作,就是 JDK9 的响应式流
  41. .subscribe(subscriber);
  42. }
  43. 接收到数据: 1
  44. 接收到数据: 2
  45. 接收到数据: 3
  46. 全部数据处理完了

我们可以看出来 Reacotor 其实就是 JDK8 的 **Stream 流操作 + JDK 9 的 Reacotre Stream 响应式流的整合

Mono

Mono **是一个返回 0:1 的关系,返回 0 个或 1 个数据**

  1. @RestController
  2. public class TestController {
  3. @GetMapping("/1")
  4. private String get1() {
  5. return "some string";
  6. }
  7. @GetMapping("/2")
  8. private Mono<String> get2() {
  9. return Mono.just("some string");
  10. }
  11. }

get1:是传统的访问模式,get2:是 webFlux 的访问模式,两者有什么区别呢,通过浏览器访问,发现两者并没有什么区别,都是返回字符串“some string”

我们稍微调整一下让代码增加一点耗时操作

  1. @GetMapping("/1")
  2. private String get1() {
  3. log.info("get1 start");
  4. String result = doSomething();
  5. log.info("get1 end");
  6. return result;
  7. }
  8. @GetMapping("/2")
  9. private Mono<String> get2() {
  10. log.info("get2 start");
  11. Mono<String> result = Mono.fromSupplier(this::doSomething);
  12. log.info("get2 end");
  13. return result;
  14. }
  15. public String doSomething() {
  16. try {
  17. TimeUnit.SECONDS.sleep(5);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. return "some string";
  22. }

我们添加了日志打印,继续来看一下执行时间,我们可以看出 get1 花费了5秒的时间,而 get2 几乎没有花费时间,这就大大节约了容器的线程等待时间
image.png

Flux

Flux **是一个 1:N 关系,返回 1 个或 N 个数据,注意: `produces = “text/event-stream”**` 才可以真确返回流数据

  1. @GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  2. private Flux<String> get3() {
  3. log.info("get3 start");
  4. Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
  5. try {
  6. TimeUnit.SECONDS.sleep(1);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. return "flux data - " + i;
  11. }));
  12. log.info("get3 end");
  13. return result;
  14. }

效果如下所示:
show.gif

SSE 技术

我们知道 Flux 可以返回多次数据,但是 HTTP 协议是基于一问一答的形式,那么,Flux 是如何做到多次返回的呢?

其实他使用的就是 H5 的 SSE(Server-Send Event)技术

服务端

注意:实现 SSE,必须符合其格式要求

  • 设置 Content-type: text/event-stream
  • 数据返回格式:data: + 返回的数据 + 2个回车符
  1. /**
  2. * 实现 SSE (Server-Send Event)
  3. */
  4. @WebServlet(name = "SSE", urlPatterns = "/sse")
  5. public class SSE extends HttpServlet {
  6. @Override
  7. protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
  8. response.setContentType("text/event-stream");
  9. response.setCharacterEncoding("utf-8");
  10. for (int i = 0; i < 5; i++) {
  11. // 指定事件标识(非必须),如果指定事件标识,那么前端就必须使用 addEventListener 来接收数据
  12. // 格式 event: + 标识 + 1个回车
  13. response.getWriter().write("event:flag\n");
  14. // 格式 data: + 数据 + 2个回车
  15. response.getWriter().write("data:" + i + "\n\n");
  16. response.getWriter().flush();
  17. try {
  18. TimeUnit.SECONDS.sleep(1);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. }

服务端接口访问效果如下:
sse.gif

客户端

我们继续编写客户端代码,看看客户端是如何接收这些数据的

  1. <script>
  2. let sse = new EventSource("sse");
  3. // 无事件标识,接受数据
  4. sse.onmessage = function (e) {
  5. console.log("message", e.data);
  6. };
  7. // 使用时间标识,接受数据
  8. sse.addEventListener("flag", function (e) {
  9. console.log("event", e.data);
  10. // SSE 会自动重连,所以需要判断
  11. if (e.data === "4") {
  12. sse.close();
  13. }
  14. });
  15. </script>

前端效果如下:
sse-front.gif