本章介绍如何把来自多个Observable对象的数据合并到一个Observable对象中。

1.「合并类」操作符 75’

不少合并类操作符既提供静态操作符也提供实例操作符,要根据具体场景选择使用。
不同的Observable对象合并到一起有很多规则,并且大部分合并操作符能够汇合超过两个以上的上有数据流。

1.1 concat

首尾相连,输出1 2 3 4 5 6 7 8 9

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/concat';
  4. const source1$ = Observable.of(1, 2, 3);
  5. const source2$ = Observable.of(4, 5, 6);
  6. const source3$ = Observable.of(7, 8, 9);
  7. const concated$ = source1$.concat(source2$, source3$);
  8. concated$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

1.2 merge

和concat用法很相似,功能却很不一样。

1.2.1 数据汇流

对上游数据采取先到先得的策略,任何一个Observable只要有数据推下来,立即转给下游。
下面例子中的输出结果为 0A 0B 1A 1B …

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/map';
  4. import 'rxjs/add/operator/merge';
  5. const source1$ = Observable.timer(0, 1000).map(x => x+'A');
  6. const source2$ = Observable.timer(500, 1000).map(x => x+'B');
  7. const merged$ = source1$.merge(source2$);
  8. merged$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

对异步产生的数据流使用merge才有意义,例如下面例子同步产生的数据里,结果会是 1 2 3 4 5 6

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/merge';
  4. const source1 = Observable.of(1, 2, 3);
  5. const source2 = Observable.of(4, 5, 6);
  6. const merged$= Observable.merge(source1, source2);
  7. merged$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );

1.2.2 同步限流

最后一个参数限定了可以合并的流的数量,除非某个流结束了,否则超过数量的流永远得不到数据合并的机会。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/map';
  4. import 'rxjs/add/operator/merge';
  5. const source1$ = Observable.timer(0, 1000).map(x => x+'A');
  6. const source2$ = Observable.timer(500, 1000).map(x => x+'B');
  7. const source3$ = Observable.timer(1000, 1000).map(x => x+'C');
  8. const merged$ = source1$.merge(source2$, source3$, 2);
  9. merged$.subscribe(
  10. console.log,
  11. err => console.log('Error: ', err),
  12. () => console.log('complete')
  13. );

1.2.3 merge的应用场景

fromEvent一次只能从一个DOM元素获取一种类型的时间,如果既想要增加click事件,有想要增加touchend时间,可以使用merge的方法合并产生新的Observable对象, 在这个对象上subscribe的eventHandler,可以通过click 和 touchend触发。

1.3 zip

1.3.1 一对一的合并

数据源头你吐一个我吐一个的方式进行合并,下例中两个都是同步数据流,结果为 [1, ‘a’] [2, ‘b’] [3, ‘c’]。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/zip';
  4. const source1$ = Observable.of(1, 2, 3);
  5. const source2$ = Observable.of('a', 'b', 'c');
  6. const zippedStream$ = source1$.zip(source2$);
  7. zippedStream$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );

下例子中一个是同步数据流,一个是异步数据流,其中一个需要等待另外一个吐出,一个结束合并的流也结束。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/zip';
  4. import 'rxjs/add/observable/interval';
  5. const source1$ = Observable.interval(1000);
  6. const source2$ = Observable.of('a', 'b', 'c');
  7. const zippedStream$ = Observable.zip(source1$, source2$);
  8. zippedStream$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

1.3.2 数据积压问题

多个数据流中,其中一个数据吐出很快,其他数据流吐出较慢,就会导致其中一个数据流的数据积压问题。操作符自身解决不了这个问题,后续章节会介绍如何解决这个情况。

1.3.3 zip多个数据流

多个数据流仍旧需要一对一咬合,数组元素的个数和上游Observable对象数量相同。

1.4 combineLatest

反复使用上游产生的最后一个数据,如果其他上游数据存在,就合并吐出到下游,例如下面例子中产生的数据为 [0,0] [1, 0] [1,1] [2,1] [2,2] …

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/combineLatest';
  4. const source1$ = Observable.timer(500, 1000);
  5. const source2$ = Observable.timer(1000, 1000);
  6. const result$ = source1$.combineLatest(source2$);
  7. result$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );

出乎直觉的结果是 [c,1] [c,2] [c,3],因为$source1作为一个上游数据流先就位,第二个数据流依次出现的数据是1 2 3,那么和 $source1 组合的数据都是它的最后一个元素 c。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/combineLatest';
  4. const source1$ = Observable.of('a', 'b', 'c');
  5. const source2$ = Observable.of(1, 2, 3);
  6. const result$ = Observable.combineLatest(source1$, source2$);
  7. result$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );

1.4.1 定制下游数据

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/combineLatest';
  4. const source1$ = Observable.timer(500, 1000);
  5. const source2$ = Observable.timer(1000, 1000);
  6. const project = (a, b) => `${a} and ${b}`;
  7. const result$ = source1$.combineLatest(source2$, project);
  8. result$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

1.4.2 多重依赖问题

