hello world

  1. public class HelloWorld {
  2. public static void main(String[] args) {
  3. Flowable.just("hello,world!").subscribe(System.out::println);
  4. }
  5. }

基本类

RxJava 3 features several base classes you can discover operators on:

  • io.reactivex.rxjava3.core.Flowable: 0..N flows, supporting Reactive-Streams and backpressure
  • io.reactivex.rxjava3.core.Observable: 0..N flows, no backpressure,
  • io.reactivex.rxjava3.core.Single: a flow of exactly 1 item or an error,
  • io.reactivex.rxjava3.core.Completable: a flow without items but only a completion or error signal,
  • io.reactivex.rxjava3.core.Maybe: a flow with no items, exactly one item or an error.

    术语

    上游(upstream),下游(downstream)

    RxJava中的数据流(dataflows)包含一个源(source), 0-N个中间步骤(intermediate steps),然后是数据流的消费者(data consumer)或者combinator步骤(负责消费数据流,个人的理解比如可以sink到其他地方)。
    比如有如下的使用
    1. source.operator1().operator2().operator3().subscribe(consumer);
    如果当前在操作operator2,那么operator1相对于operator2是upstream,operator3相对于operator2是downstream。

    对象的行为 Objects in motion

    在RxJava的文档中,发射(emission,emits),item,事件(event),信号(signal),数据(data)和消息(message)被视为同义词,并表示沿数据流传播的对象。

    背压 Backpressure

    当数据流(dataflow)通过异步(asynchronous)步骤运行时,每个步骤可能以不同的速度执行不同的操作。为了避免严重影响(overwhelm)这些操作,操作被影响时由于临时缓冲(temporary buffer)增加,或者需要删除/丢弃数据(skipping/dropping data)通常会表现出内存增加(increased memory), 所以产生了背压(backpressure)。背压是一种流量控制的方式(flow control), 表达了其处理的能力(how many items are they ready to process)。在通常没有步骤知道上游将发送给它多少数据的情况下,这可以限制数据流的内存使用。
    在rxjava中,Flowable支持背压,Observable不支持背压,Single,Complete,Maybe都不支持背压。

    assembly time

    通过应用各种中间运算符来准备数据流,数据没有流动也没有产生副作用(side effect)。
    1. Flowable<Integer> flow = Flowable.range(1, 5)
    2. .map(v -> v * v)
    3. .filter(v -> v % 3 == 0);

    subscribe time

    当在内部建立处理步骤链的流上调用subscribe()时,这是一个临时状态。
    这是触发订阅副作用(side effect)的时间(请参阅doOnSubscribe)。在此状态下,某些数据源会立即阻止(block)或开始发射(emitter)数据。
    1. flow.subscribe(System.out::println);

    runtime

    这是流主动发出数据(emitting items),错误(errors)或完成信号(completion)时的状态
    1. Observable.create(emitter -> {
    2. while (!emitter.isDisposed()) {
    3. long time = System.currentTimeMillis();
    4. emitter.onNext(time);
    5. if (time % 2 != 0) {
    6. emitter.onError(new IllegalStateException("Odd millisecond!"));
    7. break;
    8. }
    9. }
    10. })
    11. .subscribe(System.out::println, Throwable::printStackTrace);

    简单的后台运算

    1. public class SimpleBackgroundComputation {
    2. public static void main(String[] args) throws Exception {
    3. Flowable.fromCallable(() -> {
    4. Thread.sleep(1000); //模拟计算
    5. return "DONE";
    6. })
    7. .subscribeOn(Schedulers.io())
    8. .observeOn(Schedulers.single())
    9. .subscribe(s -> System.out.println(Thread.currentThread().getName() + " " + s), Throwable::printStackTrace);
    10. Thread.sleep(2000);
    11. }
    12. }
    上面使用了builder的模式,和下面的代码是等价的,可以加深理解 ```java Flowable source = Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return “Done”; });

Flowable runBackground = source.subscribeOn(Schedulers.io());

Flowable showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

  1. **通常,您可以通过subscriptionOn将计算移动或阻塞IO到其他线程。数据准备好后,您可以确保通过observeOn在前台或GUI线程上对它们进行处理。(可以通过弹珠图更好的理解这句话,这是scheduler调度的核心思想)**<br />[subscribeOnobserveOn结合使用的弹珠图](https://processon.com/diagraming/5ff3f360e0b34d34e12b1e29)
  2. <a name="EnMIe"></a>
  3. # Schedulers
  4. RxJava不直接使用Thread或者ExecutorService,而是使用了Schedulers的抽象。在rxjava3中,Schedulers提供了一些工具方法。**在RxJava中,默认的Scheduler在守护程序线程上运行。**
  5. - **Schedulers.computation()**: Run computation intensive work(计算密集) on a fixed number of dedicated threads(固定数量的线程数) in the background. Most asynchronous operators use this as their default Scheduler.
  6. - **Schedulers.io()**: Run I/O-like or blocking operations on a dynamically changing set of threads(动态数量的线程数).
  7. - **Schedulers.single():** Run work on a single thread in a sequential and FIFO manner.
  8. - **Schedulers.trampoline()**: Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.
  9. - **Schedulers.from(Executor): **提供自定义的线程池
  10. <a name="FdGUA"></a>
  11. # Parallel processing
  12. RxJava中的并行性意味着运行独立的流并将其结果合并回单个流。
  13. <a name="f0Czg"></a>
  14. ## flatMap
  15. ```java
  16. public static void testFlatMap() {
  17. Flowable.range(1, 10)
  18. .flatMap(v ->
  19. Flowable.just(v)
  20. .subscribeOn(Schedulers.computation())
  21. .map(w -> w * w))
  22. //主线程会阻塞 等待子线程执行完
  23. //无需使用Thread.sleep方法
  24. .blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));
  25. }

