- 简介
- 操作符分类
- 如何选择操作符
- 我要创建一个新的Observable
- 我要通过其他的Observable组合新的Observable
- 我想让Observable发射出来的数据完成一些转换再发射出去
- 我想让Observable经过延时在发射数据
- 我想让发射的内容和通知都转换为内容再重新发送
- 我只想收到onCompelete/onError的通知
- 我想要镜像的复制一个Observable并在他发射的内容上做一些处理
- 我想把Observable发射的内容收集起来打包发射
- 我想把Observable拆分为多个
- 我想取某一个指定的内容
- 我想取某一些指定的内容
- 我想多个Observable中最先发射数据的Observable发射数据,其他的丢弃
- 我想对序列中所有的值进行一个评估
- 我想把所有发射的内容变成另一种结构发射
- 我想把任务安排到指定的调度器上
- 我想让Observable在指定的时间执行一些操作
- 我想让Observable发一个异常(onError)的通知
- 我想在Observable在结束前返回
Future阻塞代码块重的数据 - 我想在Observer订阅后再开始发射数据
- 所有操作符(按首字母排序,点击可直接查看官方说明)
- 参考资料
简介
ReactiveX的每种编程语言的实现都实现了一组操作符的集合。不同的实现之间有很多重叠的部分,也有一些操作符只存在特定的实现中。每种实现都倾向于用那种编程语言中他们熟悉的上下文中相似的方法给这些操作符命名。
本文首先会给出ReactiveX的核心操作符列表和对应的文档链接,后面还有一个决策树用于帮助你根据具体的场景选择合适的操作符。最后有一个语言特定实现的按字母排序的操作符列表。
链式操作符
一些操作符会对Observable进行操作再返回另一个Observable,操作符通过这种方式链式地进行调用,从上一个操作符得到一个Observable进行修改再传给写一个操作符,而不是在最初的Observable进行修改保证了逻辑上的整洁。
在建造者模式(Builder Pattern)中,特定的类通过一系列的方法链式地调用修改对象的内容。在建造者模式中方法调用的顺序是不影响的结果的,而在Observable上操作符的顺序是非常重要的。
ReactiveX中的操作符
本节要点:
- 列出ReactiveX中的核心操作符,描述这些操作符的工作方式以及在特定语言中如何实现。
- 如何选择最合适的操作符
- 所有支持的操作符以及在不同语言中的实现
操作符分类
创建型操作符链接
下面列举了所有创建Observable的操作符。
Create—— 通过函数创建Observable。Defer—— Observable被订阅时才创建,并且为每个Observer创建一个新的Observable。Empty/Never/Throw—— 创建指定功能的Observable,通常功能非常有限。From—— 将一些对象或数据结构转换为Observable。Interval—— 创建一个按照给定的时间间隔发射整数序列的Observable。**Just**—— 将一个或多个对象转换成发射这个或这些对象的一个Observable。**Range**—— 创建一个发射指定范围的整数序列的Observable。**Start**—— 创建一个发射指定方法返回值的Observable。**Timer**—— 创建一个延时发射内容的Observable。转换型操作符
对Observable发射的数据进行转换的操作符
**Buffer**—— 将Observable发射的数据进行周期性地打包发射。**FlatMap**—— 将Observable发射的每个数据转换为Observable,再将这些Obserable展平放到一个Observable中进行发射。**GroupBy**—— 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。**Map**—— 将Observable发射的每个数据按指定的方法转换为另一种类型。**Scan**—— 对Observable发射的每个数据进行函数调用,然后将返回值按顺序发射。**window**—— 将Observable发射的数据按“window”大小形成一个子集,然后将每个子集(和Buffer不一样的点)放进一个Observable。过滤型操作符
对Observable发射的数据选择性地向下游发射
**Debounce**—— 按指定的间隔时间发射数据,时间间隔中的数据发射会被过滤掉。**Distinct**—— 过滤重复数据项。**ElementAt**—— 只发射指定位置的内容。**Filter**—— 只发射符合判断条件的内容。**First**—— 只发射第一项(或者满足某个条件的第一项)数据。**IgnoreElements**—— 拒绝除了终止内容(onComplete和onError)以外的内容通过。**Last**—— 只发射最后一项数据。**Sample**—— 定期发射Observable最近一次发射的内容。**Skip**—— 阻止Observable前N项内容的发射。**SkipLast**—— 阻止Observable末尾N项内容的发射。**Take**—— 只允许Observable前N项内容的发射。**TakeLast**—— 只允许Observable末尾N项内容的发射。组合型操作符
将多个Observable组合为一个Observable
And/Then/When—— 通过模式(Pattern)和计划(Plan)组合多个Observables发射的数据集合。**CombineLatest**—— 当两个Observables中的一个发射数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。**Join**—— 在一个Obserable定义的“window”范围内,另一个Observable发射的内容会组合成新的内容。**Merge**—— 将多个Observable发射的内容合并为一个。**StartWith**—— 在发射内容最前面新增一段内容或序列**Switch**—— 将一个发射多个Observable的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。**Zip**—— 通过指定函数将多个Observable发射的内容新的内容,再将每个内容发射出去。异常处理操作符
处理异常的操作符
**Catch**—— 捕获Obserable的异常(onError)通知,替换为其他内容。使得Observable能正常停止或不停止。**Retry**—— 如果Observable遇到错误(onError),尝试重新订阅它期望达到正常终止的目的。辅助操作符
Observable的一些通用的操作符
Delay—— 延时Observable发送的内容。**Do**—— 在Observable的生命周期上插入一下动作。Materialize/Dematerialize—— Materialize将数据项和事件通知都当做数据项发射,Dematerialize刚好相反。**ObserveOn**—— 指定Observer工作的调度器。**Serialize**—— 强制一个Observable连续调用并保证行为正确。**Subscribe**—— 操作来自Observable发射的数据。**SubscribeOn**—— 指定Observable工作的调度器。**TimeInterval**—— 将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable。**TimeOut**—— 对Observable进行镜像,如果指定时间内没有发送数据,会发射错误通知(onError)。**Timestamp**—— 给每个发送的内容打上时间戳。**Using**—— 创建一个只在Observable生命周期内存在的一次性资源。条件和布尔操作符
根据条件发射或变换Observable
**All**—— 判断是否所有发射的数据都满足条件。**Amb**—— 多个Observable,只允许第一个发射数据的Obserable发射全部数据。**Contains**—— 判断Observable是否会发射指定的数据。**DefaultIfEmpty**—— 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据。**SequenceEqual**—— 判断两个Observable是否会发射相同的数据或序列。**SkipUntil**—— 一个Observable会在另一个Observabel发射了数据时后才开始发射数据,之前的发射全部丢弃。**SkipWhile**—— 一个Observable会在指定条件返回false之后才会发射数据,之前的发射全部丢弃。**TakeUntil**—— 一个Observable会在另一个Observabel发射了数据或终止后丢弃数据。**TakeWhile**—— 一个Observable会在指定条件返回false之后丢弃数据。算术和聚合操作符
作用域整个序列上的操作符
Average—— 计算原始Observable发射数字的平均值并发射它。**Concat**—— 不交叉的发射两个或多个Observable的内容。**Count**—— 计算原始Observable发射内容的数量并发射它。**Max**—— 计算原始Observable发射内容的最大值并发射它。**Min**—— 计算原始Observable发射内容的最小值并发射它。**Reduce**—— 按顺序对Observable发射的每项数据用一个函数计算,并发射最返回值。**Sum**—— 计算原始Observable发射内容的总和并发射它。背压操作符
用于处理Observable发射数据速度大于Observer处理的速度的策略
Observable连接操作符
对与一些特殊的Observable提供更精确动态订阅控制
Connect—— 让可连接的Observable开始发射数据给Observer。**Publish**—— 把普通的Observable转换为一个可连接的Observable。**RefCount**—— 把可连接的Observable转换为一个普通的Observable。**Replay**—— 保证所有的Observer收到相同的数据序列,即使它们在Observable开始发射数据后才订阅。Observable转换操作符
**To**—— 将Observable转换为另一个对象或数据结构如何选择操作符
我要创建一个新的Observable
- 发射指定的单个内容:
Just- 该内容是在订阅时调用函数返回:
Start - 该内容是在订阅时从Action,Callable,Runable或他们的组合中返回的:
From - 指定延时发射:
Timer
- 该内容是在订阅时调用函数返回:
- 发射的内容来源于数组,迭代器或者类似的源头:
From - 从
Future中获取:Start - 从
Future中获取序列:From - 重复地发送一个序列:
Repeat - 根据给定的方法或逻辑创建:
Create - 为订阅的每个Observer创建:
Defer - 发射整数序列:
Range- 在固定的间隔时间发射:
Interval- 指定发射时延:
Delay
- 指定发射时延:
- 在固定的间隔时间发射:
- 只发送结束通知:
Empty - 什么也不做:
Never
我要通过其他的Observable组合新的Observable
- 发射所有的Observable发射的数据,而且不用考虑收到数据的顺序:
Merge - 发射所有的Observable发射的数据,但是每个Observable发射的内容不能交叉:
Concat - 按顺序组合所有的Observable发射的数据,将产生的新数据发射到下游
- 每个Obserable都发射了一个数据:
Zip - 只要任意一个Obserable发射了一个数据:
CombineLatest - 其中一个Obserable的窗口期另一个Obserable发射了数据:
Join - 按指定的
Pattern或Plan组合数据:And/Then/When
- 每个Obserable都发射了一个数据:
-
我想让Observable发射出来的数据完成一些转换再发射出去
一次转换一个(一对一的关系):
Map- 把所有的数据按规定转换再发出去(一对N的关系):
Flatmap- Obserable的数据发射不能交叉:
ConcatMap
- Obserable的数据发射不能交叉:
- 转换基于之前发射的内容:
Scan - 添加额外的时间戳:
Timestamp -
我想让Observable经过延时在发射数据
-
我想让发射的内容和通知都转换为内容再重新发送
将他们都封装成一个通知的类型:
Materialize将封装成通知的类型转换为内容和通知:
Dematerialize我只想收到onCompelete/onError的通知
-
我想要镜像的复制一个Observable并在他发射的内容上做一些处理
在他之前先发送一些内容:
StartWith-
我想把Observable发射的内容收集起来打包发射
Buffer-
我想把Observable拆分为多个
拆分为多个:
Window-
我想取某一个指定的内容
最后一个:
Last- 第一个:
First -
我想取某一些指定的内容
满足指定的条件:
Filter- 第一个:
First - 位于起始位置的几个:
Take - 最后一个:
Last - 指定位置:
ElementAt - 指定条件之后的内容
- 起始位置N之后的内容:
Skip - 从第一个满足指定条件的位置之后的内容:
SkipWhile - 在另外一个Observable发射了一个数据之后,第一个Observable发射的内容:
SkipUntil
- 起始位置N之后的内容:
- 指定条件之前的内容
- 倒第N个位置之前的内容:
SkipLast - 从第一个满足指定条件的位置之前的内容:
TakeWhile - 在另外一个Observable发射了一个数据之前,第一个Observable发射的内容:
SkipUntil
- 倒第N个位置之前的内容:
- 定时取样:
Sample - 如果在数据发射一定间隔内没有其他数据发射:
Debounce - 过滤掉重复项:
Distinct- if they immediately follow the item they are duplicates of:
DistinctUntilChanged
- if they immediately follow the item they are duplicates of:
by delaying my subscription to it for some time after it begins emitting items:
DelaySubscription我想多个Observable中最先发射数据的Observable发射数据,其他的丢弃
-
我想对序列中所有的值进行一个评估
判断是否所有内容都满足某个条件:
All- 判断是否存在满足某个条件的内容:
Contains - 判断要发送的内容是不是空的:
IsEmpty - 判断两个Observable的序列是否一致:
SequenceEqual - 平均值:
Average - 求和:
Sum - 计数:
Count - 最大值:
Max - 最小值:
Min -
我想把所有发射的内容变成另一种结构发射
-
我想把任务安排到指定的调度器上
Observable的调度器:
SubscribeOn-
我想让Observable在指定的时间执行一些操作
-
我想让Observable发一个异常(onError)的通知
Throw-
我想在Observable在结束前返回
Future阻塞代码块重的数据 -
我想在Observer订阅后再开始发射数据
Publish- 只发送序列的最后一个:
PublishLast - 保证所有的Observer都能收到整个序列:
Replay - but I want it to go away once all of its subscribers unsubscribe:
RefCount - and then I want to ask it to start:
Connect所有操作符(按首字母排序,点击可直接查看官方说明)
AggregateAllAmbambArrayambWithand_AndAnyapplyas_blockingasObservableAssertEqualasyncActionasyncFuncAverageaverageDoubleaverageFloataverageIntegeraverageLongblockingblockingFirstblockingForEachblockingIterableblockingLastblockingLatestblockingMostRecentblockingNextblockingSingleblockingSubscribeBufferbufferWithCountbufferWithTimebufferWithTimeOrCountbyLinecachecacheWithInitialCapacitycaseCastCatchcatchErrorcatchExceptioncollectcollect(RxScala version ofFilter)collectIntoCombineLatestcombineLatestDelayErrorcombineLatestWithConcatconcat_allconcatAllconcatArrayconcatArrayDelayErrorconcatArrayEagerconcatDelayErrorconcatEagerconcatMapconcatMapDelayErrorconcatMapEagerconcatMapEagerDelayErrorconcatMapIterableconcatMapObserverconcatMapToconcatWithConnectconnect_foreverconsContainscontrolledCountcountLongCreatecycleDebouncedecodeDefaultIfEmptyDeferdeferFutureDelaydelaySubscriptiondelayWithSelectorDematerializeDistinctdistinctKeydistinctUntilChangeddistinctUntilKeyChangedDodoActiondoAfterTerminatedoOnCompletedoOnCompleteddoOnDisposedoOnEachdoOnErrordoOnLifecycledoOnNextdoOnRequestdoOnSubscribedoOnTerminatedoOnUnsubscribedoseqdoWhiledropdropRightdropUntildropWhileElementAtElementAtOrDefaultEmptyemptyObservableempty?encodeensureserroreveryexclusiveexistsexpandfailWithFilterfilterNotFinallyfinallyActionfinallyDofindfindIndexFirstfirstElementFirstOrDefaultfirstOrElseFlatMapflatMapFirstflatMapIterableflatMapIterableWithflatMapLatestflatMapObserverflatMapWithflatMapWithMaxConcurrentflat_map_with_indexflattenflattenDelayErrorfoldlfoldLeftforforallForEachforEachFutureforEachWhileforInforkJoinFromfromActionfromArrayFromAsyncPatternfromCallablefromCallbackFromEventFromEventPatternfromFunc0fromFuturefromIterablefromIteratorfrom_listfromNodeCallbackfromPromisefromPublisherfromRunnableGenerategenerateWithAbsoluteTimegenerateWithRelativeTimegeneratorGetEnumeratorgetIteratorGroupByGroupByUntilGroupJoinheadheadOptionheadOrElseififThenIgnoreElementsindexOfinterleaveinterposeIntervalintervalRangeintoisEmptyitemsJoinjoin(string)jortSortjortSortUntilJustkeepkeep-indexedLastlastElementlastOptionLastOrDefaultlastOrElseLatestlatest(Rx.rb version ofSwitch)lengthletletBindliftlimitLongCountManySelectMapmap(RxClojure version ofZip)MapCatmapCat(RxClojure version ofZip)map-indexedmapTomapWithIndexMaterializeMaxMaxByMergemergeAllmergeArraymergeArrayDelayErrormerge_concurrentmergeDelayErrormergeObservablemergeWithMinMinByMostRecentMulticastmulticastWithSelectornestNeverNextNext(BlockingObservable version)nonenonEmptynthObserveOnObserveOnDispatcherobserveSingleOnofof_arrayofArrayChangesof_enumerableof_enumeratorofObjectChangesOfTypeofWithScheduleronBackpressureBlockonBackpressureBufferonBackpressureDropOnErrorResumeNextonErrorReturnonErrorReturnItemonExceptionResumeNextonTerminateDetachorElsepairspairwisepartitionpartition-allpausablepausableBufferedpluckproductPublishPublishLastpublish_synchronizedpublishValueraise_errorRangeReducereduceWithreductionsRefCountRepeatrepeat_infinitelyrepeatUntilrepeatWhenReplayrescue_errorrestRetryretry_infinitelyretryUntilretryWhenReturnreturnElementreturnValuerunAsyncsafeSubscribeSampleScanscanWithscopeSelect(alternate name ofMap)select(alternate name ofFilter)selectConcatselectConcatObserverSelectManyselectManyObserverselect_switchselectSwitchselectSwitchFirstselectWithMaxConcurrentselect_with_indexseqSequenceEqualsequence_eql?SequenceEqualWithSerializeshareshareReplayshareValueSinglesingleElementSingleOrDefaultsingleOptionsingleOrElsesizeSkipSkipLastskipLastWithTimeSkipUntilskipUntilWithTimeSkipWhileskipWhileWithIndexskip_with_timesliceslidingslidingBuffersomesortsortedsort-bysorted-list-bysplitsplit-withStartstartAsyncstartFutureStartWithstartWithArraystringConcatstopAndWaitsubscribesubscribeActualSubscribeOnSubscribeOnDispatchersubscribeOnCompletedsubscribeOnErrorsubscribeOnNextsubscribeWithSumsumDoublesumFloatsumIntegersumLongSwitchswitchCaseswitchIfEmptyswitchLatestswitchMapswitchMapDelayErrorswitchOnNextswitchOnNextDelayErrorSynchronizeTaketake_with_timetakeFirstTakeLasttakeLastBuffertakeLastBufferWithTimetakeLastWithTimetakeRight(see also:TakeLast)TakeUntiltakeUntilWithTimeTakeWhiletakeWhileWithIndextailtaptapOnCompletedtapOnErrortapOnNextThenthenDoThrottlethrottleFirstthrottleLastthrottleWithSelectorthrottleWithTimeoutThrowthrowErrorthrowExceptionTimeIntervalTimeouttimeoutWithSelectorTimerTimestampToto_aToArrayToAsynctoBlockingtoBufferto_dictToDictionaryToEnumerableToEventToEventPatternToFlowableToFutureto_htoIndexedSeqtoIterabletoIteratorToListToLookuptoMaptoMultiMapToObservabletoSettoSortedListtoStreamToTasktoTraversabletoVectortumblingtumblingBufferunsafeCreateunsubscribeOnUsingWhenWherewhilewhileDoWindowwindowWithCountwindowWithTimewindowWithTimeOrCountwindowedwithFilterwithLatestFromZipzipArrayzipIterablezipWithzipWithIndex+++::+参考资料
http://reactivex.io/documentation/operators.html