当上游的数据源有相互依赖时候,会带来意想不到的结果。original的数据改变会导致分两次生效:[‘0a’,’0b’] [‘1a’,’0b’] [‘1a’,’1b’] [‘2a’,’1b’] [‘2a’,’2b’],预期的是original在下游同时应用,此问题被称为 glitch。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/combineLatest';
  4. import 'rxjs/add/operator/map';
  5. const original$ = Observable.timer(0, 1000);
  6. const source1$ = original$.map(x => x+'a');
  7. const source2$ = original$.map(x => x+'b');
  8. const result$ = source1$.combineLatest(source2$);
  9. result$.subscribe(
  10. console.log,
  11. null,
  12. () => console.log('complete')
  13. );

1.5 withLatestFrom

调用withLatestFrom对象的操作符,起到了主导数据产生节奏的作用,作为参数的Observable对象只能贡献数据,不能控制数据产生的时机。下面例子中产生的结果为 101 203 305 …

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/withLatestFrom';
  4. import 'rxjs/add/operator/map';
  5. const source1$ = Observable.timer(0, 2000).map(x => 100 * x);
  6. const source2$ = Observable.timer(500, 1000);
  7. const result$ = source1$.withLatestFrom(source2$, (a,b)=> a+b);
  8. result$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

1.6 解决glitch

改为使用 withLatestFrom 就不会出现1.4.2点问题,下例输出为 [‘0a’, ‘0b’] [‘1a’, ‘1b’] …

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/withLatestFrom';
  4. import 'rxjs/add/operator/map';
  5. const original$ = Observable.timer(0, 1000);
  6. const source1$ = original$.map(x => x+'a');
  7. const source2$ = original$.map(x => x+'b');
  8. const result$ = source1$.withLatestFrom(source2$);
  9. result$.subscribe(
  10. console.log,
  11. null,
  12. () => console.log('complete')
  13. );

下面是一个DOM的例子,如果使用 comtineLatest,点击发生时候,回存在先更新x再更新y的问题,然而使用 withLatestFrom可以解决这个问题。

  1. <!doctype html>
  2. <html
  3. <head>
  4. <style type="text/css">
  5. html, body {
  6. width: 100%;
  7. height: 100%;
  8. min-height: 100%;
  9. }
  10. </style>
  11. </head>
  12. <body>
  13. <div>
  14. <div id="text"></div>
  15. </div>
  16. <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
  17. <script>
  18. const event$ = Rx.Observable.fromEvent(document.body, 'click');
  19. const x$ = event$.map(e => e.x);
  20. const y$ = event$.map(e => e.y);
  21. const result$ = x$.withLatestFrom(y$, (x, y) => `x: ${x}, y: ${y}`);
  22. result$.subscribe(
  23. (location) => {
  24. console.log('#render', location);
  25. document.querySelector('#text').innerText = location;
  26. }
  27. );
  28. </script>
  29. </body>
  30. </html>

1.7 race

多个Observable对象在一起看谁先产生数据,这个胜者会一直成为数据输出的唯一来源。0a 1a 2a …

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/race';
  4. import 'rxjs/add/operator/map';
  5. const source1$ = Observable.timer(0, 2000).map(x => x+'a');
  6. const source2$ = Observable.timer(500, 1000).map(x => x+'b');
  7. const winner$ = source1$.race(source2$);
  8. winner$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

1.8 startWith

让一个Observable对象被订阅时,总是突出若干个数据。下述例子输出为 start 0 1 …

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/startWith';
  4. const original$ = Observable.timer(1000, 1000);
  5. const result$ = original$.startWith('start');
  6. result$.subscribe(
  7. console.log,
  8. null,
  9. () => console.log('complete')
  10. );

1.9 forkJoin

所有Observable对象都完结,确信不会产生新的数据时,把所有输入对象产生的最后一个数据合并,输出结果为 [‘0a’, ‘2b’],相当于RxJS的promiseAll。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/interval';
  4. import 'rxjs/add/observable/forkJoin';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/take';
  7. const source1$ = Observable.interval(1000).map(x => x + 'a').take(1);
  8. const source2$ = Observable.interval(1000).map(x => x + 'b').take(3);
  9. const concated$ = Observable.forkJoin(source1$, source2$);
  10. concated$.subscribe(
  11. console.log,
  12. err => console.log('Error: ', err),
  13. () => console.log('complete')
  14. );

2. 高阶Observable 30’

高阶函数是产生函数的函数,高阶Observable则是产生Observable的Observable。下面介绍的所有操作符,相比较普通操作符,只是语法上追加了一个All

注:take从上游数据流拿指定数量的数据之后就完结,在第7章中介绍。

  1. const ho$ = Observable.interval(1000)
  2. .take(2)
  3. .map(x => Observable.interval(1500).map(y => x+':'+y).take(2));

image.png

