Observable
在ReactiveX中,Observer(观察者)订阅了Observable(可观察对象)之后,就能收到Observable发射的内容或者数据序列了。这种模型简化了并发操作,因为在等待Observable发出数据时不需要阻塞,而是以观察者的形式创建了一个监听器,准备随时接收上游发出的内容。
本节会围绕下面的问题进行说明:
- 什么是响应模式(reactive pattern)?
- 什么是Observer和Observable?
- Observer是如何订阅Observable的?
下面通过弹珠图(marble diagrams)描述Observables和Observables的转换:
背景知识
在软件运行时,在某些场景下需要程序指令按编写顺序一次一个以增量方式执行和完成。但在ReactiveX中,许多指令可以并行执行,其结果由“Observer”以不确定顺序接收。你可以“Observable”的形式定义一个转换数据的机制,然后“Observer“订阅该机制,通过这样的方式产生的内容会被及时地被“Observer”接收。
这种做法的优点是,当您有一堆彼此不依赖的任务时,可以同时启动所有任务,而不需要当前任务都完成才开始下一个任务。这样这一堆任务的完成时间取决于执行时间最长的任务。
创建观察者
在普通的方法调用中(不是ReactiveX中典型的异步调用),他的流程是这样的:
- 调用指定方法
- 将方法的返回值存储在变量中
- 使用该变量做一些其他操作
在异步场景下流程如下:
- 在观察者上定义一个方法,用来处理异步调用的返回内容。
- 将异步调用定义为Observable(可观察的)。
- 通过订阅将Observer和Observable联系起来(同时也会触发Observable的操作)。
- 然后您可以处理其他任务。 每当调用返回时,Observer开始对Observable发出的内容进行处理。
//定义Observer接收到内容时onNext处理
Consumer<String> onNext = new Consumer<String>() {
@Override
public void accept(String name) {
//处理接收到的内容
}
};
//定义一个Observable
Observable<String> nameObservable = Observable.fromArray("Andy", "Bob", "Lucy");
//Observerd订阅到Observable上,并调用Observable
nameObservable.subscribe(onNext);
//处理其他事
onNext,onCompleted和onError
通过subscribe方法将Observer和Observavle联系起来的,Observer需要实现下面方法或者其中的一部分:
- onNext(T item)
Observable发射数据时都会调用这个方法,onNext方法会将Observable发出的数据作为参数。
- onError(Exception e)
Observable未能按预期生成数据或遇到了其他错误时调用此方法。之后也不会再调用onNext和 onComplete方法。onError方法会将错误的原因作为其参数。
- onComplete()
Observable最后一次调用onNext之后,在没有错误发生的情况下会调用这个方法。
按照Observeable的规则,它可以调用onNext零次或多次,然后可以在这些调用之后调用onCompleted或onError,但只能调用其中之一,而且这将是它的最后一次调用。
完整的订阅如下所示:
Observable<String> nameObservable = Observable.fromArray("Andy", "Bob", "Lucy");
Consumer<String> onNext = new Consumer<String>() {
@Override
public void accept(String name) {
//处理收到的内容
}
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//异常发生之后的处理
}
};
Action onComplete = new Action() {
@Override
public void run() throws Exception {
//上游内容发送完成之后的处理
}
};
nameObservable.subscribe(onNext, onError, onComplete);
解除订阅
在一些ReactiveX的实现中会提供专门的Subscriber接口,接口提供了unsubscribe方法(ps:在Rxjava的实现中,提供的是Disposable接口,可以通过dispose来取消订阅)。通过调用此方法告知Observable自己不再需要接收上游的内容了。当Observable的Observer全都取消订阅,Observable就不会在发射或产生新的内容了。
取消订阅会通过Observable用到的运算符调用链向后逐级返回。同时调用链每个环节都停止发射内容,但这可能不会立即生效,即使Observer取消了订阅,上游的还有数据发送还是可能继续发射。
关于名词的说明
对于实现了ReactiveX编程语言,尽管他们之间有很多共通之处,由于语言本身的命名习惯或表达上的差异,导致在命名上并没有一个固定的标准。
“Hot” and “Cold” Observable
作为一个Observable,他是什么时候开始发射自己的数据序列的呢?围绕这个问题,可以分为两类:
- Hot Observable:从以创建就开始发射数据了,任何中途开始订阅的Observer都能收到数据了。
Cold Observable:等到有Observer订阅之后才开始发射数据,Observer能接收到整个数据序列。
通过Observable操作符进行组合
Observable和Observer作为ReactiveX最基础的一环,只不过是在观察者模式上的一个扩展,让他更适合处理一系列的事件。
ReactiveX的精髓在于他提供的丰富的操作符,通过这些操作符可以轻松的转换、组合、操纵和处理从Observable发射的内容或序列。
这些运算符能以声明的方式将异步序列组合在一起,同时具有回调的效率优势,但没有异步任务通过回调嵌套进行关联的缺点。
下面是对Observable操作符的一个分类:创建性
Create
Defer
Empty
/Never
/Throw
From
Interval
Just
Range
Repeat
Start
Timer
- 转换型
Buffer
FlatMap
GroupBy
Map
Scan
Window
- 筛选型
Debounce
Distinct
ElementAt
Filter
First
IgnoreElements
Last
Sample
Skip
SkipLast
Take
TakeLast
- 组合型
And
/Then
/When
CombineLatest
Join
Merge
StartWith
Switch
Zip
- 错误处理操作符
Catch
Retry
- 通用操作符
Delay
Do
Materialize
/Dematerialize
ObserveOn
Serialize
Subscribe
SubscribeOn
TimeInterval
Timeout
Timestamp
Using
- 条件运算符和布尔运算符
All
Amb
Contains
DefaultIfEmpty
SequenceEqual
SkipUntil
SkipWhile
TakeUntil
TakeWhile
- 数学运算符和聚合运算符
Average
Concat
Count
Max
Min
Reduce
Sum
- 转换操作符
To
- Observable连接操作符
Connect
Publish
RefCount
Replay
- 背压操作符
用于执行特定流量控制策略
上述列举一些运算符相关的操作符,并不是ReactiveX核心的一部分,在一些语言中选择性的实现了。
链式运算符
一些操作符会对Observable进行操作再返回另一个Observable,操作符通过这种方式链式地进行调用,从上一个操作符得到一个Observable进行修改再传给写一个操作符,而不是在最初的Observable进行修改保证了逻辑上的整洁。