RX 文档和lib
不可错过的文档
依赖
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
API 组件
- Publisher : 潜在数据的提供者/数据来源是Subscriber的监听结果? 🤔 这里的描述还不太清楚 ==>obserable
- 对应Rx中的概念
- Observable 可观察到的
- Observer 观察者
- reactor
- Flux
- mono
- 对应Rx中的概念
- Subscriber : LambdaMonoSubscriber实现了该类
- Subscription
- Processor
它在实际的使用过程中被拆分了,例如 Subscriber
的3个方法 onNext
onComplete
onError
的实现在 Spring
的使用中被替换成了 Consumer
的使用方式,而在其内部会重新聚合 Subscriber
实例,实际 Class
为 LambdaMonoSubscriber
, 在实例化的时候组装订阅者并且自动请求 requests()
The subscriber will automatically request Long.MAX_VALUE onSubscribe
从这里的处理可以看出来,其实所有的操作在其内部都会被封装成 Publisher
和 Subscriber
的形式去进行处理. 其处理的开始时间是在 .Subscribe
的那个瞬间开始执行的.
这里的 subscribe
对应了 1、publisher.subscribe()
这个阶段. 那么从流程上看就进入了 2、Subscriber.subscribe
阶段了 , 这里是具体的 Publisher
执行 subscribe
操作,对应到 lib
里边就是 Mono和Flux
的各种实现
Mono/Flux 是抽象类,具体的实现还是需要依靠子类的,例如FluxArray/MonoJust等等
subscribe是必须的,如果没有这部分订阅上,前边的逻辑都是没有意义的,同时在
webflux
中这个方式也体现出来了. 具体的代码是在 HttpServerHandle 里边
#HttpServerHandle
public void onStateChange(Connection connection, State newState) {
if (newState == HttpServerState.REQUEST_RECEIVED) {
try {
if (log.isDebugEnabled()) {
log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);
}
HttpServerOperations ops = (HttpServerOperations) connection;
Mono.fromDirect(handler.apply(ops, ops))
.subscribe(ops.disposeSubscriber());
}
catch (Throwable t) {
log.error(format(connection.channel(), ""), t);
//"FutureReturnValueIgnored" this is deliberate
connection.channel()
.close();
}
}
}
小总结
多方查看一下数据以后可以先做一个小小的总结😂
- Reactor Streams 是规范 (Publisher接口)
- Reactor 实现了Reactor Streams (Mono/Flux 2个实现)
- Web Flux 实现了以Reactor为基础的,web领域的响应式编程框架 (业务代码)
参考文档
defer 和just的区别. 实际代码
//使用方式1
return Mono.defer(() -> {
return response.writeWith(Flux.just(wrap));//设置body
});
//使用方式2
return response.writeWith(Mono.just(wrap));
concatMap 的数据集是有序的 | flatMap 的数据集是无序的