使用和组合操作符石学习RxJS的重要部分,对操作符的使用的熟练程度决定RxJS的掌握程度。

1. 为什么要有操作符 10p

1.1 什么是操作符

一个操作符是返回一个Observable对象的函数。可以是根据其他Observable对象返回,也可以是利用其他类型输入 ,也可以是不需要输入就可以凭空产生。

1.2 为什么需要操作符

我们通过对比 map 和 filter 来对比这两个函数在数组 和 RxJS 中的用法,来理解二者的联系和区别。

  1. const source = [1, 2, 3, 4];
  2. const result = source.filter(x => x % 2 === 0).map(x => x * 2);
  3. console.log(result);

上述代码再简单不过了输出结果是 [4, 8],展示了几个关键特点:

  1. filter 和 map 都是数组对象的成员函数
  2. filter 和 map 的返回结果依旧是数组对象
  3. filter 和 map 不会改变原来的数组对象

上面特点导致了 filter 和 map 可以进行链式调用,虽然每个操作符提供的只是一些通用的简单功能,但是通过链式调用,这些小功能可以组合在一起用来解决复杂的问题。数组不会被修改,任务之间的耦合很低,代码就容易维护。

把上面的 “数组对象” 改成 “Observable对象” 看看结果如何:

  1. import Rx from 'rxjs';
  2. const source$ = Rx.Observable.of(1, 2, 3, 4);
  3. const result$ = source$.filter(x => x % 2 === 0).map(x => x * 2);
  4. result$.subscribe(console.log);

可以看到输出结果和数组对象一样,不同之处是最后使用 result$.subscribe 来接收推送。Observable 对象能够链式调用的原因和数组对象如出一辙(把上述特点的“数组对象” 改成 “Observable对象” 即可。

2. 操作符的分类 17p

不同操作符有各自的优势,一个问题可以用不同的操作符解决,遇到一个具体场景下的问题时,困难在于选择哪一些操作符来解决问题。
v5版本有60多个操作符,可按照多种方式进行分类,返回 Observable 对象是它们的共同特征。

2.1 按功能分类

第4-10章会介绍下面的部分分类

  • 创建类,作为数据流的源头,在第 4 章进行介绍
  • 转化类,将数据进行转化,产生新的数据推给下游,在第8章介绍
  • 过滤类,不是所有数据都需要到河流的终点,第7章会进行介绍
  • 合并类,将多个 Observable 对象合并到一个中,将在第5章中介绍
  • 多播类,一个数据要通知给多个接收者时,就是多播放,在第10章介绍
  • 错误处理类,第9章
  • 辅助工具类,第6章
  • 条件分支类
  • 数学和合计类

还有一些分类不适合归入到上面会进行分散介绍

  • 背压控制类,在第7、8章,过滤类 和 转化类操作符都包含背压控制的成员
  • 可连接类,它返回的对象比较特殊可以支持connect函数,和多播有关在第10章介绍
  • 高阶 Observable 对象是指产生的数据本身体也是 Observable 对象,在介绍 合并类、过滤类、转换类时时都会体现。

    2.2 按静态和实例分类

    根据操作符是属于JavaScript类的静态方法和实例方法,我们可以将草走符分为静态还是实例。链式调用时,静态方法必须在最前面调用。

在 RxJS 中有的操作符是静态方法(例如 Observable.of 位置在 /rxjs/add/observable/of ),有的操作符是实例方法( 例如 map位置在 /rxjs/add/operator/map ),有的(例如merge)则既可以作为静态方法也可以作为实例方法。

注:涉及到了引用方式的问题,什么情况下会需要单独引用?用到时可以细究。

3. 如何实现操作符

虽然应用开发者的重点是使用 RxJS 中的操作符,但是了解操作符的实现方式,会加深对 RxJS 的理解,在可能用的上重复逻辑的时候,可以封装在自定义的操作符中。

3.1 操作符函数的实现 10p

需要符合下面四个特点:

  1. 返回一个全新的Observable对象
  2. 对上游和下游的订阅及退订处理:订阅上游将处理交个下游,下游取消对上游的订阅
  3. 处理异常情况
  4. 及时释放资源

我们以 map 操作符举例,假定自己要实现这样一个操作符,如何满足上述四个特点:

3.1.1 返回一个全新的Observable对象

注:这里先不考虑 map 如何被挂载在 Observable 对象上,this 代表上游的Observable对象。
注:本例 + 上一章的例子理解透,后续才能继续推进。

  1. function map(project) {
  2. return new Observable(observer => {
  3. this.subscribe({
  4. next: value => {
  5. try {
  6. observer.next(project(value))
  7. } catch(err) {
  8. observer.error(err);
  9. }
  10. },
  11. error: err => observer.error(err),
  12. complete: () => observer.complete(),
  13. })
  14. }
  15. });
  16. }

