image.png

1. 过滤类操作符的模式

判定数据是否符合某个条件,符合则传递给下游,否则就抛弃掉。判定是否有资格进入下游的用的是“判定函数”,除此外有的过滤类操作符还可以接受一个结果选择器函数,用于定制传递给下游的函数。

1.1 filter

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/filter';
  4. const source$ = Observable.interval(1000);
  5. const even$ = source$.filter(x => x % 2 === 0);
  6. even$.subscribe(
  7. console.log,
  8. null,
  9. () => console.log('complete')
  10. );

1.2 first

第2、3个参数都是可选的,分别代表期望怎么组合结果,以及当没有找到符合条件的值时默认值是什么。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/first';
  4. const source$ = Observable.of(3, 1, 4, 1, 5, 9);
  5. const first$ = source$.first(
  6. x => x < 0,
  7. f => f,
  8. -1
  9. );
  10. first$.subscribe(
  11. console.log,
  12. null,
  13. () => console.log('complete')
  14. );

1.3 last

和first的差别在于,一定要等到所有数据都吐出才能得到结果。例子略。

1.4 take一族操作符

前面的first、last、find、findIndex都是处理单个数据。
taken值传递一个count,指定从数据流中拿几个数据,它还有很多兄弟满足各种需求。

1.4.1 takeLast

下图是take(5)和takeLast(3)的弹珠图,takeLast的特点是必须等到所有数据都吐出。
image.png

1.4.2 takeWhile

takeWhite((value, index) => boolean) 只要遇到一个数据符合false的条件,Observable对象就完结。

1.4.3 take和filter的组合

假定想要拿到符合条件的前N个数据,可以使用take和filter组合方式来实现

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/take';
  4. import 'rxjs/add/operator/filter';
  5. const source$ = Observable.interval(1000);
  6. Observable.prototype.takeCountWhile = function (count, predicate) {
  7. return this.filter(predicate).take(count);
  8. }
  9. const even$ = source$.takeCountWhile(
  10. 2,
  11. value => value % 2 === 0
  12. );
  13. even$.subscribe(
  14. console.log,
  15. null,
  16. () => console.log('complete')
  17. );

它的弹珠图如下(主题看$source永远不会停止;而take的Observable对象拿到第二个数据后就停止了)
image.png

1.4.4 takeUtil

可以理解为Observable对象调用takeUtil,传递一个Observable对象作为notifier,一旦notifier突出数据,水龙头就会关闭,数据流就会停止。参考下面例子,在2.5s的时候,被关闭了“水龙头”:d

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/observable/timer';
  4. import 'rxjs/add/operator/takeUntil';
  5. const source$ = Observable.interval(1000);
  6. const notifier$ = Observable.timer(2500);
  7. const takeUntil$ = source$.takeUntil(notifier$);
  8. takeUntil$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

image.png

1.5 计时的点击计数网页程序

countDown$ 第5秒时突出数据,触发“时间结束”的日志输出;而在按钮上发生的点击行为,会更新clickCount并显示在界面上,到第5秒的时候,再发生点击后,“水龙头”关闭。

  1. <!doctype html>
  2. <html>
  3. <body>
  4. <div>
  5. <button id="clickMe">Click Me</button>
  6. <div id="text">0</div>
  7. <div id="end">抓紧时间</div>
  8. </div>
  9. <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
  10. <script>
  11. let clickCount = 0;
  12. const event$ = Rx.Observable.fromEvent(document.querySelector('#clickMe'), 'click');
  13. const countDown$ = Rx.Observable.timer(5000);
  14. const filtered$ = event$.takeUntil(countDown$);
  15. const showEnd = () => {
  16. document.querySelector('#end').innerText = '时间结束';
  17. };
  18. const updateCount = () => {
  19. document.querySelector('#text').innerText = ++clickCount
  20. };
  21. countDown$.subscribe(showEnd);
  22. filtered$.subscribe(updateCount);
  23. </script>
  24. </body>
  25. </html>

1.6 skip

skip(count),跳过前count个数据后再拿。

1.7 skipWhile 和 skipUtil

skipWhile和takeWhile、skipUntil和takeUntil是对应关系;
take是取,skip是跳过;
while是符合条件时都do(take/skip)
until是遇到第一个符合条件的case之前之后改变do行为(take or skip);