注意点: flatMap不可以保证流输出的顺序和map的时候一致!

concatMap

  1. public static void testFlatMap() {
  2. Flowable.range(1, 10)
  3. .flatMap(v ->
  4. Flowable.just(v)
  5. .subscribeOn(Schedulers.computation())
  6. .map(w -> w * w))
  7. //主线程会阻塞 等待子线程执行完
  8. //无需使用Thread.sleep方法
  9. .blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));
  10. }

concatMap可以保证流的有序输出,一次映射并运行一个内部流

concatMapEager

  1. public static void testFlatMap() {
  2. Flowable.range(1, 10)
  3. .concatMapEager(v ->
  4. Flowable.just(v)
  5. .subscribeOn(Schedulers.computation())
  6. .map(w -> w * w))
  7. //主线程会阻塞 等待子线程执行完
  8. //无需使用Thread.sleep方法
  9. .blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));
  10. }

它将“一次”运行所有内部流,但输出流将按照创建这些内部流的顺序进行。

使用Flowable.parallel

  1. public static void testParallel() {
  2. Flowable.range(1, 10)
  3. .parallel()
  4. .runOn(Schedulers.computation())
  5. .map(v -> v * v)
  6. .sequential()
  7. .blockingSubscribe(System.out::println);
  8. }

使用sequential,输出的流是有序的。

Dependent sub-flows

SkuInfo类

  1. public class SkuInfo {
  2. private Long skuId;
  3. private String skuName;
  4. private Integer price;
  5. public SkuInfo(Long skuId, String skuName, Integer price) {
  6. this.skuId = skuId;
  7. this.skuName = skuName;
  8. this.price = price;
  9. }
  10. public Long getSkuId() {
  11. return skuId;
  12. }
  13. public void setSkuId(Long skuId) {
  14. this.skuId = skuId;
  15. }
  16. public String getSkuName() {
  17. return skuName;
  18. }
  19. public void setSkuName(String skuName) {
  20. this.skuName = skuName;
  21. }
  22. public Integer getPrice() {
  23. return price;
  24. }
  25. public void setPrice(Integer price) {
  26. this.price = price;
  27. }
  28. }

