spring5 的web开发技术栈
非阻塞的
运行在 netty 上的
需要 servlet3.1

官网链接——recative
image.png
目前主流关系型数据库是不支持 reactor 反应式编程
水平扩展,就是加人;垂直扩展,就是加班;

异步servlet、同步Servlet

创建dynamic web project,编写 servlet3.1 的 servlet。

同步的servlet

  1. package com.demo;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeUnit;
  4. import javax.servlet.ServletException;
  5. import javax.servlet.annotation.WebServlet;
  6. import javax.servlet.http.HttpServlet;
  7. import javax.servlet.http.HttpServletRequest;
  8. import javax.servlet.http.HttpServletResponse;
  9. @WebServlet("/TestSychServlet")
  10. public class TestSychServlet extends HttpServlet {
  11. private static final long serialVersionUID = 1L;
  12. protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
  13. long t = System.currentTimeMillis();
  14. doSomething(request, response);
  15. System.out.println("TestSychServlet use time:"+(System.currentTimeMillis() - t));
  16. }
  17. private void doSomething(HttpServletRequest request, HttpServletResponse response) throws IOException {
  18. try {
  19. TimeUnit.SECONDS.sleep(3);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. response.getWriter().write("TestSychServlet done");
  24. }
  25. protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
  26. doGet(request, response);
  27. }
  28. }

异步 servlet

异步servlet 编写步骤:

  1. 开启异步,AsyncContext asyncContext = request.startAsync();
  2. 另起线程(或线程池)执行业务逻辑,比如 CompletableFuture
  3. 业务处理完成,需要通知上下文 asyncContext.complete();

    异步例子

    ```java package com.demo;

import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /**

  • 异步servlet 编写步骤:
    1. 开启异步,AsyncContext asyncContext = request.startAsync();
    1. 另起线程执行业务逻辑,比如 CompletableFuture
    1. 业务处理完成,需要通知上下文 asyncContext.complete();
  • @author DELL / @WebServlet(value = “/TestAsychServlet2”, asyncSupported = true) public class TestAsychServlet2 extends HttpServlet { private static final long serialVersionUID = 1L;

    protected void doGet(HttpServletRequest request, HttpServletResponse response)

    1. throws ServletException, IOException {
    2. long t = System.currentTimeMillis();
    3. // 开启异步
    4. AsyncContext asyncContext = request.startAsync();
    5. // 执行业务逻辑代码
    6. CompletableFuture.runAsync(() -> doSomething(asyncContext, asyncContext.getRequest(), asyncContext.getResponse()));
    7. System.out.println("TestAsychServlet2 use time:" + (System.currentTimeMillis() - t));

    }

    private void doSomething(AsyncContext asyncContext, ServletRequest servletRequest, ServletResponse servletResponse) {

    1. // 模拟耗时
    2. try {
    3. TimeUnit.SECONDS.sleep(3);
    4. System.out.println(">>>>>>>>>>> 3 seconds");
    5. } catch (InterruptedException e) {
    6. e.printStackTrace();
    7. }
    8. try {
    9. servletResponse.getWriter().write("TestAsychServlet2 done");
    10. } catch (IOException e) {
    11. e.printStackTrace();
    12. }
    13. // 调用上下文中 AsyncContext asyncContext 通知完成结束
    14. asyncContext.complete();

    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response)

    1. throws ServletException, IOException {
    2. doGet(request, response);

    }

}

  1. <a name="HWlOw"></a>
  2. ## 输出结果:
  3. TestAsychServlet2 use time:1<br />
  4. >>>>>>>>>>> 3 seconds
  5. <br />TestAsychServlet2 use time:0
  6. <br />>>>>>>>>>>> 3 seconds
  7. <br />TestSychServlet use time:3001
  8. <a name="gKfwU"></a>
  9. ## 小结
  10. 异步和同步,对于前端browser 来说,都是相同耗时获取到响应结果,但是servlet本身时间是不同的。传统式 servlet 被业业务阻塞,异步式servlet没有。
  11. <a name="dqJAj"></a>
  12. # reactor
  13. reactor 就是 jdk8 stream + jdk9 reactive stream<br />flux [0-n]<br />mono [0-1]
  14. <a name="ps1g8"></a>
  15. ## reactor 中的 flux 小例子
  16. ```java
  17. package com.demo;
  18. import org.reactivestreams.Subscriber;
  19. import org.reactivestreams.Subscription;
  20. import reactor.core.Disposable;
  21. import reactor.core.publisher.Flux;
  22. public class TestReactorDemo {
  23. public static void main(String[] args) {
  24. // Reactor 相当于 jdk8 stream + jdk9 reactive stream
  25. // Mono 代表 0-1 个元素
  26. // Flux 代表 0-N 个元素
  27. String[] strArr = {"1", "2", "3"};
  28. // 创建流
  29. Flux.fromArray(strArr)
  30. // 流中间处理
  31. .map(x -> Integer.parseInt(x))
  32. // 结束流,就是一个消费者
  33. .subscribe(System.out::println);
  34. Subscriber<Integer> subscriber = new Subscriber<>() {
  35. private Subscription subscription;
  36. @Override
  37. public void onSubscribe(Subscription subscription) {
  38. // 保存订阅关系
  39. this.subscription = subscription;
  40. System.out.println(">>>>>>>>>> 建立订阅");
  41. // 发个订阅要求
  42. subscription.request(1);
  43. }
  44. @Override
  45. public void onNext(Integer integer) {
  46. // 订阅到的数据,并进行相关处理
  47. System.out.println(">>>>>>>>>>>>>> 数据:" + integer);
  48. // 继续订阅 request,或者结束 cancel
  49. subscription.request(1);
  50. }
  51. @Override
  52. public void onError(Throwable throwable) {
  53. // 异常处理
  54. }
  55. @Override
  56. public void onComplete() {
  57. // 发布者关闭
  58. System.out.println(">>>>>>>> 结束了");
  59. }
  60. };
  61. Flux.fromArray(strArr)
  62. // 流中间处理
  63. .map(x -> Integer.parseInt(x))
  64. // 结束流,就是订阅,subscribe
  65. .subscribe(subscriber);
  66. }
  67. }