3.1.2 订阅和退订处理

回顾一下第2章中的例子 Observable 退订:Observable 构造函数传入了一个函数,函数可以返回一个对象,对象包含的 unsubscribe 方法会被挂在 Observable 对象。Observable 对象调用 unsubscribe 后,所有通过 subscribe 建立起来关联的 observer 都会不再收到通知。

  1. function map(project) {
  2. return new Observable(observer => {
  3. + const sub = this.subscribe({
  4. next: value => {
  5. observer.next(project(value))
  6. },
  7. error: err => observer.error(err),
  8. complete: () => observer.complete(),
  9. })
  10. + return {
  11. + unsubscribe: () => {
  12. + sub.unsubscribe();
  13. + }
  14. }
  15. });
  16. }

说明:下游如果不明确告诉上游这些资源不用了的话,会导致上游无法释放这些资源。当然就 map 这个操作符而言,没有什么占用的资源,但是对于通用的操作符,上游有可能分配了特殊资源。

3.1.3 错误处理

project 的代码不受控制,我们能做的是捕获它的错误,然后交给下游进行处理。下面代码中既包含了上游的error,也包含了 project 产生的 error。

  1. function map(project) {
  2. return new Observable(observer => {
  3. const sub = this.subscribe({
  4. next: value => {
  5. + try {
  6. observer.next(project(value))
  7. + } catch(err) {
  8. + observer.error(err);
  9. + }
  10. },
  11. error: err => observer.error(err),
  12. complete: () => observer.complete(),
  13. })
  14. return {
  15. unsubscribe: () => {
  16. sub.unsubscribe();
  17. }
  18. }
  19. });
  20. }

3.1.4 及时释放资源

上面例子中没有体现“及时释放资源”,有的操作符可能涉及到DOM的事件处理函数、Websocket当中获取推送消息,都需要进行资源释放(不理解没关系,有合适场景的时候再细究)。

3.2 关联Observable 9p

3.2.1 给Observable打补丁

  1. Observable.prototype.map = function() {} // 注意:不能使用() => {} 影响this的判断

3.2.2 使用bind绑定特定Observable对象

  1. const results = map.bind($source)(x => x + 2);
  2. const results = map.call($source, x => x + 2);

bind方式存在着么一个问题,无法进行链式调用,例如下面的多层嵌套:

  1. map.bind(map.bind($source)(x => x + 2))(x * 2);

绑定操作符::可以将调用的函数绑定此符号前的对象(单目前李兰器并不支持它,需要借助Babel等转译工具)

  1. $source::map(x => x + 2)::map(x => x * 2);

3.3.3 使用lift函数

v5 版本的很多操作符都是用一个神奇的 lift 函数实现,它的作用就是提升,赋予更多功能。
当 lift 产生的 Observable 对象被订阅时,lift 参数函数就被调用,this 代表的就是Observable对象,$source代表上游的 Observable 对象。
应用开发者通常不使用lift,更多是留给RxJS库的开发者使用。

  1. function map(project) {
  2. return this.lift(function(source$){
  3. return source$.subscribe({
  4. next: value => {
  5. try {
  6. this.next(project(value))
  7. } catch(err) {
  8. this.error(err);
  9. }
  10. },
  11. error: err => this.error(err),
  12. complete: () => this.complete(),
  13. })
  14. });
  15. }
  16. Observeable.prototype.map = map;

