本章介绍如何把来自多个Observable对象的数据合并到一个Observable对象中。
1.「合并类」操作符 75’
不少合并类操作符既提供静态操作符也提供实例操作符,要根据具体场景选择使用。
不同的Observable对象合并到一起有很多规则,并且大部分合并操作符能够汇合超过两个以上的上有数据流。
1.1 concat
首尾相连,输出1 2 3 4 5 6 7 8 9
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/concat';
const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of(4, 5, 6);
const source3$ = Observable.of(7, 8, 9);
const concated$ = source1$.concat(source2$, source3$);
concated$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.2 merge
1.2.1 数据汇流
对上游数据采取先到先得的策略,任何一个Observable只要有数据推下来,立即转给下游。
下面例子中的输出结果为 0A 0B 1A 1B …
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/merge';
const source1$ = Observable.timer(0, 1000).map(x => x+'A');
const source2$ = Observable.timer(500, 1000).map(x => x+'B');
const merged$ = source1$.merge(source2$);
merged$.subscribe(
console.log,
null,
() => console.log('complete')
);
对异步产生的数据流使用merge才有意义,例如下面例子同步产生的数据里,结果会是 1 2 3 4 5 6
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/merge';
const source1 = Observable.of(1, 2, 3);
const source2 = Observable.of(4, 5, 6);
const merged$= Observable.merge(source1, source2);
merged$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.2.2 同步限流
最后一个参数限定了可以合并的流的数量,除非某个流结束了,否则超过数量的流永远得不到数据合并的机会。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/merge';
const source1$ = Observable.timer(0, 1000).map(x => x+'A');
const source2$ = Observable.timer(500, 1000).map(x => x+'B');
const source3$ = Observable.timer(1000, 1000).map(x => x+'C');
const merged$ = source1$.merge(source2$, source3$, 2);
merged$.subscribe(
console.log,
err => console.log('Error: ', err),
() => console.log('complete')
);
1.2.3 merge的应用场景
fromEvent一次只能从一个DOM元素获取一种类型的时间,如果既想要增加click事件,有想要增加touchend时间,可以使用merge的方法合并产生新的Observable对象, 在这个对象上subscribe的eventHandler,可以通过click 和 touchend触发。
1.3 zip
1.3.1 一对一的合并
数据源头你吐一个我吐一个的方式进行合并,下例中两个都是同步数据流,结果为 [1, ‘a’] [2, ‘b’] [3, ‘c’]。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/zip';
const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of('a', 'b', 'c');
const zippedStream$ = source1$.zip(source2$);
zippedStream$.subscribe(
console.log,
null,
() => console.log('complete')
);
下例子中一个是同步数据流,一个是异步数据流,其中一个需要等待另外一个吐出,一个结束合并的流也结束。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/zip';
import 'rxjs/add/observable/interval';
const source1$ = Observable.interval(1000);
const source2$ = Observable.of('a', 'b', 'c');
const zippedStream$ = Observable.zip(source1$, source2$);
zippedStream$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.3.2 数据积压问题
多个数据流中,其中一个数据吐出很快,其他数据流吐出较慢,就会导致其中一个数据流的数据积压问题。操作符自身解决不了这个问题,后续章节会介绍如何解决这个情况。
1.3.3 zip多个数据流
多个数据流仍旧需要一对一咬合,数组元素的个数和上游Observable对象数量相同。
1.4 combineLatest
反复使用上游产生的最后一个数据,如果其他上游数据存在,就合并吐出到下游,例如下面例子中产生的数据为 [0,0] [1, 0] [1,1] [2,1] [2,2] …
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/combineLatest';
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const result$ = source1$.combineLatest(source2$);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
出乎直觉的结果是 [c,1] [c,2] [c,3],因为$source1作为一个上游数据流先就位,第二个数据流依次出现的数据是1 2 3,那么和 $source1 组合的数据都是它的最后一个元素 c。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/combineLatest';
const source1$ = Observable.of('a', 'b', 'c');
const source2$ = Observable.of(1, 2, 3);
const result$ = Observable.combineLatest(source1$, source2$);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.4.1 定制下游数据
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/combineLatest';
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const project = (a, b) => `${a} and ${b}`;
const result$ = source1$.combineLatest(source2$, project);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.4.2 多重依赖问题
当上游的数据源有相互依赖时候,会带来意想不到的结果。original的数据改变会导致分两次生效:[‘0a’,’0b’] [‘1a’,’0b’] [‘1a’,’1b’] [‘2a’,’1b’] [‘2a’,’2b’],预期的是original在下游同时应用,此问题被称为 glitch。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/combineLatest';
import 'rxjs/add/operator/map';
const original$ = Observable.timer(0, 1000);
const source1$ = original$.map(x => x+'a');
const source2$ = original$.map(x => x+'b');
const result$ = source1$.combineLatest(source2$);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.5 withLatestFrom
调用withLatestFrom对象的操作符,起到了主导数据产生节奏的作用,作为参数的Observable对象只能贡献数据,不能控制数据产生的时机。下面例子中产生的结果为 101 203 305 …
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';
const source1$ = Observable.timer(0, 2000).map(x => 100 * x);
const source2$ = Observable.timer(500, 1000);
const result$ = source1$.withLatestFrom(source2$, (a,b)=> a+b);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.6 解决glitch
改为使用 withLatestFrom 就不会出现1.4.2点问题,下例输出为 [‘0a’, ‘0b’] [‘1a’, ‘1b’] …
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';
const original$ = Observable.timer(0, 1000);
const source1$ = original$.map(x => x+'a');
const source2$ = original$.map(x => x+'b');
const result$ = source1$.withLatestFrom(source2$);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
下面是一个DOM的例子,如果使用 comtineLatest,点击发生时候,回存在先更新x再更新y的问题,然而使用 withLatestFrom可以解决这个问题。
<!doctype html>
<html
<head>
<style type="text/css">
html, body {
width: 100%;
height: 100%;
min-height: 100%;
}
</style>
</head>
<body>
<div>
<div id="text"></div>
</div>
<script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
<script>
const event$ = Rx.Observable.fromEvent(document.body, 'click');
const x$ = event$.map(e => e.x);
const y$ = event$.map(e => e.y);
const result$ = x$.withLatestFrom(y$, (x, y) => `x: ${x}, y: ${y}`);
result$.subscribe(
(location) => {
console.log('#render', location);
document.querySelector('#text').innerText = location;
}
);
</script>
</body>
</html>
1.7 race
多个Observable对象在一起看谁先产生数据,这个胜者会一直成为数据输出的唯一来源。0a 1a 2a …
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/race';
import 'rxjs/add/operator/map';
const source1$ = Observable.timer(0, 2000).map(x => x+'a');
const source2$ = Observable.timer(500, 1000).map(x => x+'b');
const winner$ = source1$.race(source2$);
winner$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.8 startWith
让一个Observable对象被订阅时,总是突出若干个数据。下述例子输出为 start 0 1 …
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/startWith';
const original$ = Observable.timer(1000, 1000);
const result$ = original$.startWith('start');
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.9 forkJoin
所有Observable对象都完结,确信不会产生新的数据时,把所有输入对象产生的最后一个数据合并,输出结果为 [‘0a’, ‘2b’],相当于RxJS的promiseAll。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';
const source1$ = Observable.interval(1000).map(x => x + 'a').take(1);
const source2$ = Observable.interval(1000).map(x => x + 'b').take(3);
const concated$ = Observable.forkJoin(source1$, source2$);
concated$.subscribe(
console.log,
err => console.log('Error: ', err),
() => console.log('complete')
);
2. 高阶Observable 30’
高阶函数是产生函数的函数,高阶Observable则是产生Observable的Observable。下面介绍的所有操作符,相比较普通操作符,只是语法上追加了一个All
。
注:take从上游数据流拿指定数量的数据之后就完结,在第7章中介绍。
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
和此前的弹珠图有所不同
- 第一条横线代表高阶Observable本身
- 第二、三条横线代表内部Observable(Inner Observable)
- 虚线表示了高阶Observable 和 内部Observable 之间的生成关系
- 高阶Observable完结,不代表内部Observable,它们有自己的生命周期。
2.1 高阶Observable的意义
数据流管理的是数据,而数据流也是一种特殊的数据。
让需要被管理的Observable对象成为其他Observable对象的数据,就是Observable的意义。
本章只介绍告诫Observable的合并操作符,因为这一类组好理解,后续会介绍其他的高阶操作符。2.2 操作高阶Observable的合并操作符
2.2.1 concatAll
会先订阅第一个内部Observalbe对象并抽出其中数据,第一Observable对象完结时,才会订阅第二个Observable对象。下例中输出数据为 0:0 0:1 1:0 1:1,理解了这个例子,后高阶合并操作符就不在话下了。 ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/of’; import ‘rxjs/add/observable/interval’; import ‘rxjs/add/operator/concatAll’; import ‘rxjs/add/operator/map’; import ‘rxjs/add/operator/take’;
const ho$ = Observable.interval(1000) .take(2) .map(x => Observable.interval(1500).map(y => x+’:’+y).take(2));
const concated$ = ho$.concatAll();
concated$.subscribe( console.log, err => console.log(‘Error: ‘, err), () => console.log(‘complete’) );
<a name="KKBZs"></a>
### 2.2.2mergeAll
和mergeAll差异点在于,发现内部Observable对象后会立即订阅,不会等待。参考弹珠图查看差异。
```json
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/mergeAll';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
const concated$ = ho$.mergeAll();
concated$.subscribe(
console.log,
err => console.log('Error: ', err),
() => console.log('complete')
);
2.2.3 zipAll
产生的输出是 [‘0:0’, ‘1:0’] [‘0:1’, ‘1:1’]
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/zipAll';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
const concated$ = ho$.zipAll();
concated$.subscribe(
console.log,
err => console.log('Error: ', err),
() => console.log('complete')
);
2.2.4 combineAll
相当于处理高阶Observable的combineLatest,大概嫌名字太差了,所以采用了combineAll这个名字。下例子的分析过程和输出结果:
0 0,1 => 0:0 0:1 => [0:0 1:0] [0:1 1:0] [0:1 1:1]
1 0,1 => 1:0 1:1
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/combineAll';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
const concated$ = ho$.combineAll();
concated$.subscribe(
console.log,
err => console.log('Error: ', err),
() => console.log('complete')
);
2.2.5 为什么没有withLatestFromAll
意味着多个高阶Observable产生的内部Observable区别对待,看起来不是很好的选择,因此并没有提供这样的操作符。
2.3 进化的高阶Observable处理
concatAll存在一个问题,上游高级Observable对象产生Observable对象过快,导致数据产生较快,而合并速度赶不上,就会存在数据积压的问题。实际上在某场景是可以舍弃一些数据,这就涉及到另外两个合并操作。
2.3.1 switch
总是切换到最新的内部Observable对象上获取数据,下面例子中始终使用interval产生的1,输出结果是 1:0 1:0。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/switch';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
const result$ = ho$.switch();
result$.subscribe(
console.log,
err => console.log('Error: ', err),
() => console.log('complete')
);
一个更复杂的例子,结合弹珠图可以很好地理解它
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/switch';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';
const ho$ = Observable.interval(1000)
.take(3)
.map(x => Observable.interval(700).map(y => x+':'+y).take(2));
const result$ = ho$.switch();
result$.subscribe(
console.log,
err => console.log('Error: ', err),
() => console.log('complete')
);
2.3.2 exhaust
耗尽当前内部Observable数据之前,不会切换到下一个Observable对象。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/exhaust';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';
const ho$ = Observable.interval(1000)
.take(3)
.map(x => Observable.interval(700).map(y => x+':'+y).take(2));
const result$ = ho$.exhaust();
result$.subscribe(
console.log,
err => console.log('Error: ', err),
() => console.log('complete')
);
第二个内部Observable产生时,第一个内部Observable数据还没结束,因此被忽略了。