Flux 的 subscribe 方法有多个,多态,api 如下:
image.png

Flux 小例子小结

Flux 惰性求值,流在最终操作时才开始有值产生(个人感觉就是 java8 流式处理一样的)

webflux

引入的是 reactive web(对应springWebFlux) ,不是原来的 web (对应 SpringMVC)
lombok 功能添加,-javaagent:lombok.jar (编辑 sts.ini 或者 eclipse.ini,idea 直接插件 lombok)

官网 webflux

传统方式

所谓传统方式就是 SpringMVC 样子相似,看代码:

传统方式 demo 1 简单使用

  1. package com.demo;
  2. import org.springframework.web.bind.annotation.GetMapping;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import reactor.core.publisher.Mono;
  6. /**
  7. * 传统 SpringMVC 方式,使用 webflux
  8. */
  9. @RestController
  10. @RequestMapping("/v1/test/")
  11. public class TestController {
  12. /**
  13. * 传统 SpringMVC
  14. */
  15. @GetMapping("/1")
  16. public String get1() {
  17. return "get1";
  18. }
  19. /**
  20. * 传统方式,webflux
  21. * @return
  22. */
  23. @GetMapping("/2")
  24. public Mono<String> get2() {
  25. return Mono.just("get2");
  26. }
  27. }

