1. 过滤类操作符的模式
判定数据是否符合某个条件,符合则传递给下游,否则就抛弃掉。判定是否有资格进入下游的用的是“判定函数”,除此外有的过滤类操作符还可以接受一个结果选择器函数,用于定制传递给下游的函数。
1.1 filter
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/filter';
const source$ = Observable.interval(1000);
const even$ = source$.filter(x => x % 2 === 0);
even$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.2 first
第2、3个参数都是可选的,分别代表期望怎么组合结果,以及当没有找到符合条件的值时默认值是什么。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/first';
const source$ = Observable.of(3, 1, 4, 1, 5, 9);
const first$ = source$.first(
x => x < 0,
f => f,
-1
);
first$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.3 last
和first的差别在于,一定要等到所有数据都吐出才能得到结果。例子略。
1.4 take一族操作符
前面的first、last、find、findIndex都是处理单个数据。
taken值传递一个count,指定从数据流中拿几个数据,它还有很多兄弟满足各种需求。
1.4.1 takeLast
下图是take(5)和takeLast(3)的弹珠图,takeLast的特点是必须等到所有数据都吐出。
1.4.2 takeWhile
takeWhite((value, index) => boolean) 只要遇到一个数据符合false的条件,Observable对象就完结。
1.4.3 take和filter的组合
假定想要拿到符合条件的前N个数据,可以使用take和filter组合方式来实现
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/filter';
const source$ = Observable.interval(1000);
Observable.prototype.takeCountWhile = function (count, predicate) {
return this.filter(predicate).take(count);
}
const even$ = source$.takeCountWhile(
2,
value => value % 2 === 0
);
even$.subscribe(
console.log,
null,
() => console.log('complete')
);
它的弹珠图如下(主题看$source永远不会停止;而take的Observable对象拿到第二个数据后就停止了)
1.4.4 takeUtil
可以理解为Observable对象调用takeUtil,传递一个Observable对象作为notifier,一旦notifier突出数据,水龙头就会关闭,数据流就会停止。参考下面例子,在2.5s的时候,被关闭了“水龙头”:d
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/takeUntil';
const source$ = Observable.interval(1000);
const notifier$ = Observable.timer(2500);
const takeUntil$ = source$.takeUntil(notifier$);
takeUntil$.subscribe(
console.log,
null,
() => console.log('complete')
);
1.5 计时的点击计数网页程序
countDown$ 第5秒时突出数据,触发“时间结束”的日志输出;而在按钮上发生的点击行为,会更新clickCount并显示在界面上,到第5秒的时候,再发生点击后,“水龙头”关闭。
<!doctype html>
<html>
<body>
<div>
<button id="clickMe">Click Me</button>
<div id="text">0</div>
<div id="end">抓紧时间</div>
</div>
<script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
<script>
let clickCount = 0;
const event$ = Rx.Observable.fromEvent(document.querySelector('#clickMe'), 'click');
const countDown$ = Rx.Observable.timer(5000);
const filtered$ = event$.takeUntil(countDown$);
const showEnd = () => {
document.querySelector('#end').innerText = '时间结束';
};
const updateCount = () => {
document.querySelector('#text').innerText = ++clickCount
};
countDown$.subscribe(showEnd);
filtered$.subscribe(updateCount);
</script>
</body>
</html>
1.6 skip
1.7 skipWhile 和 skipUtil
skipWhile和takeWhile、skipUntil和takeUntil是对应关系;
take是取,skip是跳过;
while是符合条件时都do(take/skip)
until是遇到第一个符合条件的case之前之后改变do行为(take or skip);
2. 回压控制
“回压” 是源自传统工程的概念,气体或液体应该朝着一个方向流动,如果管道口径变小,会导致和流动方向相反的压力,RxJS中的数据流动和这个概念很相似,本节介绍的操作符,就是用于舍弃掉一些数据(有损的),本质上它们也是属于过滤类操作符。
注意:后续会有很多涉及到 A-B-C组合的数据流,要理解它们的例子,有两个需要强调
- concat的数据流,是在concat后才开始产生数据,即 a.concat(b) 在时间线上吐数据是串行的
- 下面介绍的方法,起点是数据产生(可能是第一个,可能是第n个),不要去找时间为0的节点
2.1 throttle和debounce
jquery和loadash都实现了throttle和debounce,和RxJS中的throttle和debounce很不一样,它们对应了RxJS的throttleTime和deounceTime。2.1.1 基于时间控制流量:throttleTime和deounceTime
throttleTime:上游产生的数据,在指定时间内只能传递1个给下游。下面例子中:0 2 4 6 … ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/interval’; import ‘rxjs/add/operator/throttleTime’;
const source$ = Observable.interval(1000); const result$ = source$.throttleTime(2000);
result$.subscribe( console.log, null, () => console.log(‘complete’) );
debounceTime:上游产生的数据,在指定时间内如果有新数据产生,会覆盖新的数据;如果指定时间没有新数据,才会交给下游。下面例子中不会产生任何的结果,因为永远有新数据产生,导致2s内无新数据产生的条件无法满足。
```json
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/debounceTime';
const source$ = Observable.interval(1000);
const result$ = source$.debounceTime(2000);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
稍微修改上述例子,可以输出0 3,结合弹珠图理解
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/debounceTime';
const source$ = Observable.interval(500);
const filter$ = source$.filter(x => x % 3 === 0);
const result$ = filter$.debounceTime(1000);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
继续修改例子的debounceTime为throttleTime,输出结果相同
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/throttleTime';
const source$ = Observable.interval(1000);
const filter$ = source$.filter(x => x % 3 === 0);
const result$ = filter$.throttleTime(2000);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
使用这两个函数的典型场景是减少不必要的DOM处理,例如一个用户提交订单,连续1s内点击了多次,我们希望只处理一次,那么用throttleTime来节流;而我们希望根据鼠标的移动来决定是否加载图片的预览内容,那么应该采用的是debounceTime,即停留下来的位置再进行加载。
注:数据的产生是等待上一个数据流完成的。
下面例子模拟了数据产生不规律的时候,debounceTime会得到什么样的数据。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/concat';
import 'rxjs/add/operator/debounceTime';
const source$ = Observable.interval(500).take(2).mapTo('A')
.concat(Observable.interval(1000).take(3).mapTo('B'))
.concat(Observable.interval(500).take(3).mapTo('C'));
const result$ = source$.debounceTime(800);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
下面例子模拟了数据产生不规律的时候,debounceTime 和 throttleTime会得到什么样的数据。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/concat';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/operator/throttleTime';
const source$ = Observable.interval(500).take(2).mapTo('A')
.concat(Observable.interval(1000).take(3).mapTo('B'))
.concat(Observable.interval(500).take(3).mapTo('C'));
const result$ = source$.throttleTime(800);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
2.1.2 用数据流来控制流量
下面例子的输出内容为:
call durationSelector with 0
call durationSelector with 2
call durationSelector with 4
过程逻辑描述为:
- $source每间隔1s吐出一个数据;
- throttle(节流)传递一个函数,并且将第一个$source吐出的数据0传递给下游;
- durationSelector函数返回一个Observable对象,throttle对它进行订阅,即:你啥时候吐出数据?数据是啥没关系,主要给我个信号;
- 对象在2s后吐出数据0,throttle收到信息,取消对durationSelector返回对象的订阅,然后重新执行durationSelector,这时候$source产生的数据2进入下游。 ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/interval’; import ‘rxjs/add/observable/timer’; import ‘rxjs/add/operator/throttle’;
const source$ = Observable.interval(1000);
const durationSelector = value => {
console.log(# call durationSelector with ${value}
);
return Observable.timer(2000);
};
const result$ = source$.throttle(durationSelector);
result$.subscribe( console.log, null, () => console.log(‘complete’) );
<a name="Gn2Pr"></a>
##
对 durationSelector 函数稍加改动,可以看到可以更灵活地进行数据输出控制:
```json
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/throttle';
const source$ = Observable.interval(1000);
const durationSelector = value => {
return Observable.timer(value % 3 === 0 ? 2000 : 1000);
};
const result$ = source$.throttle(durationSelector);
result$.subscribe(
console.log,
null,
() => consol·e.log('complete')
);
throttle可以理解为多久之后再次放数据,debounce则可以理解为延迟多久才放出一个数据。结合下面例子:
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/debounce';
const source$ = Observable.interval(1000);
const durationSelector = value => {
return Observable.timer(value % 3 === 0 ? 2000 : 1000);
};
const result$ = source$.debounce(durationSelector);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
2.2 auditTime和audit
audit和throttle类似,都是遇到一个数据点后,将未来一段时间内作为筛选范围,差别在于(例如800毫秒)
- throttle遇到一个节点后,800ms内的其他数据被废弃,只取了第一个数据
- audit遇到一个节点后,800ms内的数据,只取最后一个,其他都被废弃
例如下面例子中输出到数据为 1 3 5 7。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/auditTime';
const source$ = Observable.interval(1000);
const result$ = source$.auditTime(2000);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
audit 也传递一个函数,下面弹珠图放出和throttle的对比:
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/concat';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/operator/audit';
const source$ = Observable.interval(500).take(2).mapTo('A')
.concat(Observable.interval(1000).take(3).mapTo('B'))
.concat(Observable.interval(500).take(3).mapTo('C'));
const durationSelector = value => {
return Observable.timer(800);
};
const result$ = source$.audit(durationSelector);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
2.3 sampleTime和sample
sample为取样的含义。
auditTime仅仅传递一个时间参数,它不以数据吐出为参考,间隔800毫秒就吐出(最近的)一个数据。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/concat';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/sampleTime';
const source$ = Observable.interval(500).take(2).mapTo('A')
.concat(Observable.interval(1000).take(3).mapTo('B'))
.concat(Observable.interval(500).take(3).mapTo('C'));
const result$ = source$.sampleTime(2000);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
sample传递的是一个Observable对象,充当一个notifier的作用,例如下面例子每发生点击以后,就输出距离程序开始时间的时间毫秒数:
<!doctype html>
<html>
<body>
<div>
<button id="sample">Sample</button>
<div id="text">0</div>
</div>
<script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
<script>
const notifer$ = Rx.Observable.fromEvent(document.querySelector('#sample'), 'click');
const tick$ = Rx.Observable.timer(0, 10).map(x => x*10);
const sample$ = tick$.sample(notifer$);
sample$.subscribe(value => {
document.querySelector('#text').innerText = value
});
</script>
</body>
</html>
2.4 根据数据序列做回压控制
2.4.1 distinct
仅仅返回不同(前面没出现过)的数据,例如下面例子为 0 1 2 3。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/distinct';
const source$ = Observable.of(0, 1, 1, 2, 0, 0, 1, 3, 3);
const distinct$ = source$.distinct();
distinct$.subscribe(
console.log,
null,
() => console.log('complete')
);
前面例子使用的是===判断相同,如果复杂对象就不合适了,可以通过传递一个函数
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/distinct';
const source$ = Observable.of(
{name: 'RxJS', version: 'v4'},
{name: 'React', version: 'v15'},
{name: 'React', version: 'v16'},
{name: 'RxJS', version: 'v5'}
);
const distinct$ = source$.distinct(x => x.name);
distinct$.subscribe(
console.log,
null,
() => console.log('complete')
);
识别不同的数据,意味着有个数据结构记录了所有数据, 那么可能导致内存泄漏,第二个参数可以指定清除掉多少时间内的数据,即仅保证此段时间内的数据是唯一的:
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/distinct';
const source$ = Observable.interval(100).map(x => x % 1000);
const distinct$ = source$.distinct(null, Observable.interval(500));
distinct$.subscribe(
console.log,
null,
() => console.log('complete')
);
2.4.2 distinctUtilChanged
distinct比较的是过去的所有数据,此方法是比较前一个数据,如果和前一个数据不同则保留。例如下面例子为 0 1 2 0 1 3。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/operator/distinctUntilChanged';
const source$ = Observable.of(0, 1, 1, 2, 0, 0, 1, 3, 3);
const distinct$ = source$.distinctUntilChanged();
distinct$.subscribe(
console.log,
null,
() => console.log('complete')
);
同样,对于复杂对象,也可以传递一个方法进行比较,但是注意看下例的方法的参数和distinct传入函数的参数居然不一致:
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/distinctUntilChanged';
const source$ = Observable.of(
{name: 'RxJS', version: 'v4'},
{name: 'React', version: 'v15'},
{name: 'React', version: 'v16'},
{name: 'RxJS', version: 'v5'}
);
const compare = (a, b) => a.name === b.name;
const distinct$ = source$.distinctUntilChanged(compare);
distinct$.subscribe(
console.log,
null,
() => console.log('complete')
);
2.4.3 distinctUtilKeyChanged
可以被当作distinctUtilChanged的一个简化写法,可以直接对某些字段进行比较。
3. 其他过滤方式
3.1 ignoreElements
忽略上游产生的所有数据,而只关心complete和error。
3.2 elementAt
$source.elementAt(index, null)
把上游数据当作数据结构,取第index个数据,第二个参数可以指定不存在第index个数据时候的默认值,否则会报错。
3.3 single
判定上游是否只有一个符合条件的数据。