Inventory类

  1. public class Inventory {
  2. private Long skuId;
  3. private String warehouseCode;
  4. private Integer stock;
  5. public Long getSkuId() {
  6. return skuId;
  7. }
  8. public void setSkuId(Long skuId) {
  9. this.skuId = skuId;
  10. }
  11. public String getWarehouseCode() {
  12. return warehouseCode;
  13. }
  14. public void setWarehouseCode(String warehouseCode) {
  15. this.warehouseCode = warehouseCode;
  16. }
  17. public Integer getStock() {
  18. return stock;
  19. }
  20. public void setStock(Integer stock) {
  21. this.stock = stock;
  22. }
  23. public Inventory(Long skuId, String warehouseCode, Integer stock) {
  24. this.skuId = skuId;
  25. this.warehouseCode = warehouseCode;
  26. this.stock = stock;
  27. }
  28. }
  1. public class DependentSubFlowTest {
  2. public static void main(String[] args) {
  3. test_dependent_sub_flows();
  4. }
  5. public static void test_dependent_sub_flows() {
  6. //1.首先获得库存流
  7. Flowable<Inventory> inventorySource = getInventoryFlow();
  8. //2.通过库存流获取sku流
  9. inventorySource
  10. .flatMap(inventory ->
  11. getSkuInfoBySkuId(inventory.getSkuId())
  12. .map(skuInfo -> "name:" + skuInfo.getSkuName() + ",price:" + skuInfo.getPrice()))
  13. .subscribe(System.out::println);
  14. }
  15. /**
  16. * 创建库存的flowable
  17. *
  18. * @return 返回库存的flowable
  19. */
  20. public static Flowable<Inventory> getInventoryFlow() {
  21. Inventory iv1 = new Inventory(1111L, "A1", 10);
  22. Inventory iv2 = new Inventory(1112L, "A2", 20);
  23. Inventory iv3 = new Inventory(1113L, "A3", 30);
  24. List<Inventory> inventories = new ArrayList<>();
  25. inventories.add(iv1);
  26. inventories.add(iv2);
  27. inventories.add(iv3);
  28. return Flowable.fromStream(inventories.stream());
  29. }
  30. /**
  31. * 通过skuId获取Sku信息
  32. *
  33. * @param skuId skuId
  34. * @return sku信息
  35. */
  36. public static Flowable<SkuInfo> getSkuInfoBySkuId(Long skuId) {
  37. SkuInfo s1 = new SkuInfo(1111L, "面包", 3);
  38. SkuInfo s2 = new SkuInfo(1112L, "可乐", 5);
  39. SkuInfo s3 = new SkuInfo(1113L, "方便面", 4);
  40. Map<Long, SkuInfo> skuMap = new HashMap<>();
  41. skuMap.put(s1.getSkuId(), s1);
  42. skuMap.put(s2.getSkuId(), s2);
  43. skuMap.put(s3.getSkuId(), s3);
  44. return Flowable.just(skuMap.get(skuId));
  45. }
  46. }

Continuations

depedent

简单来说就是下游数据对上游数据有依赖,在并行执行的时候需要考虑依赖关系。比如有这样一个场景,

  1. 获取仓库编码列表
  2. 获取对应仓库下的所有库存
  3. 通过库存获取skuId,然后通过skuId获取SkuInfo

如果之前是写成

  1. public static void test_dependent_continuation() throws InterruptedException {
  2. WarehouseService.getWarehouseCodeFlow()
  3. .flatMap(warehouseCode -> WarehouseService.getInventoryFlow(warehouseCode)
  4. )
  5. .flatMap(inventory -> InventoryService.getSkuInfoBySkuId(inventory.getSkuId()))
  6. .subscribeOn(Schedulers.computation())
  7. .subscribe(System.out::println);
  8. Thread.sleep(1000L);
  9. }

需要改成如下形式

  1. public static void test_dependent_continuation() throws InterruptedException {
  2. WarehouseService.getWarehouseCodeFlow()
  3. .flatMap(warehouseCode -> WarehouseService.getInventoryFlow(warehouseCode)
  4. .flatMap(inventory -> InventoryService.getSkuInfoBySkuId(inventory.getSkuId())))
  5. .subscribeOn(Schedulers.computation())
  6. .subscribe(System.out::println);
  7. Thread.sleep(1000L);
  8. }

non dependent

第一个源/数据流的结果无关紧要。

使用andThen

  1. sourceObservable
  2. .ignoreElements() // returns Completable
  3. .andThen(someSingleSource)
  4. .map(v -> v.toString())

具体可以看下面的deferred-dependent的例子

Deferred-dependent

有时,由于某种原因,先前的数据流和新的数据流之间存在隐式数据依赖性,由于某些原因,这些依赖性未通过“常规通道”流动。

  1. public static void test_deferred_dependent() {
  2. AtomicInteger count = new AtomicInteger();
  3. Observable.range(1, 10)
  4. .doOnNext(ignored -> count.incrementAndGet())
  5. .ignoreElements()
  6. .andThen(Single.just(count.get()))
  7. .subscribe(System.out::println);
  8. }

