1、核心概念
1.1、异步非阻塞
SpringMVC是同步阻塞的IO模型,资源浪费相对来说比较严重,当我们在处理一个比较耗时的任务时,例如:上传一个比较大的文件,首先,服务器的线程一直在等待接收文件,在这期间它就等在那儿,什么都干不了,好不容易等到文件来了并且接收完毕,我们又要将文件写入磁盘,在这写入的过程中,又要等到文件写完才能去干其它的事情。这一前一后的等待,不浪费资源么?
Spring WebFlux就是来解决这问题的,Spring WebFlux可以做到异步非阻塞。还是上面那上传文件的例子,Spring WebFlux是这样做的:线程发现文件还没准备好,就先去做其它事情,当文件准备好之后,通知线程来处理,当接收完毕写入磁盘的时候(根据具体情况选择是否做异步非阻塞),写入完毕后通知这个线程再来处理(异步非阻塞情况下)。
1.2、响应式(reactive)函数编程
1.3、不再拘束于Servlet容器
以前,我们的应用都运行于Servlet容器之中,例如我们大家最为熟悉的Tomcat, Jetty…等等。而现在Spring WebFlux不仅能运行于传统的Servlet容器中(前提是容器要支持Servlet3.1,因为非阻塞IO是使用了Servlet3.1的特性),还能运行在支持NIO的Netty和Undertow中。
1.4、反应式
反应式系统具有某些特性,使其成为低延迟、高吞吐量工作负载的理想选择。projectreactor和Spring组合一起工作,使开发人员能够构建响应性、弹性、弹性和消息驱动的企业级反应系统。
1.5、反应式处理
反应式处理是一种范例,它使开发人员能够构建非阻塞、异步的应用程序,从而能够处理背压(流控制)。
为什么使用反应式处理?
反应式系统更好地利用了现代处理器。此外,在反应式编程中加入背压可以确保解耦组件之间具有更好的弹性。
背压
在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。需要强调的是:这句话的重点不在于「上游生产速度大于下游消费速度」,而在于Buffer溢出。
反应式宣言
英文: https://www.reactivemanifesto.org/
中文: https://www.reactivemanifesto.org/zh-CN
反应式系统的特质
- 即时响应性: :只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。
- 回弹性:系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理。
- 弹性: 系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。
- 消息驱动:反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。
3、Reactive Stream
3.1、定义
Reactive Stream 提供异步流处理和无阻塞背压 标准规范。<br />[https://github.com/reactive-streams/reactive-streams-jvm](https://github.com/reactive-streams/reactive-streams-jvm)<br />[http://www.reactive-streams.org/](http://www.reactive-streams.org/)<br />****<br />**API组件**
- Publisher 发布者
- Subscriber 订阅者
- Subscription 契约 /订阅关系 ,实现背压的关键
- Processor 中间处理
JDK9 Flow API
**
java.util.concurrent.Flow
@FunctionalInterface
public static interface Publisher {
// 发布者与订阅者建立订阅关系
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber {
// 第一次建立订阅关系调用
public void onSubscribe(Subscription subscription);
//继续接收数据
public void onNext(T item);
//出现异常
public void onError(Throwable throwable);
//发送完成
public void onComplete();
}
//中间处理角色
public static interface Processor<T,R> extends Subscriber, Publisher {
}
返回发布服务器或订阅服务器缓存的默认值,可以在没有其他约束的情况下使用。
static final int DEFAULT_BUFFER_SIZE = 256;
/**
• Returns a default value for Publisher or Subscriber buffering,
• that may be used in the absence of other constraints.
•
• @implNote
• The current value returned is 256.
•
• @return the buffer size value
*/
public static int defaultBufferSize() {
return DEFAULT_BUFFER_SIZE;
}
3.2、Reactive Stream Demo
1、创建生产者
SubmissionPublisher submissionPublisher = new SubmissionPublisher();
2、创建消费者
Flow.Subscriber subscriber = new Flow.Subscriber() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("建立发布订阅关系");
//建立订阅关系
this.subscription = subscription;
//请求数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
//接收数据进行处理
System.out.println("接收到数据 = " + item);
//处理完成继续请求数据 (调节数据接收频率)
this.subscription.request(1);
//通知生产者不再接收数据
//this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
//出现异常
throwable.printStackTrace();
//通知生产者不再接收数据
this.subscription.cancel();
}
@Override
public void onComplete() {
//全部数据处理完成(发布者关闭)
System.out.println("数据处理结束");
}};
1、发布者与消费者建立订阅关系
submissionPublisher.subscribe(subscriber);
2. 发布者发布数据
submissionPublisher.submit(100);
3、发布者关闭
submissionPublisher.close();
4. 等待数据发送
try {
Thread.currentThread().join(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
结果;
运行原理
连续发布500条数据
for (int i = 0; i <500 ; i++) { System.out.println("发布数据 = " + i); submissionPublisher.submit(i); }
4、Project Reactor
官方文档: https://projectreactor.io/docs/core/release/reference/
4.1、 定义
Reactor
Reactor is a fourth-generation reactive library, based on the Reactive Streams
specification, for building non-blocking applications on the JVM
Reactive programming
Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
反应式编程是一种关注数据流和变更传播的异步编程范式。这意味着可以通过所使用的编程语言轻松地表达静态(例如数组)或动态(例如事件发射器)数据流
4.2、Reactor 核心功能
Flux: 生产者发布的0-N个异步序列元素
Flux is a standard Publisher that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.
Flux 创建Demo
Flux ints = Flux.range(1, 4);
Flux seq1 = Flux.just("bole1", "bole2", "bole3");
List iterable = Arrays.asList("bole_01", "bole_02", "bole_03");
Flux seq2 = Flux.fromIterable(iterable);
seq2.subscribe(i -> System.out.println(i));
Mono:是一种特别的发布者,它最多发出一个条目。
Mono is a specialized Publisher that emits at most one item and then (optionally) terminates with an
onComplete signal or an onError signal.
Mono data = Mono.just("bole");
Mono noData = Mono.empty();
m.subscribe(i -> System.out.println(i));
4.3、Project Reactor测试
public class ReactorDemo {
public static void main(String[] args) {
//Mono创建方式 :1个元素
Mono<String> mono= Mono.just("mono");
mono.subscribe(i-> System.out.println("m:"+mono));
//Mono创建方式 :空序列
Mono<String> monoEmpty= Mono.empty();
monoEmpty.subscribe(i-> System.out.println("monoEmpty:"+mono));
//Flux创建方式1
Flux<String> flux0 = Flux.just("bole", "bole1", "bole3");
Flux<String> flux1 = Flux.fromArray(new String[]{"bole", "bole1", "bole3"});
Flux<String> flux2 = Flux.fromStream(Stream.of("bole", "bole1", "bole3"));
Flux<String> flux3 = Flux.fromIterable(Arrays.asList("bole", "bole1", "bole3"));
Flux<Integer> flux4 = Flux.range(1, 10);
//使用baseSubscriber
Flux.range(1, 10)
.doOnRequest(r -> System.out.println("request of " + r))
.subscribe(new BaseSubscriber<Integer>() {
@Override
public void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
public void hookOnNext(Integer integer) {
System.out.println("Cancelling after having received " + integer);
request(1);
//cancel();
}
});
//Flux创建方式 2 : 程序创建
Flux<String> flux5 = Flux.generate(() -> 0, (i, sink) -> {
sink.next("bole" + i);
if (i > 10) {
sink.complete();
}
return i + 1;
});
flux0.doOnSubscribe(i -> {
i.request(2);
System.out.println("sub:" + i);
}).doOnNext(i -> {
System.out.println("next:" + i);
}).doOnComplete(() -> {
System.out.println("Complete");
}).subscribe();
/* Flux.generate(AtomicLong::new, (l, sink) -> {
long v = l.getAndIncrement();
sink.next("bole" + v);
if (v == 3) {
sink.complete();
}
return l;
}, l -> {
}).subscribe(i -> System.out.println("atomic :" + i));*/
5、webflux整合R2DBC
5.1 RouterFunction和HandlerFunction介绍
In WebFlux.fn, an HTTP request is handled with a HandlerFunction: a function that takes ServerRequest and returns a delayed ServerResponse (i.e. Mono). Both the request and the response object have immutable contracts that offer JDK 8-friendly access to the HTTP request and response. HandlerFunction is the equivalent of the body of a @RequestMapping method in the annotation-based programming model.
Incoming requests are routed to a handler function with a RouterFunction: a function that takes ServerRequest and returns a delayed HandlerFunction (i.e. Mono). When the router function matches, a handler function is returned; otherwise an empty Mono. RouterFunction is the equivalent of a @RequestMapping annotation, but with the major difference that router functions provide not just data, but also behavior.
RouterFunctions.route() provides a router builder that facilitates the creation of routers, as the following example shows:
5.1 应用依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
5.2 配置文件
server.port=8080
spring.r2dbc.url=r2dbcs:mysql://localhost:3306/zyl
spring.r2dbc.name=root
spring.r2dbc.password=123456
5.3 DAO层代码
public interface StudentRepository extends ReactiveCrudRepository<Student,Integer> {
}
5.3 编写HandlerFunction
public class StudentHandler {
private final StudentRepository studentRepository;
public StudentHandler(StudentRepository studentRepository) {
this.studentRepository = studentRepository;
}
public Mono<ServerResponse> getStudent(ServerRequest request) {
Flux<Student> all = studentRepository.findAll();
return ok().contentType(APPLICATION_JSON).body(all, Student.class);
}
public Mono<ServerResponse> save(ServerRequest request) {
Mono<Student> studentMono = request.bodyToMono(Student.class);
return ok().contentType(APPLICATION_JSON).body(studentRepository.saveAll(studentMono),Student.class);
}
}
5.4 编写RouterFunction
@Configuration
public class StudentRouter {
@Bean
public RouterFunction<ServerResponse> student(StudentHandler studentHandler) {
return RouterFunctions.route().
GET("/getAll",RequestPredicates.accept(MediaType.APPLICATION_JSON),studentHandler::getStudent).
POST("/save",RequestPredicates.accept(MediaType.APPLICATION_JSON),studentHandler::save).build();
}
}