hello world
public class HelloWorld {public static void main(String[] args) {Flowable.just("hello,world!").subscribe(System.out::println);}}
基本类
RxJava 3 features several base classes you can discover operators on:
io.reactivex.rxjava3.core.Flowable: 0..N flows, supporting Reactive-Streams and backpressureio.reactivex.rxjava3.core.Observable: 0..N flows, no backpressure,io.reactivex.rxjava3.core.Single: a flow of exactly 1 item or an error,io.reactivex.rxjava3.core.Completable: a flow without items but only a completion or error signal,io.reactivex.rxjava3.core.Maybe: a flow with no items, exactly one item or an error.术语
上游(upstream),下游(downstream)
RxJava中的数据流(dataflows)包含一个源(source), 0-N个中间步骤(intermediate steps),然后是数据流的消费者(data consumer)或者combinator步骤(负责消费数据流,个人的理解比如可以sink到其他地方)。
比如有如下的使用
如果当前在操作operator2,那么operator1相对于operator2是upstream,operator3相对于operator2是downstream。source.operator1().operator2().operator3().subscribe(consumer);
对象的行为 Objects in motion
在RxJava的文档中,发射(emission,emits),item,事件(event),信号(signal),数据(data)和消息(message)被视为同义词,并表示沿数据流传播的对象。背压 Backpressure
当数据流(dataflow)通过异步(asynchronous)步骤运行时,每个步骤可能以不同的速度执行不同的操作。为了避免严重影响(overwhelm)这些操作,操作被影响时由于临时缓冲(temporary buffer)增加,或者需要删除/丢弃数据(skipping/dropping data)通常会表现出内存增加(increased memory), 所以产生了背压(backpressure)。背压是一种流量控制的方式(flow control), 表达了其处理的能力(how many items are they ready to process)。在通常没有步骤知道上游将发送给它多少数据的情况下,这可以限制数据流的内存使用。
在rxjava中,Flowable支持背压,Observable不支持背压,Single,Complete,Maybe都不支持背压。assembly time
通过应用各种中间运算符来准备数据流,数据没有流动也没有产生副作用(side effect)。Flowable<Integer> flow = Flowable.range(1, 5).map(v -> v * v).filter(v -> v % 3 == 0);
subscribe time
当在内部建立处理步骤链的流上调用subscribe()时,这是一个临时状态。
这是触发订阅副作用(side effect)的时间(请参阅doOnSubscribe)。在此状态下,某些数据源会立即阻止(block)或开始发射(emitter)数据。flow.subscribe(System.out::println);
runtime
这是流主动发出数据(emitting items),错误(errors)或完成信号(completion)时的状态Observable.create(emitter -> {while (!emitter.isDisposed()) {long time = System.currentTimeMillis();emitter.onNext(time);if (time % 2 != 0) {emitter.onError(new IllegalStateException("Odd millisecond!"));break;}}}).subscribe(System.out::println, Throwable::printStackTrace);
简单的后台运算
上面使用了builder的模式,和下面的代码是等价的,可以加深理解 ```java Flowablepublic class SimpleBackgroundComputation {public static void main(String[] args) throws Exception {Flowable.fromCallable(() -> {Thread.sleep(1000); //模拟计算return "DONE";}).subscribeOn(Schedulers.io()).observeOn(Schedulers.single()).subscribe(s -> System.out.println(Thread.currentThread().getName() + " " + s), Throwable::printStackTrace);Thread.sleep(2000);}}
source = Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return “Done”; });
Flowable
Flowable
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
**通常,您可以通过subscriptionOn将计算移动或阻塞IO到其他线程。数据准备好后,您可以确保通过observeOn在前台或GUI线程上对它们进行处理。(可以通过弹珠图更好的理解这句话,这是scheduler调度的核心思想)**<br />[subscribeOn和observeOn结合使用的弹珠图](https://processon.com/diagraming/5ff3f360e0b34d34e12b1e29)<a name="EnMIe"></a># SchedulersRxJava不直接使用Thread或者ExecutorService,而是使用了Schedulers的抽象。在rxjava3中,Schedulers提供了一些工具方法。**在RxJava中,默认的Scheduler在守护程序线程上运行。**- **Schedulers.computation()**: Run computation intensive work(计算密集) on a fixed number of dedicated threads(固定数量的线程数) in the background. Most asynchronous operators use this as their default Scheduler.- **Schedulers.io()**: Run I/O-like or blocking operations on a dynamically changing set of threads(动态数量的线程数).- **Schedulers.single():** Run work on a single thread in a sequential and FIFO manner.- **Schedulers.trampoline()**: Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.- **Schedulers.from(Executor): **提供自定义的线程池<a name="FdGUA"></a># Parallel processingRxJava中的并行性意味着运行独立的流并将其结果合并回单个流。<a name="f0Czg"></a>## flatMap```javapublic static void testFlatMap() {Flowable.range(1, 10).flatMap(v ->Flowable.just(v).subscribeOn(Schedulers.computation()).map(w -> w * w))//主线程会阻塞 等待子线程执行完//无需使用Thread.sleep方法.blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));}
注意点: flatMap不可以保证流输出的顺序和map的时候一致!
concatMap
public static void testFlatMap() {Flowable.range(1, 10).flatMap(v ->Flowable.just(v).subscribeOn(Schedulers.computation()).map(w -> w * w))//主线程会阻塞 等待子线程执行完//无需使用Thread.sleep方法.blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));}
concatMap可以保证流的有序输出,一次映射并运行一个内部流
concatMapEager
public static void testFlatMap() {Flowable.range(1, 10).concatMapEager(v ->Flowable.just(v).subscribeOn(Schedulers.computation()).map(w -> w * w))//主线程会阻塞 等待子线程执行完//无需使用Thread.sleep方法.blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));}
它将“一次”运行所有内部流,但输出流将按照创建这些内部流的顺序进行。
使用Flowable.parallel
public static void testParallel() {Flowable.range(1, 10).parallel().runOn(Schedulers.computation()).map(v -> v * v).sequential().blockingSubscribe(System.out::println);}
Dependent sub-flows
SkuInfo类
public class SkuInfo {private Long skuId;private String skuName;private Integer price;public SkuInfo(Long skuId, String skuName, Integer price) {this.skuId = skuId;this.skuName = skuName;this.price = price;}public Long getSkuId() {return skuId;}public void setSkuId(Long skuId) {this.skuId = skuId;}public String getSkuName() {return skuName;}public void setSkuName(String skuName) {this.skuName = skuName;}public Integer getPrice() {return price;}public void setPrice(Integer price) {this.price = price;}}
Inventory类
public class Inventory {private Long skuId;private String warehouseCode;private Integer stock;public Long getSkuId() {return skuId;}public void setSkuId(Long skuId) {this.skuId = skuId;}public String getWarehouseCode() {return warehouseCode;}public void setWarehouseCode(String warehouseCode) {this.warehouseCode = warehouseCode;}public Integer getStock() {return stock;}public void setStock(Integer stock) {this.stock = stock;}public Inventory(Long skuId, String warehouseCode, Integer stock) {this.skuId = skuId;this.warehouseCode = warehouseCode;this.stock = stock;}}
public class DependentSubFlowTest {public static void main(String[] args) {test_dependent_sub_flows();}public static void test_dependent_sub_flows() {//1.首先获得库存流Flowable<Inventory> inventorySource = getInventoryFlow();//2.通过库存流获取sku流inventorySource.flatMap(inventory ->getSkuInfoBySkuId(inventory.getSkuId()).map(skuInfo -> "name:" + skuInfo.getSkuName() + ",price:" + skuInfo.getPrice())).subscribe(System.out::println);}/*** 创建库存的flowable** @return 返回库存的flowable*/public static Flowable<Inventory> getInventoryFlow() {Inventory iv1 = new Inventory(1111L, "A1", 10);Inventory iv2 = new Inventory(1112L, "A2", 20);Inventory iv3 = new Inventory(1113L, "A3", 30);List<Inventory> inventories = new ArrayList<>();inventories.add(iv1);inventories.add(iv2);inventories.add(iv3);return Flowable.fromStream(inventories.stream());}/*** 通过skuId获取Sku信息** @param skuId skuId* @return sku信息*/public static Flowable<SkuInfo> getSkuInfoBySkuId(Long skuId) {SkuInfo s1 = new SkuInfo(1111L, "面包", 3);SkuInfo s2 = new SkuInfo(1112L, "可乐", 5);SkuInfo s3 = new SkuInfo(1113L, "方便面", 4);Map<Long, SkuInfo> skuMap = new HashMap<>();skuMap.put(s1.getSkuId(), s1);skuMap.put(s2.getSkuId(), s2);skuMap.put(s3.getSkuId(), s3);return Flowable.just(skuMap.get(skuId));}}
Continuations
depedent
简单来说就是下游数据对上游数据有依赖,在并行执行的时候需要考虑依赖关系。比如有这样一个场景,
- 获取仓库编码列表
- 获取对应仓库下的所有库存
- 通过库存获取skuId,然后通过skuId获取SkuInfo
如果之前是写成
public static void test_dependent_continuation() throws InterruptedException {WarehouseService.getWarehouseCodeFlow().flatMap(warehouseCode -> WarehouseService.getInventoryFlow(warehouseCode)).flatMap(inventory -> InventoryService.getSkuInfoBySkuId(inventory.getSkuId())).subscribeOn(Schedulers.computation()).subscribe(System.out::println);Thread.sleep(1000L);}
需要改成如下形式
public static void test_dependent_continuation() throws InterruptedException {WarehouseService.getWarehouseCodeFlow().flatMap(warehouseCode -> WarehouseService.getInventoryFlow(warehouseCode).flatMap(inventory -> InventoryService.getSkuInfoBySkuId(inventory.getSkuId()))).subscribeOn(Schedulers.computation()).subscribe(System.out::println);Thread.sleep(1000L);}
non dependent
使用andThen
sourceObservable.ignoreElements() // returns Completable.andThen(someSingleSource).map(v -> v.toString())
Deferred-dependent
有时,由于某种原因,先前的数据流和新的数据流之间存在隐式数据依赖性,由于某些原因,这些依赖性未通过“常规通道”流动。
public static void test_deferred_dependent() {AtomicInteger count = new AtomicInteger();Observable.range(1, 10).doOnNext(ignored -> count.incrementAndGet()).ignoreElements().andThen(Single.just(count.get())).subscribe(System.out::println);}
输出结果是 0,因为Single.just(count.get())的结果是在assembly time计算的,这个时候还没有数据流。我们需要推迟这个计算知道runtime。
AtomicInteger count = new AtomicInteger();Observable.range(1, 10).doOnNext(ignored -> count.incrementAndGet()).ignoreElements().andThen(Single.defer(() -> Single.just(count.get()))).subscribe(System.out::println);
类型转换 Type convention
| |
Flowable | Observable | Single | Maybe | Completable |
| Flowable | | toObservable | first, firstOrError, single, singleOrError, last, lastOrError | firstElement, singleElement, lastElement | ignoreElements |
| —- | —- | —- | —- | —- | —- |
| Observable | toFlowable | | first, firstOrError, single, singleOrError, last, lastOrError | firstElement, singleElement, lastElement | ignoreElements |
| Single | toFlowable | toObservable | | toMaybe | ignoreElement |
| Maybe | toFlowable | toObservable | toSingle | | ignoreElement |
| Completable | toFlowable | toObservable | toSingle | toMaybe | |
1:将多值源转换为单值源时,应决定应将多个源值中的哪一个视为结果。
2:将一个Observable变成Flowable需要另外的决定: 如何处理不受限制的observable?有几种可用的策略(buffering, dropping, keeping the lates)使用BackpressureStrategy参数(FlowableonBackpressureBuffer, onBackpressureDrop, onBackpressureLatest。
3:只有(最多)一个数据源时,背压就没有问题,因为它可以一直存储,直到下游准备好使用。
操作符的重载
针对特定的类型。
| Operator | Overloads |
|---|---|
flatMap |
flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable |
concatMap |
concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable |
switchMap |
switchMapSingle, switchMapMaybe, switchMapCompletable |
操作符命名约定
unusable keywords
type erasure
许多期望用户提供返回返回类型的函数的运算符不会被重载,因为围绕Function
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
type ambiguities
比如concatWith有如下几种形式
Flowable<T> concatWith(Publisher<? extends T> other);Flowable<T> concatWith(SingleSource<? extends T> other);
Publisher和SingleSource都显示为函数式接口(一种抽象方法的类型),并可能鼓励用户尝试提供lambda表达式:
someSource.concatWith(s -> Single.just(2)).subscribe(System.out::println, Throwable::printStackTrace);
自从rxjava 2.1.10后上面的代码编译期就会报错。至少存在4个concatWith重载,并且编译器发现上面的代码有类型歧义。在这种情况下,用户可能希望将某些计算推迟到someSource完成,因此应该将正确的明确运算符推迟:
someSource.concatWith(Single.defer(() -> Single.just(2))).subscribe(System.out::println, Throwable::printStackTrace);
有时,添加后缀以避免逻辑歧义,这些歧义可能会编译但会在流中产生错误的类型。
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);Flowable<T> mergeArray(Publisher<? extends T>... sources);
当功能接口类型作为类型参数T参与时,这也可能变得模棱两可。 不是很理解
错误处理
数据流可能会失败,这时会向用户发出错误。但是有时候,多个源可能会失败,这时可以选择是否等待所有源完成或失败。为了表明这种处理,许多运算符名称后缀了DelayError字(而其他运算符在其重载之一中带有delayError或delayErrors布尔标志)。
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
Base class vs base type
RxJava 3的设计在很大程度上受Reactive Streams规范的影响,因此,该库为每个反应类型提供了一个类和一个接口:
| Type | Class | Interface | Consumer |
|---|---|---|---|
| 0..N backpressured | Flowable |
Publisher |
Subscriber |
| 0..N unbounded | Observable |
ObservableSource |
Observer |
| 1 element or error | Single |
SingleSource |
SingleObserver |
| 0..1 element or error | Maybe |
MaybeSource |
MaybeObserver |
| 0 element or error | Completable |
CompletableSource |
CompletableObserver |
1 org.reactivestreams.Publisher是外部Reactive Streams库的一部分。通过响应式流规范控制的标准化机制,它是与其他响应式库进行交互的主要类型。
2 接口的命名约定是将Source附加到原来的名称后,由于Publisher是由Reactive Streams库提供的,因此没有FlowableSource(并且对它进行子类型化也不会对互操作有所帮助),但是,这些接口在Reactive Streams规范的意义上不是标准的,并且当前仅针对RxJava。
