1:SpringWebFlux介绍
(1):Spring5添加新的模块,用于web开发的,功能类似于SpringMVC,Webflux使用了当前一种比较流行的响应式编程出现的框架
(2):使用传统web框架,eg:springmvc框架,struts2框架,这些都是基于servlet容器,webflux是一种异步非阻塞的框架,异步非阻塞的框架是在Servlet 3.1+之后才支持的;核心是基于Reactor的相关API实现的,Reactor就是响应式编程;
(3):什么是异步非阻塞
*异步、同步,针对的是调用者;调用者发送请求,如果我们需要等待对方相应之后才去做其他的事情叫做同步,如果发送请求之后不需要等待对方回应就去做其他事情就是异步。
*非阻塞、阻塞,针对被调用者。被调用者收到请求之后,做完请求任务之后才给出反馈就是阻塞;如果收到请求之后就立马做出反馈然后在执行请求任务就是非阻塞。
**上面都是针对对象不一样
eg:生活中等公交车,在等公交车中我什么都不做就只是等公交车到就是同步,如果在等待过程中我还可以做其他的事就是异步。
(4):Webflux特点:
非阻塞式:在有限的资源下,提高系统吞吐量和伸缩性,以reactor为基础实现响应式编程;
函数式编程:sprin5就是基于java8,webflux就可以使用java8中的函数式编程方式实现路由请求;
(5):springmvc和webflux对比
俩个框架都可以使用注解方式,都运行在tomcat容器中
springmvc采用命令式编程(一行一行代码执行)
webflux采用异步响应式编程()
2:响应式编程
(1):什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
(2):Java8及其之前的版本进行演示,后续版本有所不同。响应式编程本质上利用了一种设计模式即为观察者模式。Java8中提供了俩个观察者模式的类Observer 和Observble。观察你关系的数据的变化,做出相应处理。
(3):java代码实现
package com.example.demo.reactor8;
import java.util.Observable;
public class ObserverDoemo extends Observable {
public static void main(String[] args) {
ObserverDoemo observer = new ObserverDoemo();
// 添加观察者
observer.addObserver((o, str) -> {
System.out.println("发生变化");
});
observer.addObserver((o, str) -> {
System.out.println("手动被观察者通知,准备改变");
});
// 添加监听,知道数据变化
observer.setChanged();
// 添加通知
observer.notifyObservers();
}
}
Webflux其底层并不是用到了Observer 和Observble实现的,而是使用到了java9及其以上的版本中的类和方法做了封装做成了webflux。即是Flow类,Webflux的底层就是对此类做了封装,pulisher消息发布,Subscriber消息订阅。发布订阅
3:响应式编程(Reactor实现)
(1):响应式编程操作中,Reactor是满足Reactive规范框架
(2):Reactor有俩大核心类,Mono和Flux,这俩大类实现接口Publisher,提供丰富的操作符。
Flux对象实现发布者,返回实现N个元素;
Mono对象实现翻发布者,返回0或1一个元素;
(3):Flux和Mono都是数据流的发布者,使用俩者都可以发出三种数据信号:元素值、错误信号、完成信号。
错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了。
错误信息:终止数据流同时把错误信息传递给订阅者。
(4):代码示例
1:引入依赖
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
package com.example.demo.reactor8;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TextReactor {
public static void main(String[] args) {
// just方法可以直接声明,数据流并没有发出,只有进行订阅之后才会发生。
// subscribe 订阅
// 可多个
Flux.just("1", "2").subscribe(System.out::print);
// 一个或没有
Mono.just("1");
// 其他方式
String[] arr = { "1", "2" };
// 数组
Flux.fromArray(arr);
List<String> list = Arrays.asList(arr);
// 集合
Flux.fromIterable(list);
Stream<String> stream = list.stream();
// 数据流
Flux.fromStream(stream);
}
}
(5)三种信号特点
错误信号,完成信号都是终止信号,但是不能共存。
如果没有发送任何的元素值,而是直接发送错误信号或完成信号,就表示是空数据流;
如果没有错误信号,没有完成信号,就表示是无限数据流;
(6):调用just或其他方式只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不发生。
(7):操作符
对数据流进行一道道操作,就称为操作符。比如工厂流水线;
map和flatMap区别
map:元素映射为新的元素
flatMap:把元素映射为流,在把多个流合并成一个大流做返回
4:WebFlux执行流程和核心API
springWebflux基于Reactor,默认容器是netty,netty是高性能的IO框架,异步非阻塞框架。
(1):netty
BIO & NIO & NIO常见框架
BIO 、NIO
(2):springwebflux执行过程和springmvc相似的
springwebflux核心控制器DispatchHandler,实现接口WebHandler。
WebHandler接口中有一个方法handle
DispatchHandler
(3):DispatchHandler主要负责请求的处理
HandlerMapping 请求查询到处理的方法
HandlerAdapter 真正负责请求的处理方法
HandlerResultHandler 相应结果处理
(4):springwebflux实现函数式编程,俩个接口:RouterFunction和HandlerFunction
RouterFunction(实现路由功能,请求转发给对应的 handler)
HandlerFunction(处理请求生成响应的函数)。
3.核心任务定义两个函数式接口的实现并且启动需要的服务器。
4.SpringWebflux 请求和响应不再是ServletRequest和ServletResponse ,而是
ServerRequest 和 ServerResponse
5:SpringWebFlux(基于注解编程模型)
(1):创建springboot工程,引入weblfux依赖。
(2):启动配置端口号 server.port=8081
(3):创建接口,定义操作方法
package com.example.demo.service;
import com.example.demo.entity.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// 用户操作接口
public interface UserService {
// 根据id查询用户
// 因为查询所以用户是1个或0,所以使用Mono
Mono<User> getUserById(int id);
// 查询所有用户
// 因为查询所以用户是多个,所以使用Flux
Flux<User> queryUserList();
// 返回 添加用户
Mono<Void> saveUser(Mono<User> user);
}
接口实现类
package com.example.demo.service.impl;
import java.util.HashMap;
import java.util.Map;
import com.example.demo.entity.User;
import com.example.demo.service.UserService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class UserServiceImpl implements UserService {
// 创建map 模拟数据库
private final Map<Integer, User> t_user = new HashMap<Integer, User>();
// 无参构造存入map数据库
public UserServiceImpl() {
this.t_user.put(1, new User("张三", "男", "12"));
this.t_user.put(2, new User("小丽", "女", "11"));
}
@Override
public Mono<User> getUserById(int id) {
// 根据id查询用户
return Mono.justOrEmpty(this.t_user.get(id));
}
@Override
public Flux<User> queryUserList() {
// 查询所有用户
return Flux.fromIterable(this.t_user.values());
}
@Override
public Mono<Void> saveUser(Mono<User> user) {
// 便利内容
return user.doOnNext(person -> {
// 向map集合添加
int id = this.t_user.size() + 1;
this.t_user.put(id, person);
}).thenEmpty(Mono.empty());// thenEmpty终止信号,清空Mono
}
}
(4):编写controller层
package com.example.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.entity.User;
import com.example.demo.service.UserService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// 交给spring管理并返回json
@RestController
public class UserController {
// 注入service
@Autowired
private UserService userService;
// id查询
@GetMapping("/user/{id}")
public Mono<User> getUserById(@PathVariable int id) {
return userService.getUserById(id);
}
// 查询所有
@GetMapping("/queryUserList")
public Flux<User> queryUserList() {
return userService.queryUserList();
}
// 添加
@PostMapping("/addUser")
public Mono<Void> add(@RequestBody User user) {
return userService.saveUser(Mono.just(user));
}
}
(5):启动springboot
(6):启动访问服务测试
http://localhost:8081/user/1 1为接口实现类中的map key为1的值结果为
*说明
springmvc实现方式:同步阻塞方式,基于springmvc+servlet+tomcat
springwebflux实现方式:异步非阻塞式,基于springwebflux+reactor+netty
git@github.com:My-Jun/spring5_demo7.git
6:SpringWebFlux(基于函数式编程模型)
(1):在使用函数式编程模式操作时候,需要自己初始化服务器。
(2):基于函数式编程模式时候,有俩个核心接口:
RouterFunction(实现路由功能,请求转发给对应的handler)
HandlerFunction(处理请求生成响应的函数)
(3):核心任务定义俩个函数式接口的实现并且启动需要的服务器。
(4):springwebflux请求和相应不再是ServeletRequest(请求)和ServletResponse(相应),而且
ServerRequest和ServerResponse
具体实现过程
(1):把注解编程代码负责一份,进行修改
(2):创建Handler(具体是实现方式)具体操作
package com.example.demo.handler;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.example.demo.entity.User;
import com.example.demo.service.UserService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class UserHandler {
private final UserService userService;
public UserHandler(UserService userService) {
this.userService = userService;
}
// id查询
public Mono<ServerResponse> getUserById(ServerRequest request) {
// 获取id值
String pathVariable = request.pathVariable("id");
// 空值处理
Mono<ServerResponse> noFound = ServerResponse.notFound().build();
Integer id = Integer.valueOf(pathVariable);
// 调用接口返回得到数据
Mono<User> userMono = this.userService.getUserById(id);
// 把userMono进行转换返回,使用Reactor操作符flatmap
MediaType contentType = new MediaType("application", "json");
return userMono.flatMap(person -> ServerResponse.ok().contentType(contentType)
.body((BodyInserter<?, ? super ServerHttpResponse>) (person))).switchIfEmpty(noFound);
}
}
(3):router路由
详情参考
Webflux实现==>基于函数式编程模型
(4):使用webclient调用
git@github.com:My-Jun/spring5_webflux2.git