concatAll

定义:

  • public concatAll(): Observable

顾名思义,该操作符有点像我们js中数组方法concat,用于将多个Observable合成一个,不过它有个注意点在于它是串行的,也就是合并了两个Observable,那订阅者在获取值的时候会先获取完第一个Observable,之后才开始接收到后一个Observable的值。

组合操作符 - 图1

concatAll.png

  1. const source1 = Rx.Observable.of(1, 2);const source2 = source1.map(x => Rx.Observable.interval(1000).take(3));const result = source2.concatAll();result.subscribe(x => console.log(x));

根据上面的文字介绍,相信大家对于这段代码应该也能多少看得懂一些,没错,这段代码的含义就是我们的数据源发送了两个数,并且采用map操作符处理完返回了一个新的Observable,这个时候为了订阅者能够正常的接收多个Observable,则采用concatAll合并一下,并且最终订阅者收到的结果依次为:0、1、2、0、1、2。

mergeAll

定义:

  • public mergeAll(concurrent: number): Observable

concatAll几乎没太大差别,唯一不同的就是它是并行的,也就是合并的多个Observable发送数据时是不分先后的。

combineLatest

定义:

  • public combineLatest(other: ObservableInput, project: function): Observable

组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。

这个操作符光从简介来看不太好理解,我们来结合实例进行讲解吧。

组合操作符 - 图2

combineLatest.png

const s1 = Rx.Observable.interval(2000).take(3);const s2 = Rx.Observable.interval(1000).take(5);const result = s1.combineLatest(s2, (a, b) => a + b);result.subscribe(x => console.log(x));

打印结果依次是:0、1、2、3、4、5、6。

首先我们看这个combineLatest的使用方式,它是一个实例操作符,这里演示的是将s1s2结合到一起,并且第二个参数需要传入回调,对结合的值进行处理,由于我们这里只结合了两个,故只接收ab两个参数,该回调函数的返回值即为订阅者获取到的值。

从结果看其实也看不出来啥,主要是这个过程如下:

  1. s2发送一个 0,而此时s1未发送值,则我们传入的回调不会执行,订阅者也不会接收到值。
  2. s1发送一个 0,而s2最后一次发送的值为 0,故调用回调函数,并把这两个参数传入,最终订阅者收到
  3. s2发送一个 1,而s1最后一次发送的为 0,故结果为 1。
  4. s1发送一个 1,而s2最后一次发送的值为 1,故结果为 2。
  5. s2发送一个值为 2,而s1最后一次发送的值为 1,故结果为 3。
  6. s2发送一个值为 3,而s1最后一次发送的值为 1,故结果为 4。
  7. … 重复上述步骤。

这里有个注意点,我们会发现s1s2在某些时候会同时发送数据,但是这个也会有先后顺序的,所以这个时候就看他们谁先定义那么谁就会先发送,从上面步骤中你们应该也能发现这个现象。

其实也就是结合的多个源之间存在一种依赖关系,也就是两个源都至少发送了一个值,订阅者才会收到消息,等到两个源都发送完毕,最后才会发出结束信号。

zip

定义:

  • public static zip(observables: *): Observable<R>

将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来的。如果最后一个参数是函数, 这个函数被用来计算最终发出的值. 否则, 返回一个顺序包含所有输入值的数组.

通俗点说就是多个源之间会进行顺位对齐计算,跟前面的combineLatest有点差别。

话不多说,上码:

组合操作符 - 图3

zip.png

const s1 = Rx.Observable.interval(1000).take(3);const s2 = Rx.Observable.interval(2000).take(5);const result = s1.zip(s2, (a, b) => a + b);result.subscribe(x => console.log(x));

打印结果依次是:0、2、4。

怎么理解呢,首先我们记住一句话,多个源之间用来计算的数是顺位对齐的,也就是说s1的第一个数对齐s2的第一个数,这种一一对应的计算,最终订阅者收到的就是将多个对齐的数传入我们在调用zip的最后一个回调函数,也就是用来计算完值最终返回给用户的结果,这是可选的。

等到两个源中的任意一个源结束了之后,整体就会发出结束信号,因为后续不存在可以对齐的数了。

startWidth

定义:

  • public startWith(values: ...T, scheduler: Scheduler): Observable

返回的 Observable 会先发出作为参数指定的项,然后再发出由源 Observable 所发出的项。

怎么理解呢,其实很好举例,比如有一串糖葫芦,整体都是一个颜色,你觉得不好看,于是你在这串糖葫芦的前面插了几个颜色不一样的糖葫芦,这个时候用户吃的时候就会先吃到你插在最前面的糖葫芦。

const source = Rx.Observable.interval(1000).take(3);const result = source.startWith(666)result.subscribe(x => console.log(x));

打印结果为:666、0、1、2。

是不是很好理解呢。

switch

定义:

  • public switch(): Observable<T>

通过只订阅最新发出的内部 Observable ,将高阶 Observable 转换成一阶 Observable

对于该操作符的用法其实前面我们在介绍switchMap这个转换操作符时就已经说到了,相当于map+switch=switchMap

举个栗子:

const btn = document.createElement('button');btn.innerText = '我要发言!'document.body.appendChild(btn);const source = Rx.Observable.fromEvent(btn, 'click');const source2 = source.map(x => Rx.Observable.interval(1000).take(3));const result = source2.switch();result.subscribe(x => console.log(x));

上述代码实现的效果与switchMap一致,当用户点击按钮时会开始发送数据,当这次数据发送未完成时,再次点击按钮,则会开始一个新的发射数据流程,将原先的发射数据流程直接抛弃。

其他组合操作符

官网传送门:组合操作符 https://cn.rx.js.org/manual/overview.html#h312

  • combineAll
  • concat
  • exhaust
  • forkJoin
  • merge
  • race
  • withLatestFrom
  • zipAll