3.3 改进的操作符定义 16p

3.3.1 操作符和 Observable 关联的缺陷

tree-shaking 的目的是去掉 死代码,它们可能最早是开发者团队写出来,然后被其他函数所替代;更多的是第三方库的代码,通常提供了完善的功能,但是具体只使用到其中一部分。在第2章 版本和如何安装 中介绍了为什么在 RxJS中无法应用 tree-shaking。那么缺陷之一就是,我们自己将操作符和 Observable 对象关联存在着无法 tree-shaking 的问题。

另外一个缺点在于,直接修改 Observable 这个全局对象,让我们嗅到了一些坏代码的味道。例如你通过给 Observable 添加了一个名为 map 的操作符,而另外一个应用直接依赖了你的应用,它是可以直接使用 map 操作符的。那么你如果升级去除了这个操作符,依赖你的应用可能无法工作。

3.3.2 使用 call 来创建

使用自定义操作符的原则:不要使用 “打补丁” 的方式导入其他操作符,避免污染全局 Observable:

  • 对于实例操作符,使用前面介绍的 bind/call 方法,让一个操作符只对一个具体的 Observable 对象生效;
  • 对于静态操作符,直接使用产生 Observable 对象的函数。

即:cusotmOperator($source, param)cusotmOperator.call(param)

在介绍下面例子之前,我们需要知道一些知识:

  • 首先,不同的操作符引用不同的路径
    • 通过 rxjs/add/observable 路径被引用,会被当做静态操作符(挂在 Observable 下)
    • 通过 rxjs/add/operator 路径被引用,会被当做实例操作符(挂在 Observable.prototype 下)
  • 其次,他们的源码都在 rxjs/observable/ 下
    • of 函数来自 rxjs/observable/of
    • map 函数来自 rxjs/observable/map
  • 然后,静态操作符、实例操作符,都是引用的 rxjs/observable/ 函数
  • 最后,要直接使用函数,差异有两个
    • 导入的路径和静态操作符、实例操作符不同
    • 操作符、实例操作符是导入默认变量,现在需要指定具体的变量

下面例子是一个很奇怪的例子,作者说不要污染Observable却将double挂在了它的原型下,为的是演示如何使用 of 和 map 这里两个操作符的使用方式。

  1. import { Observable } from 'rxjs/Observable'
  2. import { of } from 'rxjs/observable/of'
  3. import { map } from 'rxjs/operator/map'
  4. Observable.prototype.double = function () {
  5. // 不污染 Observable 的实例操作符用法,基于this bind call使用
  6. return this::map((x) => x * 2)
  7. // return map.bind(this)((x) => x * 2)
  8. // return map.call(this, (x) => {
  9. // return x * 2
  10. // })
  11. }
  12. // 不污染 Observable 的静态操作符用法,当普通函数使用
  13. const source$ = of(1, 2, 3)
  14. const result$ = source$.double()
  15. result$.subscribe((value) => console.log(value))

注意:绑定操作符::需要使用 Babel 转译才能执行。

3.4 lettable/pipeable操作符 25p

通过 bind/call 方式使用了 map 函数,但是第1章中介绍了纯函数的两个特点,因此 double 函数不是纯函数:

  1. 函数的执行过程完全由输入的参数决定(上节例子还依赖了this)
  2. 函数不会修改任何外部状态,比如全局变量、传入参数

另外在 call 的返回结果中是无法判定 TypeScript 类型的,从而丧失了类型检查的优势。

3.4.1 let 6p

v5.5.0开始加入了一种更先进的操作符的定义和操作方式,曾经被称为lettable操作符,后改成pipeable。

下例中 let 函数接收一个函数,函数返回一个新的 Observable对象,double 函数没再使用 this,是一个函数纯函数,满足函数式编程的要求。使用 map 要借助 let 操作符,方式能够被 let 当做参数使用的操作符,就叫 lettable 操作符。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/map';
  4. import 'rxjs/add/operator/let';
  5. const source$ = Observable.of(1, 2, 3);
  6. const double$ = obs$ => obs$.map(x => x * 2);
  7. const result$ = source$.let(double$);
  8. result$.subscribe(console.log);