输出结果是 0,因为Single.just(count.get())的结果是在assembly time计算的,这个时候还没有数据流。我们需要推迟这个计算知道runtime。

  1. AtomicInteger count = new AtomicInteger();
  2. Observable.range(1, 10)
  3. .doOnNext(ignored -> count.incrementAndGet())
  4. .ignoreElements()
  5. .andThen(Single.defer(() -> Single.just(count.get())))
  6. .subscribe(System.out::println);

输出结果是10,是符合预期的结果。

类型转换 Type convention

| |
Flowable | Observable | Single | Maybe | Completable | | Flowable | | toObservable | first, firstOrError, single, singleOrError, last, lastOrError | firstElement, singleElement, lastElement | ignoreElements | | —- | —- | —- | —- | —- | —- | | Observable | toFlowable | | first, firstOrError, single, singleOrError, last, lastOrError | firstElement, singleElement, lastElement | ignoreElements | | Single | toFlowable | toObservable | | toMaybe | ignoreElement | | Maybe | toFlowable | toObservable | toSingle | | ignoreElement | | Completable | toFlowable | toObservable | toSingle | toMaybe | |

1:将多值源转换为单值源时,应决定应将多个源值中的哪一个视为结果。
2:将一个Observable变成Flowable需要另外的决定: 如何处理不受限制的observable?有几种可用的策略(buffering, dropping, keeping the lates)使用BackpressureStrategy参数(Flowable toFlowable(@NonNull BackpressureStrategy strategy))或者使用Flowable的操作符比如onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest。
3:只有(最多)一个数据源时,背压就没有问题,因为它可以一直存储,直到下游准备好使用。

操作符的重载

针对特定的类型。

Operator Overloads
flatMap flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable
concatMap concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable
switchMap switchMapSingle, switchMapMaybe, switchMapCompletable

操作符命名约定

unusable keywords

一些java中关键字不能使用,rxjava使用了其他的名字

type erasure

许多期望用户提供返回返回类型的函数的运算符不会被重载,因为围绕Function 的类型擦除会将此类方法签名转换为重复项。RxJava选择通过在类型后面加上后缀来命名此类运算符

  1. Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
  2. Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

type ambiguities

比如concatWith有如下几种形式

  1. Flowable<T> concatWith(Publisher<? extends T> other);
  2. Flowable<T> concatWith(SingleSource<? extends T> other);

Publisher和SingleSource都显示为函数式接口(一种抽象方法的类型),并可能鼓励用户尝试提供lambda表达式:

  1. someSource.concatWith(s -> Single.just(2))
  2. .subscribe(System.out::println, Throwable::printStackTrace);

自从rxjava 2.1.10后上面的代码编译期就会报错。至少存在4个concatWith重载,并且编译器发现上面的代码有类型歧义。在这种情况下,用户可能希望将某些计算推迟到someSource完成,因此应该将正确的明确运算符推迟:

  1. someSource.concatWith(Single.defer(() -> Single.just(2)))
  2. .subscribe(System.out::println, Throwable::printStackTrace);

有时,添加后缀以避免逻辑歧义,这些歧义可能会编译但会在流中产生错误的类型。

  1. Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
  2. Flowable<T> mergeArray(Publisher<? extends T>... sources);

当功能接口类型作为类型参数T参与时,这也可能变得模棱两可。 不是很理解

错误处理

数据流可能会失败,这时会向用户发出错误。但是有时候,多个源可能会失败,这时可以选择是否等待所有源完成或失败。为了表明这种处理,许多运算符名称后缀了DelayError字(而其他运算符在其重载之一中带有delayError或delayErrors布尔标志)。

  1. Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
  2. Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

Base class vs base type

RxJava 3的设计在很大程度上受Reactive Streams规范的影响,因此,该库为每个反应类型提供了一个类和一个接口:

Type Class Interface Consumer
0..N backpressured Flowable Publisher Subscriber
0..N unbounded Observable ObservableSource Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

1 org.reactivestreams.Publisher是外部Reactive Streams库的一部分。通过响应式流规范控制的标准化机制,它是与其他响应式库进行交互的主要类型。
2 接口的命名约定是将Source附加到原来的名称后,由于Publisher是由Reactive Streams库提供的,因此没有FlowableSource(并且对它进行子类型化也不会对互操作有所帮助),但是,这些接口在Reactive Streams规范的意义上不是标准的,并且当前仅针对RxJava。