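WebFlux is the reactive web stack introduced in Spring 5:
- non-blocking
- runs on Netty by default
- on a servlet container, it requires Servlet 3.1+

Official site link: reactive

Mainstream relational databases currently do not support Reactor-style reactive programming, because JDBC is a blocking API.

Horizontal scaling means adding people; vertical scaling means working overtime.

# Asynchronous and synchronous servlets
Create a Dynamic Web Project and write the servlets against the Servlet 3.1 API.

## Synchronous servlet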
```java
package com.demo;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet("/TestSychServlet")
public class TestSychServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        long t = System.currentTimeMillis();
        doSomething(request, response);
        System.out.println("TestSychServlet use time:" + (System.currentTimeMillis() - t));
    }

    private void doSomething(HttpServletRequest request, HttpServletResponse response)
            throws IOException {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        response.getWriter().write("TestSychServlet done");
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doGet(request, response);
    }
}
```
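## Asynchronous servlet
Steps for writing an asynchronous servlet:
- Start async mode: `AsyncContext asyncContext = request.startAsync();`
- Run the business logic on another thread (or a thread pool), for example with `CompletableFuture`
- When processing finishes, notify the context: `asyncContext.complete();`

## Asynchronous example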
```java
package com.demo;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Steps for writing an asynchronous servlet:
 * - start async mode: AsyncContext asyncContext = request.startAsync();
 * - run the business logic on another thread, for example with CompletableFuture
 * - when processing finishes, notify the context: asyncContext.complete();
 *
 * @author DELL
 */
@WebServlet(value = "/TestAsychServlet2", asyncSupported = true)
public class TestAsychServlet2 extends HttpServlet {
    private static final long serialVersionUID = 1L;

    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        long t = System.currentTimeMillis();
        // Start async mode
        AsyncContext asyncContext = request.startAsync();
        // Run the business logic on another thread
        CompletableFuture.runAsync(
                () -> doSomething(asyncContext, asyncContext.getRequest(), asyncContext.getResponse()));
        System.out.println("TestAsychServlet2 use time:" + (System.currentTimeMillis() - t));
    }

    private void doSomething(AsyncContext asyncContext, ServletRequest servletRequest,
            ServletResponse servletResponse) {
        // Simulate slow work
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(">>>>>>>>>>> 3 seconds");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            servletResponse.getWriter().write("TestAsychServlet2 done");
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Tell the AsyncContext that processing has finished
        asyncContext.complete();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doGet(request, response);
    }
}
```
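The timing difference between the two servlets can be reproduced without a servlet container. The sketch below is only an analogy using the JDK: the class `SyncVsAsyncTiming`, its method names, and the 300 ms delay (standing in for the 3 second sleep) are all assumptions made for illustration, and the `CompletableFuture` plays the role of the pending response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class SyncVsAsyncTiming {

    // Simulated business logic; 300 ms stands in for the 3 s sleep in the servlets.
    static String doSomething() {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done";
    }

    // Synchronous style: the calling ("request") thread is blocked for the whole job.
    static long timeSync() {
        long t = System.nanoTime();
        doSomething();
        return (System.nanoTime() - t) / 1_000_000;
    }

    // Asynchronous style: the job is handed to another thread, so the caller is
    // occupied only for as long as it takes to submit the work.
    static long timeAsync() {
        long t = System.nanoTime();
        CompletableFuture<String> response = CompletableFuture.supplyAsync(SyncVsAsyncTiming::doSomething);
        long occupiedMillis = (System.nanoTime() - t) / 1_000_000;
        // The "browser" still has to wait for the full result.
        response.join();
        return occupiedMillis;
    }

    public static void main(String[] args) {
        System.out.println("sync handler occupied (ms): " + timeSync());
        System.out.println("async handler occupied (ms): " + timeAsync());
    }
}
```

In both cases the caller of `join()` waits for the full delay; only the time the handler thread is tied up changes.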
## Output
TestAsychServlet2 use time:1<br />>>>>>>>>>>> 3 seconds<br />TestAsychServlet2 use time:0<br />>>>>>>>>>>> 3 seconds<br />TestSychServlet use time:3001

## Summary
For the browser, the synchronous and asynchronous servlets take the same time to return the response (about 3 seconds either way). What differs is how long the servlet's request thread is occupied: the traditional servlet is blocked by the business logic for the full 3 seconds, while the asynchronous servlet returns almost immediately.

# Reactor
Reactor can be thought of as JDK 8 Stream + JDK 9 Reactive Streams.<br />`Flux`: 0 to N elements<br />`Mono`: 0 to 1 element

## A small Flux example in Reactor
```java
package com.demo;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.publisher.Flux;

public class TestReactorDemo {
    public static void main(String[] args) {
        // Reactor is roughly JDK 8 Stream + JDK 9 Reactive Streams
        // Mono represents 0 to 1 element
        // Flux represents 0 to N elements
        String[] strArr = {"1", "2", "3"};
        // Create the stream
        Flux.fromArray(strArr)
                // Intermediate operation
                .map(x -> Integer.parseInt(x))
                // Terminal operation: subscribe with a consumer
                .subscribe(System.out::println);

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // Keep the subscription
                this.subscription = subscription;
                System.out.println(">>>>>>>>>> subscribed");
                // Request the first item
                subscription.request(1);
            }

            @Override
            public void onNext(Integer integer) {
                // Handle the received item
                System.out.println(">>>>>>>>>>>>>> data:" + integer);
                // Keep requesting with request(), or stop with cancel()
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                // Error handling
            }

            @Override
            public void onComplete() {
                // The publisher has finished
                System.out.println(">>>>>>>> completed");
            }
        };

        Flux.fromArray(strArr)
                // Intermediate operation
                .map(x -> Integer.parseInt(x))
                // Terminal operation: subscribe with a Subscriber
                .subscribe(subscriber);
    }
}
```
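`Flux` has several overloaded `subscribe` methods, for example `subscribe()`, `subscribe(Consumer)`, `subscribe(Consumer, Consumer)`, `subscribe(Consumer, Consumer, Runnable)`, and `subscribe(Subscriber)`.

## Summary of the Flux example
`Flux` is evaluated lazily: values are only produced once the terminal operation (the subscription) runs. In my view this is just like Java 8 stream processing.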
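The comparison with Java 8 streams can be checked with the JDK alone. This is a minimal sketch of stream laziness, not Reactor code; the class `LazyStreamDemo` and the `events` list are made up here to make the order of execution visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamDemo {

    // Returns the order in which things happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Building the pipeline executes nothing: map() is only registered.
        Stream<Integer> pipeline = Stream.of("1", "2", "3")
                .map(s -> {
                    events.add("map " + s);
                    return Integer.parseInt(s);
                });
        events.add("pipeline built");

        // The terminal operation pulls the elements through map().
        List<Integer> result = pipeline.collect(Collectors.toList());
        events.add("result " + result);
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

"pipeline built" is recorded before any "map" entry, which mirrors how a `Flux` does nothing until `subscribe` is called.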
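# WebFlux
Add the Reactive Web dependency (Spring WebFlux), not the original Web dependency (Spring MVC).

To enable Lombok, add `-javaagent:lombok.jar` (edit `sts.ini` or `eclipse.ini`; in IDEA, just install the Lombok plugin).

Official site: WebFlux

## Traditional approach
### Traditional approach demo 1: basic usage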
```java
package com.demo;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

/**
 * Traditional Spring MVC style (annotated controller), running on WebFlux
 */
@RestController
@RequestMapping("/v1/test/")
public class TestController {

    /**
     * Traditional Spring MVC return type
     */
    @GetMapping("/1")
    public String get1() {
        return "get1";
    }

    /**
     * Traditional style with a WebFlux return type
     */
    @GetMapping("/2")
    public Mono<String> get2() {
        return Mono.just("get2");
    }
}
```
### Traditional approach demo 2: simulated data processing
```java
package com.demo;

import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

/**
 * Traditional Spring MVC style (annotated controller), running on WebFlux
 */
@Slf4j
@RestController
@RequestMapping("/v1/test/demo")
public class TestDemo2Controller {

    /**
     * Traditional Spring MVC return type
     */
    @GetMapping("/1")
    public String get1() {
        log.info("get1 start");
        String result = doSomething();
        log.info("get1 end");
        return result;
    }

    /**
     * Traditional style with a WebFlux return type
     */
    @GetMapping("/2")
    public Mono<String> get2() {
        log.info("get2 start");
        // fromSupplier defers doSomething() until the Mono is subscribed to
        Mono<String> result = Mono.fromSupplier(this::doSomething);
        log.info("get2 end");
        return result;
    }

    /**
     * Simulated business processing
     */
    private String doSomething() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "do something";
    }
}
```
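Demo 2 results:

Browser:

Log:

2021-04-24 10:31:51.118 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get2 start<br />
2021-04-24 10:31:51.118 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get2 end<br />
2021-04-24 10:32:10.849 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get1 start<br />
2021-04-24 10:32:13.849 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get1 end

Comparing the request time shown in the browser with the timestamps in the backend log shows that reactive programming is lazy. `get2 start` and `get2 end` are logged in the same millisecond, because `Mono.fromSupplier` only runs `doSomething` once the returned `Mono` is subscribed to, whereas `get1` blocks for 3 seconds between its two log lines.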
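The log pattern above can be imitated with a plain `java.util.function.Supplier`. This is a JDK-only analogy, not Reactor: the class `DeferredHandlerDemo` is hypothetical, the `Supplier` stands in for the `Mono`, and the explicit `pending.get()` call stands in for the framework subscribing to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class DeferredHandlerDemo {
    static final List<String> log = new ArrayList<>();

    static String doSomething() {
        log.add("doSomething");
        return "do something";
    }

    // Eager, like get1: the work runs between "start" and "end".
    static String get1() {
        log.add("get1 start");
        String result = doSomething();
        log.add("get1 end");
        return result;
    }

    // Deferred, like get2: only a description of the work is returned.
    static Supplier<String> get2() {
        log.add("get2 start");
        Supplier<String> result = DeferredHandlerDemo::doSomething;
        log.add("get2 end");
        return result;
    }

    static List<String> run() {
        log.clear();
        get1();
        Supplier<String> pending = get2();
        log.add("framework subscribes");
        pending.get(); // stands in for WebFlux subscribing to the Mono
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

For `get2`, "doSomething" appears only after "framework subscribes", which is why the two `get2` log lines share a timestamp.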
### Traditional approach demo 3: returning multiple items with Flux
```java
package com.demo;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import lombok.extern.slf4j.Slf4j;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

/**
 * Traditional Spring MVC style on WebFlux, returning a Flux.
 *
 * A Flux holds 0 to N elements.
 */
@Slf4j
@RestController
@RequestMapping("/v1/test/flux")
public class TestFluxController {

    /**
     * No produces type specified: everything is returned at once
     */
    @GetMapping("/1")
    public Flux<String> get1() {
        log.info("get1 called");
        return Flux.fromStream(IntStream.range(1, 6).mapToObj(x -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from flux data:" + x;
        }));
    }

    /**
     * produces is set to text/event-stream: items are returned one at a time
     */
    @GetMapping(value = "/2", produces = {"text/event-stream"})
    public Flux<String> get2() {
        log.info("get2 called");
        return Flux.fromStream(IntStream.range(1, 6).mapToObj(x -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from flux data:" + x;
        }));
    }
}
```
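Results:

Result of requesting http://localhost:8080/v1/test/flux/1:

Result of requesting http://localhost:8080/v1/test/flux/2:

The data appears in the browser item by item! This uses SSE (server-sent events), which is part of HTML5. With the traditional approach you usually write two endpoints: one returns everything at once (no `produces`), and the other returns items one by one (`produces` set to `text/event-stream`).

### Writing SSE with a servlet (in a Dynamic Web Project, not Spring WebFlux)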
```java
package com.demo;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Tests SSE (server-sent events).
 *
 * Results are sent to the front end one at a time.
 */
@WebServlet(name = "SSEDemoServlet", value = "/SSEDemoServlet")
public class SSEDemoServlet extends HttpServlet {

    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doGet(request, response);
    }

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // SSE requires these response headers
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");
        // Simulated data
        for (int i = 0; i < 5; i++) {
            // Event id, format: "id:" + value + one newline
            response.getWriter().write("id:" + Math.random() + "\n");
            // Event type, format: "event:" + event name + one newline
            response.getWriter().write("event:me\n");
            // Data, format: "data:" + payload + two newlines
            response.getWriter().write("data:" + i + "\n\n");
            response.getWriter().flush();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE (Server-Sent Events) test</title>
</head>
<body>
<h1>Testing SSE (Server-Sent Events)</h1>
<script type="text/javascript">
    // The constructor argument is the URL; a relative path is used here.
    // EventSource connects to the server automatically and keeps reconnecting until it is closed.
    var sse = new EventSource("SSEDemoServlet");
    // Handle incoming messages
    sse.onmessage = function (ev) {
        console.log("message", ev.data, ev);
    };
    // Add an event listener
    sse.addEventListener("me", function (evt) {
        console.log("me event", evt.data, evt);
        if (evt.data == 3) {
            // Close the connection
            sse.close();
        }
    });
</script>
</body>
</html>
```
If the SSE connection is not closed, the browser keeps fetching data, because `EventSource` reconnects automatically.
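The wire format written by the servlet (an `id:` line, an `event:` line, then `data:` followed by a blank line) can be isolated in a small helper. This is a sketch using only the JDK; the class `SseFrames` and its `event` method are names invented for this example.

```java
class SseFrames {

    // Builds one SSE event: optional "id:" and "event:" lines, one "data:" line
    // per line of payload, and a blank line that terminates the event.
    static String event(String id, String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id:").append(id).append('\n');
        }
        if (eventName != null) {
            sb.append("event:").append(eventName).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(event("1", "me", "0"));
        System.out.print(event(null, null, "line one\nline two"));
    }
}
```

Events that carry an `event:me` line are dispatched to listeners registered with `addEventListener("me", ...)`; `onmessage` only receives events that have no `event:` field.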
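Note: a Spring WebFlux project does not include the Servlet 3.1 jar. Even after adding the dependency

```xml
<!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>javax.servlet-api</artifactId>
    <version>3.1.0</version>
    <scope>provided</scope>
</dependency>
```

the servlet still cannot be accessed correctly: the status is 200, but the response `Content-Type` header is not the `text/event-stream` that I set. With no workaround found, I tested the SSE demo above in a Dynamic Web Project for now.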
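## The RouterFunction approach
The main change is that `HttpServletRequest` is abstracted as `ServerRequest`, and `HttpServletResponse` as `ServerResponse`.

### Development steps
- `HandlerFunction` (takes a `ServerRequest`, returns a `Mono<ServerResponse>`)
- `RouterFunction` (maps request URLs to `HandlerFunction`s)
- Build an `HttpHandler` to process the requests
- The server handles the request

### RouterFunction code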
```java
package com.demo.webflux;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

/**
 * Tests WebFlux using router functions (RouterFunction)
 */
@Configuration
public class TestWebFluxByRouterFunction {

    @Bean
    RouterFunction<ServerResponse> webfluxDemoRouter(WebFluxDemoHandler handler) {
        return RouterFunctions
                // Equivalent to the path on the class
                .nest(RequestPredicates.path("/v1/web/flux/demo"),
                        RouterFunctions
                                // Equivalent to the paths on the methods
                                .route(RequestPredicates.GET("/"), handler::getAll)
                                .andRoute(RequestPredicates.DELETE("/{id}"), handler::deleteById)
                                .andRoute(RequestPredicates.POST("/add")
                                        // Set the request format
                                        .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), handler::add)
                                .andRoute(RequestPredicates.GET("/init"), handler::init)
                                .andRoute(RequestPredicates.GET("/{id}"), handler::getById));
    }
}
```
The class must be annotated with `@Configuration`, because it declares the `RouterFunction` bean.

### Create a handler: WebFluxDemoHandler
```java
package com.demo.webflux;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * A custom handler whose methods match HandlerFunction: each takes a
 * {@link ServerRequest} and returns a {@code Mono<ServerResponse>}.
 *
 * @see org.springframework.web.reactive.function.server.HandlerFunction#handle(ServerRequest)
 */
@Component
public class WebFluxDemoHandler {

    /**
     * Simulated database data
     */
    private List<WebFluxDemo> data = IntStream.range(1, 5)
            .mapToObj(x -> WebFluxDemo.builder().id("1" + x).age(30 + x)
                    .birthday(LocalDate.now()).name("xiaohui" + x).build())
            .collect(Collectors.toList());

    /**
     * Re-initialize the data and return it
     */
    public Mono<ServerResponse> init(ServerRequest request) {
        this.data = IntStream.range(1, 5)
                .mapToObj(x -> WebFluxDemo.builder().id("1" + x).age(30 + x)
                        .birthday(LocalDate.now()).name("xiaohui" + x).build())
                .collect(Collectors.toList());
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                .body(Flux.fromIterable(data), WebFluxDemo.class);
    }

    /**
     * Query all data
     */
    public Mono<ServerResponse> getAll(ServerRequest request) {
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                .body(Flux.fromIterable(data), WebFluxDemo.class);
    }

    /**
     * Query by id
     */
    public Mono<ServerResponse> getById(ServerRequest request) {
        String id = request.pathVariable("id");
        // orElse(null) instead of get(): get() throws when nothing matches,
        // so the not-found branch below would never be reached
        WebFluxDemo first = data.stream().filter(x -> x.getId().equals(id)).findFirst().orElse(null);
        Mono<ServerResponse> result = null;
        if (Objects.isNull(first)) {
            result = ServerResponse.notFound().build();
        } else {
            result = ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                    .body(Mono.just(first), WebFluxDemo.class);
        }
        return result;
    }

    /**
     * Delete by id
     */
    public Mono<ServerResponse> deleteById(ServerRequest request) {
        String id = request.pathVariable("id");
        WebFluxDemo first = data.stream().filter(x -> x.getId().equals(id)).findFirst().orElse(null);
        Mono<ServerResponse> result = null;
        if (Objects.isNull(first)) {
            result = ServerResponse.notFound().build();
        } else {
            data.remove(first);
            result = ServerResponse.ok().build();
        }
        return result;
    }

    /**
     * Add data
     */
    public Mono<ServerResponse> add(ServerRequest request) {
        return request.bodyToMono(WebFluxDemo.class).flatMap(demo -> {
            data.add(demo);
            return ServerResponse.ok().body(Mono.just(demo), WebFluxDemo.class);
        }).switchIfEmpty(ServerResponse.badRequest().contentType(MediaType.TEXT_PLAIN)
                .body(Mono.just("Bad request"), String.class));
    }
}
```
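A handler method returns `Mono<ServerResponse>`.

To return the query results as SSE, replace `MediaType.APPLICATION_JSON` in `ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)` with `MediaType.TEXT_EVENT_STREAM`: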
```java
/**
 * Query all data (also requires: import java.util.concurrent.TimeUnit;)
 */
public Mono<ServerResponse> getAll(ServerRequest request) {
    // return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
    //         .body(Flux.fromIterable(data), WebFluxDemo.class);
    return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM)
            .body(Flux.fromStream(IntStream.range(1, 5).mapToObj(x -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return WebFluxDemo.builder().id("1" + x).age(30 + x)
                        .birthday(LocalDate.now()).name("xiaohui" + x).build();
            })), WebFluxDemo.class);
}
```
How do you set `text/event-stream` (SSE, server-sent events) with the RouterFunction approach? Is it the default?

No, it has to be set explicitly: `ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM)`.

### Plain domain class WebFluxDemo
```java
package com.demo.webflux;

import java.time.LocalDate;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * A simple object used to test WebFlux
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebFluxDemo {
    private String id;
    private String name;
    private Integer age;
    private LocalDate birthday;
}
```
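## Database CRUD operations
For relational databases, use R2DBC; MongoDB already has reactive support.

# Summary
With reactive stream programming, the key concepts to focus on are Publisher, Subscriber, Flow, Flux, and Mono.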
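The Publisher and Subscriber roles can be tried out with the JDK 9 `java.util.concurrent.Flow` API, with no Reactor dependency. The sketch below is an illustration under that assumption, not Reactor's implementation: `FlowBackpressureDemo` is a made-up class name, and `SubmissionPublisher` is the JDK's ready-made publisher. It shows where demand is signalled (`request(1)` in `onSubscribe` and again in `onNext`) and that closing the publisher triggers `onComplete`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowBackpressureDemo {

    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                events.add("subscribed");
                subscription.request(1); // first demand is signalled here
            }

            @Override
            public void onNext(Integer item) {
                events.add("item " + item);
                subscription.request(1); // ask for one more, or call cancel()
            }

            @Override
            public void onError(Throwable t) {
                events.add("error " + t);
                done.countDown();
            }

            @Override
            public void onComplete() {
                events.add("completed");
                done.countDown();
            }
        };

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String s : new String[] {"1", "2", "3"}) {
                publisher.submit(Integer.parseInt(s));
            }
        } // close() makes the publisher signal onComplete

        try {
            // Delivery is asynchronous, so wait for the terminal signal.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The subscriber controls the pace: the publisher buffers items until the subscriber requests them.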
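# Open questions
- How can native servlet programming be used in WebFlux?
- Can Web (Spring MVC) and WebFlux be used at the same time?
- How do you set `text/event-stream` (SSE, server-sent events) with the RouterFunction approach? Is it the default? How can a long article with images be returned and rendered in segments?
- Parameter validation and error handling with the RouterFunction approach (`Mono.error` and so on)
- In reactive programming, when is the number of requested items specified? How does the publisher close, and how does the subscriber subscribe?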