2. 回压控制

“回压” 是源自传统工程的概念,气体或液体应该朝着一个方向流动,如果管道口径变小,会导致和流动方向相反的压力,RxJS中的数据流动和这个概念很相似,本节介绍的操作符,就是用于舍弃掉一些数据(有损的),本质上它们也是属于过滤类操作符。

注意:后续会有很多涉及到 A-B-C组合的数据流,要理解它们的例子,有两个需要强调

  1. concat的数据流,是在concat后才开始产生数据,即 a.concat(b) 在时间线上吐数据是串行的
  2. 下面介绍的方法,起点是数据产生(可能是第一个,可能是第n个),不要去找时间为0的节点

    2.1 throttle和debounce

    jquery和loadash都实现了throttle和debounce,和RxJS中的throttle和debounce很不一样,它们对应了RxJS的throttleTime和deounceTime。

    2.1.1 基于时间控制流量:throttleTime和deounceTime

    throttleTime:上游产生的数据,在指定时间内只能传递1个给下游。下面例子中:0 2 4 6 … ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/interval’; import ‘rxjs/add/operator/throttleTime’;

const source$ = Observable.interval(1000); const result$ = source$.throttleTime(2000);

result$.subscribe( console.log, null, () => console.log(‘complete’) );

  1. debounceTime:上游产生的数据,在指定时间内如果有新数据产生,会覆盖新的数据;如果指定时间没有新数据,才会交给下游。下面例子中不会产生任何的结果,因为永远有新数据产生,导致2s内无新数据产生的条件无法满足。
  2. ```json
  3. import {Observable} from 'rxjs/Observable';
  4. import 'rxjs/add/observable/interval';
  5. import 'rxjs/add/operator/debounceTime';
  6. const source$ = Observable.interval(1000);
  7. const result$ = source$.debounceTime(2000);
  8. result$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

稍微修改上述例子,可以输出0 3,结合弹珠图理解

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/filter';
  4. import 'rxjs/add/operator/debounceTime';
  5. const source$ = Observable.interval(500);
  6. const filter$ = source$.filter(x => x % 3 === 0);
  7. const result$ = filter$.debounceTime(1000);
  8. result$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

image.png

继续修改例子的debounceTime为throttleTime,输出结果相同

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/filter';
  4. import 'rxjs/add/operator/throttleTime';
  5. const source$ = Observable.interval(1000);
  6. const filter$ = source$.filter(x => x % 3 === 0);
  7. const result$ = filter$.throttleTime(2000);
  8. result$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

使用这两个函数的典型场景是减少不必要的DOM处理,例如一个用户提交订单,连续1s内点击了多次,我们希望只处理一次,那么用throttleTime来节流;而我们希望根据鼠标的移动来决定是否加载图片的预览内容,那么应该采用的是debounceTime,即停留下来的位置再进行加载。

注:数据的产生是等待上一个数据流完成的。

下面例子模拟了数据产生不规律的时候,debounceTime会得到什么样的数据。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/filter';
  4. import 'rxjs/add/operator/take';
  5. import 'rxjs/add/operator/concat';
  6. import 'rxjs/add/operator/debounceTime';
  7. const source$ = Observable.interval(500).take(2).mapTo('A')
  8. .concat(Observable.interval(1000).take(3).mapTo('B'))
  9. .concat(Observable.interval(500).take(3).mapTo('C'));
  10. const result$ = source$.debounceTime(800);
  11. result$.subscribe(
  12. console.log,
  13. null,
  14. () => console.log('complete')
  15. );

image.png

下面例子模拟了数据产生不规律的时候,debounceTime 和 throttleTime会得到什么样的数据。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/filter';
  4. import 'rxjs/add/operator/take';
  5. import 'rxjs/add/operator/concat';
  6. import 'rxjs/add/operator/mapTo';
  7. import 'rxjs/add/operator/throttleTime';
  8. const source$ = Observable.interval(500).take(2).mapTo('A')
  9. .concat(Observable.interval(1000).take(3).mapTo('B'))
  10. .concat(Observable.interval(500).take(3).mapTo('C'));
  11. const result$ = source$.throttleTime(800);
  12. result$.subscribe(
  13. console.log,
  14. null,
  15. () => console.log('complete')
  16. );

image.png

2.1.2 用数据流来控制流量

下面例子的输出内容为:

call durationSelector with 0

call durationSelector with 2

call durationSelector with 4

过程逻辑描述为:

  1. $source每间隔1s吐出一个数据;
  2. throttle(节流)传递一个函数,并且将第一个$source吐出的数据0传递给下游;
  3. durationSelector函数返回一个Observable对象,throttle对它进行订阅,即:你啥时候吐出数据?数据是啥没关系,主要给我个信号;
  4. 对象在2s后吐出数据0,throttle收到信息,取消对durationSelector返回对象的订阅,然后重新执行durationSelector,这时候$source产生的数据2进入下游。 ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/interval’; import ‘rxjs/add/observable/timer’; import ‘rxjs/add/operator/throttle’;

const source$ = Observable.interval(1000); const durationSelector = value => { console.log(# call durationSelector with ${value}); return Observable.timer(2000); }; const result$ = source$.throttle(durationSelector);

result$.subscribe( console.log, null, () => console.log(‘complete’) );

  1. <a name="Gn2Pr"></a>
  2. ##
  3. 对 durationSelector 函数稍加改动,可以看到可以更灵活地进行数据输出控制:
  4. ```json
  5. import {Observable} from 'rxjs/Observable';
  6. import 'rxjs/add/observable/interval';
  7. import 'rxjs/add/observable/timer';
  8. import 'rxjs/add/operator/throttle';
  9. const source$ = Observable.interval(1000);
  10. const durationSelector = value => {
  11. return Observable.timer(value % 3 === 0 ? 2000 : 1000);
  12. };
  13. const result$ = source$.throttle(durationSelector);
  14. result$.subscribe(
  15. console.log,
  16. null,
  17. () => consol·e.log('complete')
  18. );