image.png

3.4.2 lettable 和 pipeable 11p

出于如下几个原因,决定放弃对此书的考古工作:

  1. 代码不可运行,RxJS 是CommonJS写的,例子的引入方式直接报错,增加困惑
  2. 第3章开始,出现一些表述含混,前后乱序的地方,例如没有提起pipeable前就直接说到和pipeable有关的结论
  3. 版本已经是 7,花费太多时间纠再5版本上的各种语法问题,以及猜测作者表述的真实含义,太费时间

但是每一节都尝试用最简短的语言概括下介绍了什么。

在 let 操作符之后,又引入了 pipe 操作符,它除了具备 let 的功能,还能在讲多个 let 操作符 串起来。

  1. import {of} from 'rxjs/observable/of';
  2. import {map, filter} from 'rxjs/operators';
  3. const source$ = of(1, 2, 3);
  4. const result$ = source$.pipe(
  5. filter(x => x % 2 === 0),
  6. map(x => x * 2)
  7. );
  8. result$.subscribe(console.log);

本节内容有点前后交代不清楚,即在 pipeable 还没介绍就引出了相关的论述:

RxJS 从 v5.5.0 版本引入了 lettable 操作符,大部分操作符都有 pipeable 实现,注意是 “大部分” 而不是全部:

  • 静态类型操作符美俄有pipeable操作符的对应形式
  • 拥有多个上有的Observable对象的操作符没有pipeable操作符的对应形式

在引入 lettable 之后,以 map 为例存在如下几个目录:

  1. rxjs/add/operator/map.js
  2. rxjs/operator/map.js
  3. rxjs/operators/map.js

引用的操作符最终都是引用的3,因为1引用了2,2引用了3(大意如此)。lettable 操作符有不同于其他形式操作符的导入和使用方式:

  1. import {of} from 'rxjs/observable/of';
  2. import {map} from 'rxjs/operators';
  3. const source$ = of(1,2,3);
  4. const result$ = source$.pipe(map(x => x * x));
  5. $source.subscribe(console.log);

而后作者用一定篇幅介绍了为什么 lettable 被废弃,进化成了 pipeable:

存在lettable形式的操作符,引入方式破耗费精力理解:

“打补丁”的方法,引入的是 rxjs/add/operator/map,挂在Observable.prototype下,实际上是从 rxjs/operator/map.js 导入 map 函数的实现;支持call方法的 rxjs/operator/map.js,也不过是从 rxjs/operators/map.js 导入的真正的lettable操作符,然后把 this作为参数传递进去。

在引入了lettable操作符以后,实现来自新目录 rxjs/operators/map.js,“打补丁” 和 call 方法的操作符,都是为了兼容以前的写法。lettable操作符是为未来的趋势,在V6中 lettable 操作符可能是唯一支持的操作符。

🤔 内容混乱,已经无法准确地进行书写和记录了。

下面例子中引入了新的操作符 ~~**pipe**,of 属于静态操作符,只能从 rxjs/observable 导入;map 则从 rjx使用了来自 operators 的 map 操作符(operators下汇聚了而所有lettable操作符)。在前面例子中,为了引入 let,不得不使用了打补丁的方式引入lettable 或 ~~

🤔:是不是实例类的操作符,都必须单独引入然后形成打补丁的方式引用?

  1. import {of} from 'rxjs/observable/of';
  2. import {map} from 'rxjs/operators';
  3. const source$ = of(1,2,3);
  4. const result$ = source$.pipe(map(x => x * x));
  5. $source.subscribe(console.log);

3.4.3 被迫改名的pipeable操作符 5p

一些操作符曾应为和 JavaScript 的关键字冲突,而进行了重命名。

3.4.4 管道操作符 4p

为了让JavaScript支持管道操作符,提议了一个形式为 |> 的操作符。