仅供参考,个人理解,必然有误

RX 文档和lib

不可错过的文档

依赖

  1. <dependency>
  2. <groupId>org.reactivestreams</groupId>
  3. <artifactId>reactive-streams</artifactId>
  4. <version>1.0.3</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.reactivestreams</groupId>
  8. <artifactId>reactive-streams-tck</artifactId>
  9. <version>1.0.3</version>
  10. <scope>test</scope>
  11. </dependency>

API 组件

  • Publisher : 潜在数据的提供者/数据来源是Subscriber的监听结果? 🤔 这里的描述还不太清楚 ==>obserable
    • 对应Rx中的概念
      • Observable 可观察到的
      • Observer 观察者
    • reactor
      • Flux
      • mono
  • Subscriber : LambdaMonoSubscriber实现了该类
  • Subscription
  • Processor

它在实际的使用过程中被拆分了,例如 Subscriber 的3个方法 onNext onComplete onError 的实现在 Spring 的使用中被替换成了 Consumer 的使用方式,而在其内部会重新聚合 Subscriber 实例,实际 ClassLambdaMonoSubscriber , 在实例化的时候组装订阅者并且自动请求 requests()

The subscriber will automatically request Long.MAX_VALUE onSubscribe

从这里的处理可以看出来,其实所有的操作在其内部都会被封装成 PublisherSubscriber 的形式去进行处理. 其处理的开始时间是在 .Subscribe 的那个瞬间开始执行的.

image.png
这里的 subscribe 对应了 1、publisher.subscribe() 这个阶段. 那么从流程上看就进入了 2、Subscriber.subscribe 阶段了 , 这里是具体的 Publisher 执行 subscribe 操作,对应到 lib 里边就是 Mono和Flux 的各种实现
Mono/Flux 是抽象类,具体的实现还是需要依靠子类的,例如FluxArray/MonoJust等等

subscribe是必须的,如果没有这部分订阅上,前边的逻辑都是没有意义的,同时在 webflux 中这个方式也体现出来了. 具体的代码是在 HttpServerHandle 里边

  1. #HttpServerHandle
  2. public void onStateChange(Connection connection, State newState) {
  3. if (newState == HttpServerState.REQUEST_RECEIVED) {
  4. try {
  5. if (log.isDebugEnabled()) {
  6. log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);
  7. }
  8. HttpServerOperations ops = (HttpServerOperations) connection;
  9. Mono.fromDirect(handler.apply(ops, ops))
  10. .subscribe(ops.disposeSubscriber());
  11. }
  12. catch (Throwable t) {
  13. log.error(format(connection.channel(), ""), t);
  14. //"FutureReturnValueIgnored" this is deliberate
  15. connection.channel()
  16. .close();
  17. }
  18. }
  19. }

小总结

多方查看一下数据以后可以先做一个小小的总结😂

  • Reactor Streams 是规范 (Publisher接口)
  • Reactor 实现了Reactor Streams (Mono/Flux 2个实现)
  • Web Flux 实现了以Reactor为基础的,web领域的响应式编程框架 (业务代码)

**

参考文档

  • defer 和just的区别. 实际代码

    1. //使用方式1
    2. return Mono.defer(() -> {
    3. return response.writeWith(Flux.just(wrap));//设置body
    4. });
    5. //使用方式2
    6. return response.writeWith(Mono.just(wrap));
  • concatMap 的数据集是有序的 | flatMap 的数据集是无序的