image.png

throttle可以理解为多久之后再次放数据,debounce则可以理解为延迟多久才放出一个数据。结合下面例子:

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/observable/timer';
  4. import 'rxjs/add/operator/debounce';
  5. const source$ = Observable.interval(1000);
  6. const durationSelector = value => {
  7. return Observable.timer(value % 3 === 0 ? 2000 : 1000);
  8. };
  9. const result$ = source$.debounce(durationSelector);
  10. result$.subscribe(
  11. console.log,
  12. null,
  13. () => console.log('complete')
  14. );

image.png

2.2 auditTime和audit

audit和throttle类似,都是遇到一个数据点后,将未来一段时间内作为筛选范围,差别在于(例如800毫秒)

  • throttle遇到一个节点后,800ms内的其他数据被废弃,只取了第一个数据
  • audit遇到一个节点后,800ms内的数据,只取最后一个,其他都被废弃

例如下面例子中输出到数据为 1 3 5 7。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/observable/timer';
  4. import 'rxjs/add/operator/auditTime';
  5. const source$ = Observable.interval(1000);
  6. const result$ = source$.auditTime(2000);
  7. result$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );

audit 也传递一个函数,下面弹珠图放出和throttle的对比:

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/observable/timer';
  4. import 'rxjs/add/operator/take';
  5. import 'rxjs/add/operator/concat';
  6. import 'rxjs/add/operator/mapTo';
  7. import 'rxjs/add/operator/audit';
  8. const source$ = Observable.interval(500).take(2).mapTo('A')
  9. .concat(Observable.interval(1000).take(3).mapTo('B'))
  10. .concat(Observable.interval(500).take(3).mapTo('C'));
  11. const durationSelector = value => {
  12. return Observable.timer(800);
  13. };
  14. const result$ = source$.audit(durationSelector);
  15. result$.subscribe(
  16. console.log,
  17. null,
  18. () => console.log('complete')
  19. );

image.png

2.3 sampleTime和sample

sample为取样的含义。
auditTime仅仅传递一个时间参数,它不以数据吐出为参考,间隔800毫秒就吐出(最近的)一个数据。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/concat';
  4. import 'rxjs/add/operator/mapTo';
  5. import 'rxjs/add/operator/take';
  6. import 'rxjs/add/operator/sampleTime';
  7. const source$ = Observable.interval(500).take(2).mapTo('A')
  8. .concat(Observable.interval(1000).take(3).mapTo('B'))
  9. .concat(Observable.interval(500).take(3).mapTo('C'));
  10. const result$ = source$.sampleTime(2000);
  11. result$.subscribe(
  12. console.log,
  13. null,
  14. () => console.log('complete')
  15. );