传统方式 demo 2 模拟数据处理

  1. package com.demo;
  2. import java.util.concurrent.TimeUnit;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import reactor.core.publisher.Mono;
  8. /**
  9. * 传统 SpringMVC 方式,使用 webflux
  10. */
  11. @Slf4j
  12. @RestController
  13. @RequestMapping("/v1/test/demo")
  14. public class TestDemo2Controller {
  15. /**
  16. * 传统 SpringMVC
  17. */
  18. @GetMapping("/1")
  19. public String get1() {
  20. log.info("get1 start");
  21. String result = doSomething();
  22. log.info("get1 end");
  23. return result;
  24. }
  25. /**
  26. * 传统方式,webflux
  27. * @return
  28. */
  29. @GetMapping("/2")
  30. public Mono<String> get2() {
  31. log.info("get2 start");
  32. Mono<String> result = Mono.fromSupplier(this::doSomething);
  33. log.info("get2 end");
  34. return result;
  35. }
  36. /**
  37. * 模拟业务处理
  38. * @return
  39. */
  40. private String doSomething() {
  41. try {
  42. TimeUnit.SECONDS.sleep(3);
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. return "do something";
  47. }
  48. }

demo 2 访问结果:

浏览器:
image.png
日志:

  1. 2021-04-24 10:31:51.118 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get2 start
  2. 2021-04-24 10:31:51.118 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get2 end
  3. 2021-04-24 10:32:10.849 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get1 start
  4. 2021-04-24 10:32:13.849 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get1 end

中的关注time,还有后端的日志时间打印,可以发现 反应式编程是惰性的。

传统方式 demo 3 使用 Flux 返回多条数据

  1. package com.demo;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.stream.IntStream;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import reactor.core.publisher.Flux;
  9. /**
  10. * 传统 SpringMVC 方式,使用 webflux Flux 返回对象
  11. *
  12. * Flux 是 0-n 个元素
  13. */
  14. @Slf4j
  15. @RestController
  16. @RequestMapping("/v1/test/flux")
  17. public class TestFluxController {
  18. /**
  19. * 传统 webflux,不指定产生方式,一次性全部返回
  20. */
  21. @GetMapping("/1")
  22. public Flux<String> get1() {
  23. log.info("get1 called");
  24. return Flux.fromStream(IntStream.range(1, 6).mapToObj(x -> {
  25. try {
  26. TimeUnit.SECONDS.sleep(2);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. return "from flux data:" + x;
  31. }));
  32. }
  33. /**
  34. * 传统方式,webflux,指定产生方式,一次只返回一个
  35. */
  36. @GetMapping(value = "/2", produces = {"text/event-stream"})
  37. public Flux<String> get2() {
  38. log.info("get2 called");
  39. return Flux.fromStream(IntStream.range(1, 6).mapToObj(x -> {
  40. try {
  41. TimeUnit.SECONDS.sleep(2);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. return "from flux data:" + x;
  46. }));
  47. }
  48. }

结果:

http://localhost:8080/v1/test/flux/1 请求结果:
image.png
http://localhost:8080/v1/test/flux/2 请求结果:
image.png
数据逐条显示在浏览器上!!!使用SSE server-sent event,基于 html5;使用传统方式时,一般都会写两个接口,一个是一次全部返回(不指定 produces),一个是逐条返回(指定 produces 为 text/event-stream)

使用 servlet 编写 SSE(使用的是 dynamic web的工程,不是 spring webflux)

  1. package com.demo;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeUnit;
  4. import javax.servlet.ServletException;
  5. import javax.servlet.annotation.WebServlet;
  6. import javax.servlet.http.HttpServlet;
  7. import javax.servlet.http.HttpServletRequest;
  8. import javax.servlet.http.HttpServletResponse;
  9. /**
  10. * 测试使用 SSE server send event
  11. *
  12. * 逐条将结果返回到前端
  13. */
  14. @WebServlet(name = "SSEDemoServlet", value = "/SSEDemoServlet")
  15. public class SSEDemoServlet extends HttpServlet {
  16. @Override
  17. protected void doPost(HttpServletRequest request, HttpServletResponse response)
  18. throws ServletException, IOException {
  19. doGet(request, response);
  20. }
  21. @Override
  22. protected void doGet(HttpServletRequest request, HttpServletResponse response)
  23. throws ServletException, IOException {
  24. // 使用SSE 需要设置 response 消息头
  25. response.setContentType("text/event-stream");
  26. response.setCharacterEncoding("utf-8");
  27. // 模拟数据
  28. for (int i = 0; i < 5; i++) {
  29. // 添加event id标识,格式:id: + id值 + 一个回车
  30. response.getWriter().write("id:" + Math.random() + "\n");
  31. // 添加event type标识,格式:event: + 事件名称 + 一个回车
  32. response.getWriter().write("event:me\n");
  33. // 数据的格式需要是这样: data: + 数据 + 2个回车
  34. response.getWriter().write("data:" + i + "\n\n");
  35. response.getWriter().flush();
  36. try {
  37. TimeUnit.SECONDS.sleep(1);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. }
  43. }
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>sse (Server Send Event)测试</title>
  6. </head>
  7. <body>
  8. <h1>测试 SSE Server Send Event</h1>
  9. </body>
  10. <script type="text/javascript">
  11. // 初始化参数,参数为 url,此处使用相对路径
  12. // sse 会自动链接服务器,不关闭一直会链接
  13. var sse = new EventSource("SSEDemoServlet");
  14. // 收到信息时的处理
  15. sse.onmessage = function (ev) {
  16. console.log("message", ev.data, ev);
  17. }
  18. // 添加时间监听
  19. sse.addEventListener("me", function (evt) {
  20. console.log("me event", evt.data, evt);
  21. if (evt.data == 3) {
  22. // 关闭链接
  23. sse.close();
  24. }
  25. })
  26. </script>
  27. </html>

SSE 不关闭会一直拿数据,自动链接。
注意:在 spring webflux 中是没有 servlet3.1 jar包的,即使导入依赖

  1. <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
  2. <dependency>
  3. <groupId>javax.servlet</groupId>
  4. <artifactId>javax.servlet-api</artifactId>
  5. <version>3.1.0</version>
  6. <scope>provided</scope>
  7. </dependency>

对应的 servlet 无法正常访问(状态是200,但是response 的 header 不是自己设置的 text/event-stream),搞得没办法,先且在 dynamic web 项目中测试上面的 SSE demo。
image.png

使用 routerfunction 方式

主要变化是将原来的 HttpServletRequest 抽象成 ServerRequest,HttpServletResponse 抽象为 ServerResponser

开发步骤

handlerFunction(输入 ServerRequest,返回 ServerResponse)

  1. RouterFunction(请求url 和 HandlerFunction 对应起来)
  2. 建立一个HttpHandler 处理相关的请求
  3. Server 处理

    routerfunction 代码

    ```java package com.demo.webflux;

import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.RequestPredicates; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse;

/**

  • 测试 webflux,使用 路由函数, routerfunction */ @Configuration public class TestWebFluxByRouterFunction {

    @Bean RouterFunction webfluxDemoRouter(WebFluxDemoHandler handler) {

    1. return RouterFunctions
    2. // 相当于类上的路径
    3. .nest(RequestPredicates.path("/v1/web/flux/demo"),
    4. RouterFunctions
    5. // 相当于方法上的路径
    6. .route(RequestPredicates.GET("/"), handler::getAll)
    7. .andRoute(RequestPredicates.DELETE("/{id}"), handler::deleteById)
    8. .andRoute(RequestPredicates.POST("/add")
    9. // 设置请求格式
    10. .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), handler::add)
    11. .andRoute(RequestPredicates.GET("/init"), handler::init)
    12. .andRoute(RequestPredicates.GET("/{id}"), handler::getById)
    13. );

    }

}

  1. 需要指定为 configuration,内部创建了 Bean 对象。
  2. <a name="V9H0D"></a>
  3. ### 建立一个 handler: WebFluxDemoHandler
  4. ```java
  5. package com.demo.webflux;
  6. import java.time.LocalDate;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. import java.util.Objects;
  10. import java.util.stream.Collectors;
  11. import java.util.stream.IntStream;
  12. import org.springframework.http.MediaType;
  13. import org.springframework.stereotype.Component;
  14. import org.springframework.web.reactive.function.server.ServerRequest;
  15. import org.springframework.web.reactive.function.server.ServerResponse;
  16. import reactor.core.publisher.Flux;
  17. import reactor.core.publisher.Mono;
  18. /**
  19. * 自定义一个 HandlerFunction,传入参数 {@link ServerRequest ServerRequest}, 返回 Mono<T> 此处 T 为 {@link ServerResponse
  20. * ServerResponse}
  21. *
  22. * @see org.springframework.web.reactive.function.server.HandlerFunction#handle(ServerRequest)
  23. */
  24. @Component
  25. public class WebFluxDemoHandler {
  26. /**
  27. * 模拟数据库数据
  28. */
  29. private List<WebFluxDemo> data = IntStream.range(1, 5)
  30. .mapToObj(x -> WebFluxDemo.builder().id("1" + x).age(30 + x).birthday(LocalDate.now()).name("xiaohui" + x)
  31. .build())
  32. .collect(Collectors.toList());
  33. /**
  34. * 初始化数据,并返回数据
  35. */
  36. public Mono<ServerResponse> init(ServerRequest request) {
  37. this.data = IntStream.range(1, 5)
  38. .mapToObj(
  39. x -> WebFluxDemo.builder().id("1" + x).age(30 + x).birthday(LocalDate.now()).name("xiaohui" + x)
  40. .build())
  41. .collect(Collectors.toList());
  42. return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Flux.fromIterable(data),
  43. WebFluxDemo.class);
  44. }
  45. /**
  46. * 查询所有的数据
  47. */
  48. public Mono<ServerResponse> getAll(ServerRequest request) {
  49. return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
  50. .body(Flux.fromIterable(data), WebFluxDemo.class);
  51. }
  52. /**
  53. * 根据id,查询数据
  54. */
  55. public Mono<ServerResponse> getById(ServerRequest request) {
  56. String id = request.pathVariable("id");
  57. WebFluxDemo first = data.stream().filter(x -> x.getId().equals(id)).findFirst().get();
  58. Mono<ServerResponse> result = null;
  59. if (Objects.isNull(first)) {
  60. result = ServerResponse.notFound().build();
  61. } else {
  62. result = ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just(first),
  63. WebFluxDemo.class);
  64. }
  65. return result;
  66. }
  67. /**
  68. * 根据id,删除数据
  69. */
  70. public Mono<ServerResponse> deleteById(ServerRequest request) {
  71. String id = request.pathVariable("id");
  72. WebFluxDemo first = data.stream().filter(x -> x.getId().equals(id)).findFirst().orElseGet(()->null);
  73. Mono<ServerResponse> result = null;
  74. if (Objects.isNull(first)) {
  75. result = ServerResponse.notFound().build();
  76. } else {
  77. data.remove(first);
  78. result = ServerResponse.ok().build();
  79. }
  80. return result;
  81. }
  82. /**
  83. * 添加数据
  84. */
  85. public Mono<ServerResponse> add(ServerRequest request) {
  86. return request.bodyToMono(WebFluxDemo.class).flatMap(demo -> {
  87. data.add(demo);
  88. return ServerResponse.ok().body(Mono.just(demo), WebFluxDemo.class);
  89. }).switchIfEmpty(ServerResponse.badRequest().contentType(MediaType.TEXT_PLAIN).body(Mono.just("请求错误"),
  90. String.class));
  91. }
  92. }

handler 返回值是 Mono,参数是 ServerRequest。

SSE 方式查询结果 ServerResponse.ok().contentType(MediaType.APPLICATION_JSON):

  1. /**
  2. * 查询所有的数据
  3. */
  4. public Mono<ServerResponse> getAll(ServerRequest request) {
  5. // return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
  6. // .body(Flux.fromIterable(data), WebFluxDemo.class);
  7. return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(Flux.fromStream(IntStream.range(1, 5)
  8. .mapToObj(x -> {
  9. try {
  10. TimeUnit.SECONDS.sleep(2);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. return WebFluxDemo.builder().id("1" + x).age(30 + x).birthday(LocalDate.now()).name(
  15. "xiaohui" + x)
  16. .build();
  17. })),
  18. WebFluxDemo.class);
  19. }

routerfunction 方式如何设定为 text/event-stream (sse server send event),难道默认就是?
这个是需要设置的 ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM)。
image.png

普通 domain 类 WebFluxDemo

  1. package com.demo.webflux;
  2. import java.time.LocalDate;
  3. import lombok.AllArgsConstructor;
  4. import lombok.Builder;
  5. import lombok.Data;
  6. import lombok.NoArgsConstructor;
  7. /**
  8. * 测试webflux 的一个对象信息
  9. */
  10. @Data
  11. @Builder
  12. @NoArgsConstructor
  13. @AllArgsConstructor
  14. public class WebFluxDemo {
  15. private String id;
  16. private String name;
  17. private Integer age;
  18. private LocalDate birthday;
  19. }

数据库的crud操作

关系型引入 r2dbc,MongoDB已支持

小结

流式编程,就是要重点关注 publisher、subscriber、flow、flux、mono这几个。

问题

  1. webflux 中如何使用servlet原生编程
  2. web与webflux能否同时使用
  3. routerfunction 方式如何设定为 text/event-stream (sse server send event),难道默认就是?如何做到文章分段返回与渲染(长文章,图片)
  4. 关于routerfunction 方式的参数校验与errorhandler,mono.error等
  5. 反应式编程,什么时候指定 request请求数量的?publisher的关闭,subscriber 的订阅?