spring5 的web开发技术栈
非阻塞的
运行在 netty 上的
需要 servlet3.1
官网链接——recative
目前主流关系型数据库是不支持 reactor 反应式编程
水平扩展,就是加人;垂直扩展,就是加班;
异步servlet、同步Servlet
创建dynamic web project,编写 servlet3.1 的 servlet。
同步的servlet
package com.demo;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/TestSychServlet")
public class TestSychServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
long t = System.currentTimeMillis();
doSomething(request, response);
System.out.println("TestSychServlet use time:"+(System.currentTimeMillis() - t));
}
private void doSomething(HttpServletRequest request, HttpServletResponse response) throws IOException {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
response.getWriter().write("TestSychServlet done");
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
}
异步 servlet
异步servlet 编写步骤:
- 开启异步,AsyncContext asyncContext = request.startAsync();
- 另起线程(或线程池)执行业务逻辑,比如 CompletableFuture
- 业务处理完成,需要通知上下文 asyncContext.complete();
异步例子
```java package com.demo;
import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /**
- 异步servlet 编写步骤:
- 开启异步,AsyncContext asyncContext = request.startAsync();
- 另起线程执行业务逻辑,比如 CompletableFuture
- 业务处理完成,需要通知上下文 asyncContext.complete();
@author DELL / @WebServlet(value = “/TestAsychServlet2”, asyncSupported = true) public class TestAsychServlet2 extends HttpServlet { private static final long serialVersionUID = 1L;
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
long t = System.currentTimeMillis();
// 开启异步
AsyncContext asyncContext = request.startAsync();
// 执行业务逻辑代码
CompletableFuture.runAsync(() -> doSomething(asyncContext, asyncContext.getRequest(), asyncContext.getResponse()));
System.out.println("TestAsychServlet2 use time:" + (System.currentTimeMillis() - t));
}
private void doSomething(AsyncContext asyncContext, ServletRequest servletRequest, ServletResponse servletResponse) {
// 模拟耗时
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(">>>>>>>>>>> 3 seconds");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
servletResponse.getWriter().write("TestAsychServlet2 done");
} catch (IOException e) {
e.printStackTrace();
}
// 调用上下文中 AsyncContext asyncContext 通知完成结束
asyncContext.complete();
}
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
doGet(request, response);
}
}
<a name="HWlOw"></a>
## 输出结果:
TestAsychServlet2 use time:1<br />
>>>>>>>>>>> 3 seconds
<br />TestAsychServlet2 use time:0
<br />>>>>>>>>>>> 3 seconds
<br />TestSychServlet use time:3001
<a name="gKfwU"></a>
## 小结
异步和同步,对于前端browser 来说,都是相同耗时获取到响应结果,但是servlet本身时间是不同的。传统式 servlet 被业业务阻塞,异步式servlet没有。
<a name="dqJAj"></a>
# reactor
reactor 就是 jdk8 stream + jdk9 reactive stream<br />flux [0-n]<br />mono [0-1]
<a name="ps1g8"></a>
## reactor 中的 flux 小例子
```java
package com.demo;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
public class TestReactorDemo {
public static void main(String[] args) {
// Reactor 相当于 jdk8 stream + jdk9 reactive stream
// Mono 代表 0-1 个元素
// Flux 代表 0-N 个元素
String[] strArr = {"1", "2", "3"};
// 创建流
Flux.fromArray(strArr)
// 流中间处理
.map(x -> Integer.parseInt(x))
// 结束流,就是一个消费者
.subscribe(System.out::println);
Subscriber<Integer> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系
this.subscription = subscription;
System.out.println(">>>>>>>>>> 建立订阅");
// 发个订阅要求
subscription.request(1);
}
@Override
public void onNext(Integer integer) {
// 订阅到的数据,并进行相关处理
System.out.println(">>>>>>>>>>>>>> 数据:" + integer);
// 继续订阅 request,或者结束 cancel
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
// 异常处理
}
@Override
public void onComplete() {
// 发布者关闭
System.out.println(">>>>>>>> 结束了");
}
};
Flux.fromArray(strArr)
// 流中间处理
.map(x -> Integer.parseInt(x))
// 结束流,就是订阅,subscribe
.subscribe(subscriber);
}
}
Flux 的 subscribe 方法有多个,多态,api 如下:
Flux 小例子小结
Flux 惰性求值,流在最终操作时才开始有值产生(个人感觉就是 java8 流式处理一样的)
webflux
引入的是 reactive web(对应springWebFlux) ,不是原来的 web (对应 SpringMVC)
lombok 功能添加,-javaagent:lombok.jar (编辑 sts.ini 或者 eclipse.ini,idea 直接插件 lombok)
官网 webflux
传统方式
传统方式 demo 1 简单使用
package com.demo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
* 传统 SpringMVC 方式,使用 webflux
*/
@RestController
@RequestMapping("/v1/test/")
public class TestController {
/**
* 传统 SpringMVC
*/
@GetMapping("/1")
public String get1() {
return "get1";
}
/**
* 传统方式,webflux
* @return
*/
@GetMapping("/2")
public Mono<String> get2() {
return Mono.just("get2");
}
}
传统方式 demo 2 模拟数据处理
package com.demo;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
* 传统 SpringMVC 方式,使用 webflux
*/
@Slf4j
@RestController
@RequestMapping("/v1/test/demo")
public class TestDemo2Controller {
/**
* 传统 SpringMVC
*/
@GetMapping("/1")
public String get1() {
log.info("get1 start");
String result = doSomething();
log.info("get1 end");
return result;
}
/**
* 传统方式,webflux
* @return
*/
@GetMapping("/2")
public Mono<String> get2() {
log.info("get2 start");
Mono<String> result = Mono.fromSupplier(this::doSomething);
log.info("get2 end");
return result;
}
/**
* 模拟业务处理
* @return
*/
private String doSomething() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "do something";
}
}
demo 2 访问结果:
浏览器:
日志:
2021-04-24 10:31:51.118 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get2 start
2021-04-24 10:31:51.118 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get2 end
2021-04-24 10:32:10.849 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get1 start
2021-04-24 10:32:13.849 INFO 17036 --- [ctor-http-nio-2] com.demo.TestDemo2Controller : get1 end
中的关注time,还有后端的日志时间打印,可以发现 反应式编程是惰性的。
传统方式 demo 3 使用 Flux 返回多条数据
package com.demo;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
/**
* 传统 SpringMVC 方式,使用 webflux Flux 返回对象
*
* Flux 是 0-n 个元素
*/
@Slf4j
@RestController
@RequestMapping("/v1/test/flux")
public class TestFluxController {
/**
* 传统 webflux,不指定产生方式,一次性全部返回
*/
@GetMapping("/1")
public Flux<String> get1() {
log.info("get1 called");
return Flux.fromStream(IntStream.range(1, 6).mapToObj(x -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from flux data:" + x;
}));
}
/**
* 传统方式,webflux,指定产生方式,一次只返回一个
*/
@GetMapping(value = "/2", produces = {"text/event-stream"})
public Flux<String> get2() {
log.info("get2 called");
return Flux.fromStream(IntStream.range(1, 6).mapToObj(x -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from flux data:" + x;
}));
}
}
结果:
http://localhost:8080/v1/test/flux/1 请求结果:
http://localhost:8080/v1/test/flux/2 请求结果:
数据逐条显示在浏览器上!!!使用SSE server-sent event,基于 html5;使用传统方式时,一般都会写两个接口,一个是一次全部返回(不指定 produces),一个是逐条返回(指定 produces 为 text/event-stream)
使用 servlet 编写 SSE(使用的是 dynamic web的工程,不是 spring webflux)
package com.demo;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* 测试使用 SSE server send event
*
* 逐条将结果返回到前端
*/
@WebServlet(name = "SSEDemoServlet", value = "/SSEDemoServlet")
public class SSEDemoServlet extends HttpServlet {
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
doGet(request, response);
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// 使用SSE 需要设置 response 消息头
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
// 模拟数据
for (int i = 0; i < 5; i++) {
// 添加event id标识,格式:id: + id值 + 一个回车
response.getWriter().write("id:" + Math.random() + "\n");
// 添加event type标识,格式:event: + 事件名称 + 一个回车
response.getWriter().write("event:me\n");
// 数据的格式需要是这样: data: + 数据 + 2个回车
response.getWriter().write("data:" + i + "\n\n");
response.getWriter().flush();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>sse (Server Send Event)测试</title>
</head>
<body>
<h1>测试 SSE Server Send Event</h1>
</body>
<script type="text/javascript">
// 初始化参数,参数为 url,此处使用相对路径
// sse 会自动链接服务器,不关闭一直会链接
var sse = new EventSource("SSEDemoServlet");
// 收到信息时的处理
sse.onmessage = function (ev) {
console.log("message", ev.data, ev);
}
// 添加时间监听
sse.addEventListener("me", function (evt) {
console.log("me event", evt.data, evt);
if (evt.data == 3) {
// 关闭链接
sse.close();
}
})
</script>
</html>
SSE 不关闭会一直拿数据,自动链接。
注意:在 spring webflux 中是没有 servlet3.1 jar包的,即使导入依赖
<!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
<scope>provided</scope>
</dependency>
对应的 servlet 无法正常访问(状态是200,但是response 的 header 不是自己设置的 text/event-stream),搞得没办法,先且在 dynamic web 项目中测试上面的 SSE demo。
使用 routerfunction 方式
主要变化是将原来的 HttpServletRequest 抽象成 ServerRequest,HttpServletResponse 抽象为 ServerResponser
开发步骤
handlerFunction(输入 ServerRequest,返回 ServerResponse)
- RouterFunction(请求url 和 HandlerFunction 对应起来)
- 建立一个HttpHandler 处理相关的请求
- Server 处理
routerfunction 代码
```java package com.demo.webflux;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.RequestPredicates; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse;
/**
测试 webflux,使用 路由函数, routerfunction */ @Configuration public class TestWebFluxByRouterFunction {
@Bean RouterFunction
webfluxDemoRouter(WebFluxDemoHandler handler) { return RouterFunctions
// 相当于类上的路径
.nest(RequestPredicates.path("/v1/web/flux/demo"),
RouterFunctions
// 相当于方法上的路径
.route(RequestPredicates.GET("/"), handler::getAll)
.andRoute(RequestPredicates.DELETE("/{id}"), handler::deleteById)
.andRoute(RequestPredicates.POST("/add")
// 设置请求格式
.and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), handler::add)
.andRoute(RequestPredicates.GET("/init"), handler::init)
.andRoute(RequestPredicates.GET("/{id}"), handler::getById)
);
}
}
需要指定为 configuration,内部创建了 Bean 对象。
<a name="V9H0D"></a>
### 建立一个 handler: WebFluxDemoHandler
```java
package com.demo.webflux;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 自定义一个 HandlerFunction,传入参数 {@link ServerRequest ServerRequest}, 返回 Mono<T> 此处 T 为 {@link ServerResponse
* ServerResponse}
*
* @see org.springframework.web.reactive.function.server.HandlerFunction#handle(ServerRequest)
*/
@Component
public class WebFluxDemoHandler {
/**
* 模拟数据库数据
*/
private List<WebFluxDemo> data = IntStream.range(1, 5)
.mapToObj(x -> WebFluxDemo.builder().id("1" + x).age(30 + x).birthday(LocalDate.now()).name("xiaohui" + x)
.build())
.collect(Collectors.toList());
/**
* 初始化数据,并返回数据
*/
public Mono<ServerResponse> init(ServerRequest request) {
this.data = IntStream.range(1, 5)
.mapToObj(
x -> WebFluxDemo.builder().id("1" + x).age(30 + x).birthday(LocalDate.now()).name("xiaohui" + x)
.build())
.collect(Collectors.toList());
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Flux.fromIterable(data),
WebFluxDemo.class);
}
/**
* 查询所有的数据
*/
public Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(Flux.fromIterable(data), WebFluxDemo.class);
}
/**
* 根据id,查询数据
*/
public Mono<ServerResponse> getById(ServerRequest request) {
String id = request.pathVariable("id");
WebFluxDemo first = data.stream().filter(x -> x.getId().equals(id)).findFirst().get();
Mono<ServerResponse> result = null;
if (Objects.isNull(first)) {
result = ServerResponse.notFound().build();
} else {
result = ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just(first),
WebFluxDemo.class);
}
return result;
}
/**
* 根据id,删除数据
*/
public Mono<ServerResponse> deleteById(ServerRequest request) {
String id = request.pathVariable("id");
WebFluxDemo first = data.stream().filter(x -> x.getId().equals(id)).findFirst().orElseGet(()->null);
Mono<ServerResponse> result = null;
if (Objects.isNull(first)) {
result = ServerResponse.notFound().build();
} else {
data.remove(first);
result = ServerResponse.ok().build();
}
return result;
}
/**
* 添加数据
*/
public Mono<ServerResponse> add(ServerRequest request) {
return request.bodyToMono(WebFluxDemo.class).flatMap(demo -> {
data.add(demo);
return ServerResponse.ok().body(Mono.just(demo), WebFluxDemo.class);
}).switchIfEmpty(ServerResponse.badRequest().contentType(MediaType.TEXT_PLAIN).body(Mono.just("请求错误"),
String.class));
}
}
handler 返回值是 Mono
SSE 方式查询结果 ServerResponse.ok().contentType(MediaType.APPLICATION_JSON):
/**
* 查询所有的数据
*/
public Mono<ServerResponse> getAll(ServerRequest request) {
// return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
// .body(Flux.fromIterable(data), WebFluxDemo.class);
return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(Flux.fromStream(IntStream.range(1, 5)
.mapToObj(x -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return WebFluxDemo.builder().id("1" + x).age(30 + x).birthday(LocalDate.now()).name(
"xiaohui" + x)
.build();
})),
WebFluxDemo.class);
}
routerfunction 方式如何设定为 text/event-stream (sse server send event),难道默认就是?
这个是需要设置的 ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM)。
普通 domain 类 WebFluxDemo
package com.demo.webflux;
import java.time.LocalDate;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 测试webflux 的一个对象信息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebFluxDemo {
private String id;
private String name;
private Integer age;
private LocalDate birthday;
}
数据库的crud操作
关系型引入 r2dbc,MongoDB已支持
小结
流式编程,就是要重点关注 publisher、subscriber、flow、flux、mono这几个。
问题
- webflux 中如何使用servlet原生编程
- web与webflux能否同时使用
- routerfunction 方式如何设定为 text/event-stream (sse server send event),难道默认就是?如何做到文章分段返回与渲染(长文章,图片)
- 关于routerfunction 方式的参数校验与errorhandler,mono.error等
- 反应式编程,什么时候指定 request请求数量的?publisher的关闭,subscriber 的订阅?