image.png

sample传递的是一个Observable对象,充当一个notifier的作用,例如下面例子每发生点击以后,就输出距离程序开始时间的时间毫秒数:

  1. <!doctype html>
  2. <html>
  3. <body>
  4. <div>
  5. <button id="sample">Sample</button>
  6. <div id="text">0</div>
  7. </div>
  8. <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
  9. <script>
  10. const notifer$ = Rx.Observable.fromEvent(document.querySelector('#sample'), 'click');
  11. const tick$ = Rx.Observable.timer(0, 10).map(x => x*10);
  12. const sample$ = tick$.sample(notifer$);
  13. sample$.subscribe(value => {
  14. document.querySelector('#text').innerText = value
  15. });
  16. </script>
  17. </body>
  18. </html>

2.4 根据数据序列做回压控制

2.4.1 distinct

仅仅返回不同(前面没出现过)的数据,例如下面例子为 0 1 2 3。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/distinct';
  4. const source$ = Observable.of(0, 1, 1, 2, 0, 0, 1, 3, 3);
  5. const distinct$ = source$.distinct();
  6. distinct$.subscribe(
  7. console.log,
  8. null,
  9. () => console.log('complete')
  10. );

前面例子使用的是===判断相同,如果复杂对象就不合适了,可以通过传递一个函数

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/distinct';
  4. const source$ = Observable.of(
  5. {name: 'RxJS', version: 'v4'},
  6. {name: 'React', version: 'v15'},
  7. {name: 'React', version: 'v16'},
  8. {name: 'RxJS', version: 'v5'}
  9. );
  10. const distinct$ = source$.distinct(x => x.name);
  11. distinct$.subscribe(
  12. console.log,
  13. null,
  14. () => console.log('complete')
  15. );


识别不同的数据,意味着有个数据结构记录了所有数据, 那么可能导致内存泄漏,第二个参数可以指定清除掉多少时间内的数据,即仅保证此段时间内的数据是唯一的:

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/map';
  4. import 'rxjs/add/operator/distinct';
  5. const source$ = Observable.interval(100).map(x => x % 1000);
  6. const distinct$ = source$.distinct(null, Observable.interval(500));
  7. distinct$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );

2.4.2 distinctUtilChanged

distinct比较的是过去的所有数据,此方法是比较前一个数据,如果和前一个数据不同则保留。例如下面例子为 0 1 2 0 1 3。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/mapTo';
  4. import 'rxjs/add/operator/distinctUntilChanged';
  5. const source$ = Observable.of(0, 1, 1, 2, 0, 0, 1, 3, 3);
  6. const distinct$ = source$.distinctUntilChanged();
  7. distinct$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );

同样,对于复杂对象,也可以传递一个方法进行比较,但是注意看下例的方法的参数和distinct传入函数的参数居然不一致:

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/distinctUntilChanged';
  4. const source$ = Observable.of(
  5. {name: 'RxJS', version: 'v4'},
  6. {name: 'React', version: 'v15'},
  7. {name: 'React', version: 'v16'},
  8. {name: 'RxJS', version: 'v5'}
  9. );
  10. const compare = (a, b) => a.name === b.name;
  11. const distinct$ = source$.distinctUntilChanged(compare);
  12. distinct$.subscribe(
  13. console.log,
  14. null,
  15. () => console.log('complete')
  16. );

2.4.3 distinctUtilKeyChanged

可以被当作distinctUtilChanged的一个简化写法,可以直接对某些字段进行比较。

3. 其他过滤方式

有趣的但是不一定实用的操作符。

3.1 ignoreElements

忽略上游产生的所有数据,而只关心complete和error。

3.2 elementAt

$source.elementAt(index, null)把上游数据当作数据结构,取第index个数据,第二个参数可以指定不存在第index个数据时候的默认值,否则会报错。

3.3 single

判定上游是否只有一个符合条件的数据。