hello world
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("hello,world!").subscribe(System.out::println);
}
}
基本类
RxJava 3 features several base classes you can discover operators on:
io.reactivex.rxjava3.core.Flowable
: 0..N flows, supporting Reactive-Streams and backpressureio.reactivex.rxjava3.core.Observable
: 0..N flows, no backpressure,io.reactivex.rxjava3.core.Single
: a flow of exactly 1 item or an error,io.reactivex.rxjava3.core.Completable
: a flow without items but only a completion or error signal,io.reactivex.rxjava3.core.Maybe
: a flow with no items, exactly one item or an error.术语
上游(upstream),下游(downstream)
RxJava中的数据流(dataflows)包含一个源(source), 0-N个中间步骤(intermediate steps),然后是数据流的消费者(data consumer)或者combinator步骤(负责消费数据流,个人的理解比如可以sink到其他地方)。
比如有如下的使用
如果当前在操作operator2,那么operator1相对于operator2是upstream,operator3相对于operator2是downstream。source.operator1().operator2().operator3().subscribe(consumer);
对象的行为 Objects in motion
在RxJava的文档中,发射(emission,emits),item,事件(event),信号(signal),数据(data)和消息(message)被视为同义词,并表示沿数据流传播的对象。背压 Backpressure
当数据流(dataflow)通过异步(asynchronous)步骤运行时,每个步骤可能以不同的速度执行不同的操作。为了避免严重影响(overwhelm)这些操作,操作被影响时由于临时缓冲(temporary buffer)增加,或者需要删除/丢弃数据(skipping/dropping data)通常会表现出内存增加(increased memory), 所以产生了背压(backpressure)。背压是一种流量控制的方式(flow control), 表达了其处理的能力(how many items are they ready to process)。在通常没有步骤知道上游将发送给它多少数据的情况下,这可以限制数据流的内存使用。
在rxjava中,Flowable支持背压,Observable不支持背压,Single,Complete,Maybe都不支持背压。assembly time
通过应用各种中间运算符来准备数据流,数据没有流动也没有产生副作用(side effect)。Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);
subscribe time
当在内部建立处理步骤链的流上调用subscribe()时,这是一个临时状态。
这是触发订阅副作用(side effect)的时间(请参阅doOnSubscribe)。在此状态下,某些数据源会立即阻止(block)或开始发射(emitter)数据。flow.subscribe(System.out::println);
runtime
这是流主动发出数据(emitting items),错误(errors)或完成信号(completion)时的状态Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
简单的后台运算
上面使用了builder的模式,和下面的代码是等价的,可以加深理解 ```java Flowablepublic class SimpleBackgroundComputation {
public static void main(String[] args) throws Exception {
Flowable.fromCallable(() -> {
Thread.sleep(1000); //模拟计算
return "DONE";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(s -> System.out.println(Thread.currentThread().getName() + " " + s), Throwable::printStackTrace);
Thread.sleep(2000);
}
}
source = Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return “Done”; });
Flowable
Flowable
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
**通常,您可以通过subscriptionOn将计算移动或阻塞IO到其他线程。数据准备好后,您可以确保通过observeOn在前台或GUI线程上对它们进行处理。(可以通过弹珠图更好的理解这句话,这是scheduler调度的核心思想)**<br />[subscribeOn和observeOn结合使用的弹珠图](https://processon.com/diagraming/5ff3f360e0b34d34e12b1e29)
<a name="EnMIe"></a>
# Schedulers
RxJava不直接使用Thread或者ExecutorService,而是使用了Schedulers的抽象。在rxjava3中,Schedulers提供了一些工具方法。**在RxJava中,默认的Scheduler在守护程序线程上运行。**
- **Schedulers.computation()**: Run computation intensive work(计算密集) on a fixed number of dedicated threads(固定数量的线程数) in the background. Most asynchronous operators use this as their default Scheduler.
- **Schedulers.io()**: Run I/O-like or blocking operations on a dynamically changing set of threads(动态数量的线程数).
- **Schedulers.single():** Run work on a single thread in a sequential and FIFO manner.
- **Schedulers.trampoline()**: Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.
- **Schedulers.from(Executor): **提供自定义的线程池
<a name="FdGUA"></a>
# Parallel processing
RxJava中的并行性意味着运行独立的流并将其结果合并回单个流。
<a name="f0Czg"></a>
## flatMap
```java
public static void testFlatMap() {
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w))
//主线程会阻塞 等待子线程执行完
//无需使用Thread.sleep方法
.blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));
}
注意点: flatMap不可以保证流输出的顺序和map的时候一致!
concatMap
public static void testFlatMap() {
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w))
//主线程会阻塞 等待子线程执行完
//无需使用Thread.sleep方法
.blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));
}
concatMap可以保证流的有序输出,一次映射并运行一个内部流
concatMapEager
public static void testFlatMap() {
Flowable.range(1, 10)
.concatMapEager(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w))
//主线程会阻塞 等待子线程执行完
//无需使用Thread.sleep方法
.blockingSubscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));
}
它将“一次”运行所有内部流,但输出流将按照创建这些内部流的顺序进行。
使用Flowable.parallel
public static void testParallel() {
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
}
Dependent sub-flows
SkuInfo类
public class SkuInfo {
private Long skuId;
private String skuName;
private Integer price;
public SkuInfo(Long skuId, String skuName, Integer price) {
this.skuId = skuId;
this.skuName = skuName;
this.price = price;
}
public Long getSkuId() {
return skuId;
}
public void setSkuId(Long skuId) {
this.skuId = skuId;
}
public String getSkuName() {
return skuName;
}
public void setSkuName(String skuName) {
this.skuName = skuName;
}
public Integer getPrice() {
return price;
}
public void setPrice(Integer price) {
this.price = price;
}
}
Inventory类
public class Inventory {
private Long skuId;
private String warehouseCode;
private Integer stock;
public Long getSkuId() {
return skuId;
}
public void setSkuId(Long skuId) {
this.skuId = skuId;
}
public String getWarehouseCode() {
return warehouseCode;
}
public void setWarehouseCode(String warehouseCode) {
this.warehouseCode = warehouseCode;
}
public Integer getStock() {
return stock;
}
public void setStock(Integer stock) {
this.stock = stock;
}
public Inventory(Long skuId, String warehouseCode, Integer stock) {
this.skuId = skuId;
this.warehouseCode = warehouseCode;
this.stock = stock;
}
}
public class DependentSubFlowTest {
public static void main(String[] args) {
test_dependent_sub_flows();
}
public static void test_dependent_sub_flows() {
//1.首先获得库存流
Flowable<Inventory> inventorySource = getInventoryFlow();
//2.通过库存流获取sku流
inventorySource
.flatMap(inventory ->
getSkuInfoBySkuId(inventory.getSkuId())
.map(skuInfo -> "name:" + skuInfo.getSkuName() + ",price:" + skuInfo.getPrice()))
.subscribe(System.out::println);
}
/**
* 创建库存的flowable
*
* @return 返回库存的flowable
*/
public static Flowable<Inventory> getInventoryFlow() {
Inventory iv1 = new Inventory(1111L, "A1", 10);
Inventory iv2 = new Inventory(1112L, "A2", 20);
Inventory iv3 = new Inventory(1113L, "A3", 30);
List<Inventory> inventories = new ArrayList<>();
inventories.add(iv1);
inventories.add(iv2);
inventories.add(iv3);
return Flowable.fromStream(inventories.stream());
}
/**
* 通过skuId获取Sku信息
*
* @param skuId skuId
* @return sku信息
*/
public static Flowable<SkuInfo> getSkuInfoBySkuId(Long skuId) {
SkuInfo s1 = new SkuInfo(1111L, "面包", 3);
SkuInfo s2 = new SkuInfo(1112L, "可乐", 5);
SkuInfo s3 = new SkuInfo(1113L, "方便面", 4);
Map<Long, SkuInfo> skuMap = new HashMap<>();
skuMap.put(s1.getSkuId(), s1);
skuMap.put(s2.getSkuId(), s2);
skuMap.put(s3.getSkuId(), s3);
return Flowable.just(skuMap.get(skuId));
}
}
Continuations
depedent
简单来说就是下游数据对上游数据有依赖,在并行执行的时候需要考虑依赖关系。比如有这样一个场景,
- 获取仓库编码列表
- 获取对应仓库下的所有库存
- 通过库存获取skuId,然后通过skuId获取SkuInfo
如果之前是写成
public static void test_dependent_continuation() throws InterruptedException {
WarehouseService.getWarehouseCodeFlow()
.flatMap(warehouseCode -> WarehouseService.getInventoryFlow(warehouseCode)
)
.flatMap(inventory -> InventoryService.getSkuInfoBySkuId(inventory.getSkuId()))
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
Thread.sleep(1000L);
}
需要改成如下形式
public static void test_dependent_continuation() throws InterruptedException {
WarehouseService.getWarehouseCodeFlow()
.flatMap(warehouseCode -> WarehouseService.getInventoryFlow(warehouseCode)
.flatMap(inventory -> InventoryService.getSkuInfoBySkuId(inventory.getSkuId())))
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
Thread.sleep(1000L);
}
non dependent
使用andThen
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
Deferred-dependent
有时,由于某种原因,先前的数据流和新的数据流之间存在隐式数据依赖性,由于某些原因,这些依赖性未通过“常规通道”流动。
public static void test_deferred_dependent() {
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
}
输出结果是 0,因为Single.just(count.get())的结果是在assembly time计算的,这个时候还没有数据流。我们需要推迟这个计算知道runtime。
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
类型转换 Type convention
| |
Flowable | Observable | Single | Maybe | Completable |
| Flowable | | toObservable
| first
, firstOrError
, single
, singleOrError
, last
, lastOrError
| firstElement
, singleElement
, lastElement
| ignoreElements
|
| —- | —- | —- | —- | —- | —- |
| Observable | toFlowable
| | first
, firstOrError
, single
, singleOrError
, last
, lastOrError
| firstElement
, singleElement
, lastElement
| ignoreElements
|
| Single | toFlowable
| toObservable
| | toMaybe
| ignoreElement
|
| Maybe | toFlowable
| toObservable
| toSingle
| | ignoreElement
|
| Completable | toFlowable
| toObservable
| toSingle
| toMaybe
| |
1:将多值源转换为单值源时,应决定应将多个源值中的哪一个视为结果。
2:将一个Observable变成Flowable需要另外的决定: 如何处理不受限制的observable?有几种可用的策略(buffering, dropping, keeping the lates)使用BackpressureStrategy参数(FlowableonBackpressureBuffer
, onBackpressureDrop
, onBackpressureLatest。
3:只有(最多)一个数据源时,背压就没有问题,因为它可以一直存储,直到下游准备好使用。
操作符的重载
针对特定的类型。
Operator | Overloads |
---|---|
flatMap |
flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable |
concatMap |
concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable |
switchMap |
switchMapSingle , switchMapMaybe , switchMapCompletable |
操作符命名约定
unusable keywords
type erasure
许多期望用户提供返回返回类型的函数的运算符不会被重载,因为围绕Function
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
type ambiguities
比如concatWith有如下几种形式
Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other);
Publisher和SingleSource都显示为函数式接口(一种抽象方法的类型),并可能鼓励用户尝试提供lambda表达式:
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);
自从rxjava 2.1.10后上面的代码编译期就会报错。至少存在4个concatWith重载,并且编译器发现上面的代码有类型歧义。在这种情况下,用户可能希望将某些计算推迟到someSource完成,因此应该将正确的明确运算符推迟:
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
有时,添加后缀以避免逻辑歧义,这些歧义可能会编译但会在流中产生错误的类型。
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);
当功能接口类型作为类型参数T参与时,这也可能变得模棱两可。 不是很理解
错误处理
数据流可能会失败,这时会向用户发出错误。但是有时候,多个源可能会失败,这时可以选择是否等待所有源完成或失败。为了表明这种处理,许多运算符名称后缀了DelayError字(而其他运算符在其重载之一中带有delayError或delayErrors布尔标志)。
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
Base class vs base type
RxJava 3的设计在很大程度上受Reactive Streams规范的影响,因此,该库为每个反应类型提供了一个类和一个接口:
Type | Class | Interface | Consumer |
---|---|---|---|
0..N backpressured | Flowable |
Publisher |
Subscriber |
0..N unbounded | Observable |
ObservableSource |
Observer |
1 element or error | Single |
SingleSource |
SingleObserver |
0..1 element or error | Maybe |
MaybeSource |
MaybeObserver |
0 element or error | Completable |
CompletableSource |
CompletableObserver |
1 org.reactivestreams.Publisher是外部Reactive Streams库的一部分。通过响应式流规范控制的标准化机制,它是与其他响应式库进行交互的主要类型。
2 接口的命名约定是将Source附加到原来的名称后,由于Publisher是由Reactive Streams库提供的,因此没有FlowableSource(并且对它进行子类型化也不会对互操作有所帮助),但是,这些接口在Reactive Streams规范的意义上不是标准的,并且当前仅针对RxJava。