和此前的弹珠图有所不同

  • 第一条横线代表高阶Observable本身
  • 第二、三条横线代表内部Observable(Inner Observable)
  • 虚线表示了高阶Observable 和 内部Observable 之间的生成关系
  • 高阶Observable完结,不代表内部Observable,它们有自己的生命周期。

    2.1 高阶Observable的意义

    数据流管理的是数据,而数据流也是一种特殊的数据。
    让需要被管理的Observable对象成为其他Observable对象的数据,就是Observable的意义。
    本章只介绍告诫Observable的合并操作符,因为这一类组好理解,后续会介绍其他的高阶操作符。

    2.2 操作高阶Observable的合并操作符

    2.2.1 concatAll

    会先订阅第一个内部Observalbe对象并抽出其中数据,第一Observable对象完结时,才会订阅第二个Observable对象。下例中输出数据为 0:0 0:1 1:0 1:1,理解了这个例子,后高阶合并操作符就不在话下了。 ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/of’; import ‘rxjs/add/observable/interval’; import ‘rxjs/add/operator/concatAll’; import ‘rxjs/add/operator/map’; import ‘rxjs/add/operator/take’;

const ho$ = Observable.interval(1000) .take(2) .map(x => Observable.interval(1500).map(y => x+’:’+y).take(2));

const concated$ = ho$.concatAll();

concated$.subscribe( console.log, err => console.log(‘Error: ‘, err), () => console.log(‘complete’) );

  1. <a name="KKBZs"></a>
  2. ### 2.2.2mergeAll
  3. 和mergeAll差异点在于,发现内部Observable对象后会立即订阅,不会等待。参考弹珠图查看差异。
  4. ```json
  5. import {Observable} from 'rxjs/Observable';
  6. import 'rxjs/add/observable/of';
  7. import 'rxjs/add/observable/interval';
  8. import 'rxjs/add/operator/mergeAll';
  9. import 'rxjs/add/operator/map';
  10. import 'rxjs/add/operator/take';
  11. const ho$ = Observable.interval(1000)
  12. .take(2)
  13. .map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
  14. const concated$ = ho$.mergeAll();
  15. concated$.subscribe(
  16. console.log,
  17. err => console.log('Error: ', err),
  18. () => console.log('complete')
  19. );

2.2.3 zipAll

产生的输出是 [‘0:0’, ‘1:0’] [‘0:1’, ‘1:1’]

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/interval';
  4. import 'rxjs/add/operator/zipAll';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/take';
  7. const ho$ = Observable.interval(1000)
  8. .take(2)
  9. .map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
  10. const concated$ = ho$.zipAll();
  11. concated$.subscribe(
  12. console.log,
  13. err => console.log('Error: ', err),
  14. () => console.log('complete')
  15. );

2.2.4 combineAll

相当于处理高阶Observable的combineLatest,大概嫌名字太差了,所以采用了combineAll这个名字。下例子的分析过程和输出结果:

0 0,1 => 0:0 0:1 => [0:0 1:0] [0:1 1:0] [0:1 1:1]
1 0,1 => 1:0 1:1

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/interval';
  4. import 'rxjs/add/operator/combineAll';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/take';
  7. const ho$ = Observable.interval(1000)
  8. .take(2)
  9. .map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
  10. const concated$ = ho$.combineAll();
  11. concated$.subscribe(
  12. console.log,
  13. err => console.log('Error: ', err),
  14. () => console.log('complete')
  15. );

2.2.5 为什么没有withLatestFromAll

意味着多个高阶Observable产生的内部Observable区别对待,看起来不是很好的选择,因此并没有提供这样的操作符。

2.3 进化的高阶Observable处理

concatAll存在一个问题,上游高级Observable对象产生Observable对象过快,导致数据产生较快,而合并速度赶不上,就会存在数据积压的问题。实际上在某场景是可以舍弃一些数据,这就涉及到另外两个合并操作。

2.3.1 switch

总是切换到最新的内部Observable对象上获取数据,下面例子中始终使用interval产生的1,输出结果是 1:0 1:0。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/interval';
  4. import 'rxjs/add/operator/switch';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/take';
  7. const ho$ = Observable.interval(1000)
  8. .take(2)
  9. .map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
  10. const result$ = ho$.switch();
  11. result$.subscribe(
  12. console.log,
  13. err => console.log('Error: ', err),
  14. () => console.log('complete')
  15. );

一个更复杂的例子,结合弹珠图可以很好地理解它

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/interval';
  4. import 'rxjs/add/operator/switch';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/take';
  7. const ho$ = Observable.interval(1000)
  8. .take(3)
  9. .map(x => Observable.interval(700).map(y => x+':'+y).take(2));
  10. const result$ = ho$.switch();
  11. result$.subscribe(
  12. console.log,
  13. err => console.log('Error: ', err),
  14. () => console.log('complete')
  15. );

image.png

2.3.2 exhaust

耗尽当前内部Observable数据之前,不会切换到下一个Observable对象。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/observable/interval';
  4. import 'rxjs/add/operator/exhaust';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/take';
  7. const ho$ = Observable.interval(1000)
  8. .take(3)
  9. .map(x => Observable.interval(700).map(y => x+':'+y).take(2));
  10. const result$ = ho$.exhaust();
  11. result$.subscribe(
  12. console.log,
  13. err => console.log('Error: ', err),
  14. () => console.log('complete')
  15. );

image.png

第二个内部Observable产生时,第一个内部Observable数据还没结束,因此被忽略了。