- 简介
- 操作符分类
- 如何选择操作符
- 我要创建一个新的Observable
- 我要通过其他的Observable组合新的Observable
- 我想让Observable发射出来的数据完成一些转换再发射出去
- 我想让Observable经过延时在发射数据
- 我想让发射的内容和通知都转换为内容再重新发送
- 我只想收到onCompelete/onError的通知
- 我想要镜像的复制一个Observable并在他发射的内容上做一些处理
- 我想把Observable发射的内容收集起来打包发射
- 我想把Observable拆分为多个
- 我想取某一个指定的内容
- 我想取某一些指定的内容
- 我想多个Observable中最先发射数据的Observable发射数据,其他的丢弃
- 我想对序列中所有的值进行一个评估
- 我想把所有发射的内容变成另一种结构发射
- 我想把任务安排到指定的调度器上
- 我想让Observable在指定的时间执行一些操作
- 我想让Observable发一个异常(onError)的通知
- 我想在Observable在结束前返回
Future
阻塞代码块重的数据 - 我想在Observer订阅后再开始发射数据
- 所有操作符(按首字母排序,点击可直接查看官方说明)
- 参考资料
简介
ReactiveX的每种编程语言的实现都实现了一组操作符的集合。不同的实现之间有很多重叠的部分,也有一些操作符只存在特定的实现中。每种实现都倾向于用那种编程语言中他们熟悉的上下文中相似的方法给这些操作符命名。
本文首先会给出ReactiveX的核心操作符列表和对应的文档链接,后面还有一个决策树用于帮助你根据具体的场景选择合适的操作符。最后有一个语言特定实现的按字母排序的操作符列表。
链式操作符
一些操作符会对Observable进行操作再返回另一个Observable,操作符通过这种方式链式地进行调用,从上一个操作符得到一个Observable进行修改再传给写一个操作符,而不是在最初的Observable进行修改保证了逻辑上的整洁。
在建造者模式(Builder Pattern)中,特定的类通过一系列的方法链式地调用修改对象的内容。在建造者模式中方法调用的顺序是不影响的结果的,而在Observable上操作符的顺序是非常重要的。
ReactiveX中的操作符
本节要点:
- 列出ReactiveX中的核心操作符,描述这些操作符的工作方式以及在特定语言中如何实现。
- 如何选择最合适的操作符
- 所有支持的操作符以及在不同语言中的实现
操作符分类
创建型操作符链接
下面列举了所有创建Observable的操作符。
Create
—— 通过函数创建Observable。Defer
—— Observable被订阅时才创建,并且为每个Observer创建一个新的Observable。Empty
/Never
/Throw
—— 创建指定功能的Observable,通常功能非常有限。From
—— 将一些对象或数据结构转换为Observable。Interval
—— 创建一个按照给定的时间间隔发射整数序列的Observable。**Just**
—— 将一个或多个对象转换成发射这个或这些对象的一个Observable。**Range**
—— 创建一个发射指定范围的整数序列的Observable。**Start**
—— 创建一个发射指定方法返回值的Observable。**Timer**
—— 创建一个延时发射内容的Observable。转换型操作符
对Observable发射的数据进行转换的操作符
**Buffer**
—— 将Observable发射的数据进行周期性地打包发射。**FlatMap**
—— 将Observable发射的每个数据转换为Observable,再将这些Obserable展平放到一个Observable中进行发射。**GroupBy**
—— 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。**Map**
—— 将Observable发射的每个数据按指定的方法转换为另一种类型。**Scan**
—— 对Observable发射的每个数据进行函数调用,然后将返回值按顺序发射。**window**
—— 将Observable发射的数据按“window”大小形成一个子集,然后将每个子集(和Buffer
不一样的点)放进一个Observable。过滤型操作符
对Observable发射的数据选择性地向下游发射
**Debounce**
—— 按指定的间隔时间发射数据,时间间隔中的数据发射会被过滤掉。**Distinct**
—— 过滤重复数据项。**ElementAt**
—— 只发射指定位置的内容。**Filter**
—— 只发射符合判断条件的内容。**First**
—— 只发射第一项(或者满足某个条件的第一项)数据。**IgnoreElements**
—— 拒绝除了终止内容(onComplete和onError)以外的内容通过。**Last**
—— 只发射最后一项数据。**Sample**
—— 定期发射Observable最近一次发射的内容。**Skip**
—— 阻止Observable前N项内容的发射。**SkipLast**
—— 阻止Observable末尾N项内容的发射。**Take**
—— 只允许Observable前N项内容的发射。**TakeLast**
—— 只允许Observable末尾N项内容的发射。组合型操作符
将多个Observable组合为一个Observable
And
/Then
/When
—— 通过模式(Pattern)和计划(Plan)组合多个Observables发射的数据集合。**CombineLatest**
—— 当两个Observables中的一个发射数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。**Join**
—— 在一个Obserable定义的“window”范围内,另一个Observable发射的内容会组合成新的内容。**Merge**
—— 将多个Observable发射的内容合并为一个。**StartWith**
—— 在发射内容最前面新增一段内容或序列**Switch**
—— 将一个发射多个Observable的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。**Zip**
—— 通过指定函数将多个Observable发射的内容新的内容,再将每个内容发射出去。异常处理操作符
处理异常的操作符
**Catch**
—— 捕获Obserable的异常(onError)通知,替换为其他内容。使得Observable能正常停止或不停止。**Retry**
—— 如果Observable遇到错误(onError),尝试重新订阅它期望达到正常终止的目的。辅助操作符
Observable的一些通用的操作符
Delay
—— 延时Observable发送的内容。**Do**
—— 在Observable的生命周期上插入一下动作。Materialize
/Dematerialize
—— Materialize将数据项和事件通知都当做数据项发射,Dematerialize刚好相反。**ObserveOn**
—— 指定Observer工作的调度器。**Serialize**
—— 强制一个Observable连续调用并保证行为正确。**Subscribe**
—— 操作来自Observable发射的数据。**SubscribeOn**
—— 指定Observable工作的调度器。**TimeInterval**
—— 将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable。**TimeOut**
—— 对Observable进行镜像,如果指定时间内没有发送数据,会发射错误通知(onError)。**Timestamp**
—— 给每个发送的内容打上时间戳。**Using**
—— 创建一个只在Observable生命周期内存在的一次性资源。条件和布尔操作符
根据条件发射或变换Observable
**All**
—— 判断是否所有发射的数据都满足条件。**Amb**
—— 多个Observable,只允许第一个发射数据的Obserable发射全部数据。**Contains**
—— 判断Observable是否会发射指定的数据。**DefaultIfEmpty**
—— 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据。**SequenceEqual**
—— 判断两个Observable是否会发射相同的数据或序列。**SkipUntil**
—— 一个Observable会在另一个Observabel发射了数据时后才开始发射数据,之前的发射全部丢弃。**SkipWhile**
—— 一个Observable会在指定条件返回false之后才会发射数据,之前的发射全部丢弃。**TakeUntil**
—— 一个Observable会在另一个Observabel发射了数据或终止后丢弃数据。**TakeWhile**
—— 一个Observable会在指定条件返回false之后丢弃数据。算术和聚合操作符
作用域整个序列上的操作符
Average
—— 计算原始Observable发射数字的平均值并发射它。**Concat**
—— 不交叉的发射两个或多个Observable的内容。**Count**
—— 计算原始Observable发射内容的数量并发射它。**Max**
—— 计算原始Observable发射内容的最大值并发射它。**Min**
—— 计算原始Observable发射内容的最小值并发射它。**Reduce**
—— 按顺序对Observable发射的每项数据用一个函数计算,并发射最返回值。**Sum**
—— 计算原始Observable发射内容的总和并发射它。背压操作符
用于处理Observable发射数据速度大于Observer处理的速度的策略
Observable连接操作符
对与一些特殊的Observable提供更精确动态订阅控制
Connect
—— 让可连接的Observable开始发射数据给Observer。**Publish**
—— 把普通的Observable转换为一个可连接的Observable。**RefCount**
—— 把可连接的Observable转换为一个普通的Observable。**Replay**
—— 保证所有的Observer收到相同的数据序列,即使它们在Observable开始发射数据后才订阅。Observable转换操作符
**To**
—— 将Observable转换为另一个对象或数据结构如何选择操作符
我要创建一个新的Observable
- 发射指定的单个内容:
Just
- 该内容是在订阅时调用函数返回:
Start
- 该内容是在订阅时从Action,Callable,Runable或他们的组合中返回的:
From
- 指定延时发射:
Timer
- 该内容是在订阅时调用函数返回:
- 发射的内容来源于数组,迭代器或者类似的源头:
From
- 从
Future
中获取:Start
- 从
Future
中获取序列:From
- 重复地发送一个序列:
Repeat
- 根据给定的方法或逻辑创建:
Create
- 为订阅的每个Observer创建:
Defer
- 发射整数序列:
Range
- 在固定的间隔时间发射:
Interval
- 指定发射时延:
Delay
- 指定发射时延:
- 在固定的间隔时间发射:
- 只发送结束通知:
Empty
- 什么也不做:
Never
我要通过其他的Observable组合新的Observable
- 发射所有的Observable发射的数据,而且不用考虑收到数据的顺序:
Merge
- 发射所有的Observable发射的数据,但是每个Observable发射的内容不能交叉:
Concat
- 按顺序组合所有的Observable发射的数据,将产生的新数据发射到下游
- 每个Obserable都发射了一个数据:
Zip
- 只要任意一个Obserable发射了一个数据:
CombineLatest
- 其中一个Obserable的窗口期另一个Obserable发射了数据:
Join
- 按指定的
Pattern
或Plan
组合数据:And
/Then
/When
- 每个Obserable都发射了一个数据:
-
我想让Observable发射出来的数据完成一些转换再发射出去
一次转换一个(一对一的关系):
Map
- 把所有的数据按规定转换再发出去(一对N的关系):
Flatmap
- Obserable的数据发射不能交叉:
ConcatMap
- Obserable的数据发射不能交叉:
- 转换基于之前发射的内容:
Scan
- 添加额外的时间戳:
Timestamp
-
我想让Observable经过延时在发射数据
-
我想让发射的内容和通知都转换为内容再重新发送
将他们都封装成一个通知的类型:
Materialize
将封装成通知的类型转换为内容和通知:
Dematerialize
我只想收到onCompelete/onError的通知
-
我想要镜像的复制一个Observable并在他发射的内容上做一些处理
在他之前先发送一些内容:
StartWith
-
我想把Observable发射的内容收集起来打包发射
Buffer
-
我想把Observable拆分为多个
拆分为多个:
Window
-
我想取某一个指定的内容
最后一个:
Last
- 第一个:
First
-
我想取某一些指定的内容
满足指定的条件:
Filter
- 第一个:
First
- 位于起始位置的几个:
Take
- 最后一个:
Last
- 指定位置:
ElementAt
- 指定条件之后的内容
- 起始位置N之后的内容:
Skip
- 从第一个满足指定条件的位置之后的内容:
SkipWhile
- 在另外一个Observable发射了一个数据之后,第一个Observable发射的内容:
SkipUntil
- 起始位置N之后的内容:
- 指定条件之前的内容
- 倒第N个位置之前的内容:
SkipLast
- 从第一个满足指定条件的位置之前的内容:
TakeWhile
- 在另外一个Observable发射了一个数据之前,第一个Observable发射的内容:
SkipUntil
- 倒第N个位置之前的内容:
- 定时取样:
Sample
- 如果在数据发射一定间隔内没有其他数据发射:
Debounce
- 过滤掉重复项:
Distinct
- if they immediately follow the item they are duplicates of:
DistinctUntilChanged
- if they immediately follow the item they are duplicates of:
by delaying my subscription to it for some time after it begins emitting items:
DelaySubscription
我想多个Observable中最先发射数据的Observable发射数据,其他的丢弃
-
我想对序列中所有的值进行一个评估
判断是否所有内容都满足某个条件:
All
- 判断是否存在满足某个条件的内容:
Contains
- 判断要发送的内容是不是空的:
IsEmpty
- 判断两个Observable的序列是否一致:
SequenceEqual
- 平均值:
Average
- 求和:
Sum
- 计数:
Count
- 最大值:
Max
- 最小值:
Min
-
我想把所有发射的内容变成另一种结构发射
-
我想把任务安排到指定的调度器上
Observable的调度器:
SubscribeOn
-
我想让Observable在指定的时间执行一些操作
-
我想让Observable发一个异常(onError)的通知
Throw
-
我想在Observable在结束前返回
Future
阻塞代码块重的数据 -
我想在Observer订阅后再开始发射数据
Publish
- 只发送序列的最后一个:
PublishLast
- 保证所有的Observer都能收到整个序列:
Replay
- but I want it to go away once all of its subscribers unsubscribe:
RefCount
- and then I want to ask it to start:
Connect
所有操作符(按首字母排序,点击可直接查看官方说明)
Aggregate
All
Amb
ambArray
ambWith
and_
And
Any
apply
as_blocking
asObservable
AssertEqual
asyncAction
asyncFunc
Average
averageDouble
averageFloat
averageInteger
averageLong
blocking
blockingFirst
blockingForEach
blockingIterable
blockingLast
blockingLatest
blockingMostRecent
blockingNext
blockingSingle
blockingSubscribe
Buffer
bufferWithCount
bufferWithTime
bufferWithTimeOrCount
byLine
cache
cacheWithInitialCapacity
case
Cast
Catch
catchError
catchException
collect
collect
(RxScala version ofFilter
)collectInto
CombineLatest
combineLatestDelayError
combineLatestWith
Concat
concat_all
concatAll
concatArray
concatArrayDelayError
concatArrayEager
concatDelayError
concatEager
concatMap
concatMapDelayError
concatMapEager
concatMapEagerDelayError
concatMapIterable
concatMapObserver
concatMapTo
concatWith
Connect
connect_forever
cons
Contains
controlled
Count
countLong
Create
cycle
Debounce
decode
DefaultIfEmpty
Defer
deferFuture
Delay
delaySubscription
delayWithSelector
Dematerialize
Distinct
distinctKey
distinctUntilChanged
distinctUntilKeyChanged
Do
doAction
doAfterTerminate
doOnComplete
doOnCompleted
doOnDispose
doOnEach
doOnError
doOnLifecycle
doOnNext
doOnRequest
doOnSubscribe
doOnTerminate
doOnUnsubscribe
doseq
doWhile
drop
dropRight
dropUntil
dropWhile
ElementAt
ElementAtOrDefault
Empty
emptyObservable
empty?
encode
ensures
error
every
exclusive
exists
expand
failWith
Filter
filterNot
Finally
finallyAction
finallyDo
find
findIndex
First
firstElement
FirstOrDefault
firstOrElse
FlatMap
flatMapFirst
flatMapIterable
flatMapIterableWith
flatMapLatest
flatMapObserver
flatMapWith
flatMapWithMaxConcurrent
flat_map_with_index
flatten
flattenDelayError
foldl
foldLeft
for
forall
ForEach
forEachFuture
forEachWhile
forIn
forkJoin
From
fromAction
fromArray
FromAsyncPattern
fromCallable
fromCallback
FromEvent
FromEventPattern
fromFunc0
fromFuture
fromIterable
fromIterator
from_list
fromNodeCallback
fromPromise
fromPublisher
fromRunnable
Generate
generateWithAbsoluteTime
generateWithRelativeTime
generator
GetEnumerator
getIterator
GroupBy
GroupByUntil
GroupJoin
head
headOption
headOrElse
if
ifThen
IgnoreElements
indexOf
interleave
interpose
Interval
intervalRange
into
isEmpty
items
Join
join
(string)jortSort
jortSortUntil
Just
keep
keep-indexed
Last
lastElement
lastOption
LastOrDefault
lastOrElse
Latest
latest
(Rx.rb version ofSwitch
)length
let
letBind
lift
limit
LongCount
ManySelect
Map
map
(RxClojure version ofZip
)MapCat
mapCat
(RxClojure version ofZip
)map-indexed
mapTo
mapWithIndex
Materialize
Max
MaxBy
Merge
mergeAll
mergeArray
mergeArrayDelayError
merge_concurrent
mergeDelayError
mergeObservable
mergeWith
Min
MinBy
MostRecent
Multicast
multicastWithSelector
nest
Never
Next
Next
(BlockingObservable version)none
nonEmpty
nth
ObserveOn
ObserveOnDispatcher
observeSingleOn
of
of_array
ofArrayChanges
of_enumerable
of_enumerator
ofObjectChanges
OfType
ofWithScheduler
onBackpressureBlock
onBackpressureBuffer
onBackpressureDrop
OnErrorResumeNext
onErrorReturn
onErrorReturnItem
onExceptionResumeNext
onTerminateDetach
orElse
pairs
pairwise
partition
partition-all
pausable
pausableBuffered
pluck
product
Publish
PublishLast
publish_synchronized
publishValue
raise_error
Range
Reduce
reduceWith
reductions
RefCount
Repeat
repeat_infinitely
repeatUntil
repeatWhen
Replay
rescue_error
rest
Retry
retry_infinitely
retryUntil
retryWhen
Return
returnElement
returnValue
runAsync
safeSubscribe
Sample
Scan
scanWith
scope
Select
(alternate name ofMap
)select
(alternate name ofFilter
)selectConcat
selectConcatObserver
SelectMany
selectManyObserver
select_switch
selectSwitch
selectSwitchFirst
selectWithMaxConcurrent
select_with_index
seq
SequenceEqual
sequence_eql?
SequenceEqualWith
Serialize
share
shareReplay
shareValue
Single
singleElement
SingleOrDefault
singleOption
singleOrElse
size
Skip
SkipLast
skipLastWithTime
SkipUntil
skipUntilWithTime
SkipWhile
skipWhileWithIndex
skip_with_time
slice
sliding
slidingBuffer
some
sort
sorted
sort-by
sorted-list-by
split
split-with
Start
startAsync
startFuture
StartWith
startWithArray
stringConcat
stopAndWait
subscribe
subscribeActual
SubscribeOn
SubscribeOnDispatcher
subscribeOnCompleted
subscribeOnError
subscribeOnNext
subscribeWith
Sum
sumDouble
sumFloat
sumInteger
sumLong
Switch
switchCase
switchIfEmpty
switchLatest
switchMap
switchMapDelayError
switchOnNext
switchOnNextDelayError
Synchronize
Take
take_with_time
takeFirst
TakeLast
takeLastBuffer
takeLastBufferWithTime
takeLastWithTime
takeRight
(see also:TakeLast
)TakeUntil
takeUntilWithTime
TakeWhile
takeWhileWithIndex
tail
tap
tapOnCompleted
tapOnError
tapOnNext
Then
thenDo
Throttle
throttleFirst
throttleLast
throttleWithSelector
throttleWithTimeout
Throw
throwError
throwException
TimeInterval
Timeout
timeoutWithSelector
Timer
Timestamp
To
to_a
ToArray
ToAsync
toBlocking
toBuffer
to_dict
ToDictionary
ToEnumerable
ToEvent
ToEventPattern
ToFlowable
ToFuture
to_h
toIndexedSeq
toIterable
toIterator
ToList
ToLookup
toMap
toMultiMap
ToObservable
toSet
toSortedList
toStream
ToTask
toTraversable
toVector
tumbling
tumblingBuffer
unsafeCreate
unsubscribeOn
Using
When
Where
while
whileDo
Window
windowWithCount
windowWithTime
windowWithTimeOrCount
windowed
withFilter
withLatestFrom
Zip
zipArray
zipIterable
zipWith
zipWithIndex
++
+:
:+
参考资料
http://reactivex.io/documentation/operators.html