0. 本章包含的内容

  • RxJS的版本和运行环境
  • Observable 的概念和使用方法
  • Hot Observable 和 Cold Observable 的区别
  • 陈操作符的概念
  • 弹珠图的说明

1. 版本和运行环境

版本和如何安装

被广泛使用的版本有v4和v5,v5无论性能还是可维护性你都优于v4,本书的代码都按照v5的进行讲解。
注:在2021年10月,大版本已经到7,API使用也有变动,为了结合作者的例子进行学习,仍旧安装5.5.2版本。

在项目中引入RxJS有两种方法,分别是 npm安装script标签导入。v4和v5采用不同的Github仓库名,npm包名也不同,v5的安安装为 npm install rxjs@5.5.2

用过Webpack应该都清楚tree shaking对于打包的优化是通过静态检查去除不被引用的部分实现的。因为RxJS的核心是Observable,而很多函数都挂在了Observable下面,即直接引用了 Observable 或者 Observable.prototype,这样就被操作符已有被有使用的地方而无法被 tree-shaking。另外一方面,tree-shaking 只对 ES6 适用,对于CommonJS 模块无效(RxJS 是用 CommonJS 模块)。因此对于 RxJS,是无法 tree-shaking 的。

如何运行例子

例子中的 babel 相关包都已过时,笔记 介绍了babel常用的一些工具包和作用。因为我们直接在node中执行,node对于ES6的执行是比较好的,配不配置Babel都不影响我们学习,理论山告知要安装 rxjs即可。

RxJS v5的代码使用 CJS 模块书写,在一个基于 Webpack 配置的项目应该不存在引用问题,编译器会替你做必要的事情,确保代码可以在浏览器中执行;如果你想省事,直接在命令行中看执行效果,可以通过node直接执行。

但因为例子代码使用 ESM 方式写的(即使用 import 而非 require),因此我们需要在 pacakge.json 中增加 "type": "module",的配置,并且确保Node的版本至少为14(具体参考 这里),然后就可以直接执行。

2. Observable和Observer

Observable 和 Observer 是最重要的两个概念,RxJS 的运行就是 Observable 和 Observer之间的互动游戏:

Observable,可被观察的对象,RxJS 中的数据流就是 Observable 对象,它实现了观察者模式迭代器模式
Observer,观察者;
Observable.subscribe 将两者连接起来。

观察者模式

它解决的问题:在一个持续产生事件的系统中,让不同模块只需要处理一部分逻辑,实现分而治之。

观察者模式的解决办法是:将逻辑分为 发布者(Publisher) 观察者(Observer),发布者只管生产事件,它会通知注册的观察者,而不关心观察者如何处理这些事件;观察者可以被注册上发布者,接到事件就处理,而不管数据是如何产生的。在 RxJS 中,Observable 对象就是发布者(下例中为 $source),扮演观察者的是 console.log 函数,它直观把 1、2、3 输出。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. const source$ = Observable.of(1, 2, 3);
  4. source$.subscribe(console.log);

迭代器模式

迭代器模式是指实现了通用接口可以遍历数据集合,通常包含:getCurrent、moveToNext、isDone 这几个函数。

但是在RxJS中,我们看不到类似的代码,因为刚几个函数都属于“拉”式迭代器实现,RxJS实现的是“推”式的迭代器实现。比如,主动 ajax 请求数据是“拉”,服务器通过 Websocket 推送数据到网页则是“推”。

Observable例子

简单的 Observable 用例

参照备注,好好理解这个例子,下例中按顺序输出1、2、3

  1. import { Observable } from 'rxjs/Observable'
  2. // 用于构造 Observer 的函数
  3. // 调用参数 observer next 函数,把数据“推”给observer
  4. // onSubscribe 的唯一的要求是必须包含一个 next 的属性,用于接收被“推”过来的数据
  5. const onSubscribe = (observer) => {
  6. observer.next(1)
  7. observer.next(2)
  8. observer.next(3)
  9. }
  10. // 创建了一个发布者(数据流对象)
  11. // 这时候 onSubscribe 没有被调用,而在等待 subscribe 被调用
  12. const source$ = new Observable(onSubscribe)
  13. // 观察者
  14. const theObserver = { next: (item) => console.log(item) }
  15. // subscribe 将发布者和观察者联系起来,onSubscribe 开始被调用
  16. // onSubscribe 中的 observer 代表的就是 theObserver,但因为经过包装并不完全一样
  17. source$.subscribe(theObserver)

跨越时间的 Observable

下例中展示了 Observable 的一个重要功能,推送数据可以有时间间隔,这让异步操作变得十分容易,因为对于 Observable,只需要被动接收推送数据来数处理,而不需要关心数据何时产生。

间隔1秒输出1 2 3

  1. // 处理Observer的函数,唯一的要求是必须包含一个next的属性,用于接收被推过来的数据(这个例子本质上是自己创造的,不是被推的)
  2. const onSubscribe = (observer) => {
  3. let number = 1;
  4. // 间隔1秒推数
  5. const handle = setInterval(() => {
  6. observer.next(number ++)
  7. if (number > 3) {
  8. clearInterval(handle);
  9. }
  10. }, 1000)
  11. }
  12. // 创建了一个发布者
  13. const source$ = new Observable(onSubscribe)
  14. // 观察者
  15. const theObserver = { next: (item) => console.log(item) }
  16. // subscribe将发布者 观察者 联系起来,发布者产生数据时,观察者就会获得通知。
  17. source$.subscribe(theObserver)

Observable 的完结

为了让 Observable 有机会告诉 Observer 已经没有更多数据了,需要有一种通信机制,这就是 complete 函数。下面例子中可以看到输出结果为 1、2、3、no more date。

  1. // 处理Observer的函数,唯一的要求是必须包含一个next的属性,用于接收被推过来的数据(这个例子本质上是自己创造的,不是被推的)
  2. const onSubscribe = (observer) => {
  3. let number = 1;
  4. // 间隔1秒推数
  5. const handle = setInterval(() => {
  6. observer.next(number ++)
  7. if (number > 3) {
  8. clearInterval(handle);
  9. + observer.complete();
  10. }
  11. }, 1000)
  12. }
  13. // 创建了一个发布者
  14. const source$ = new Observable(onSubscribe)
  15. // 观察者
  16. const theObserver = {
  17. next: (item) => {
  18. console.log(item)
  19. },
  20. + complete: () => {
  21. + console.log('no more data')
  22. + },
  23. }
  24. // subscribe将发布者 观察者 联系起来,发布者产生数据时,观察者就会获得通知。
  25. source$.subscribe(theObserver)

Observable的出错处理

下例中,输出1以后,输出 some error happen 和 Em, I am done,但是不会输出complete的处理逻辑。因为Observable对象只有一个终结状态,要么是complete,要么是error,进入了终结状态后,不能再调用next和complete/error。

  1. import { Observable } from 'rxjs/Observable'
  2. // 处理Observer的函数,唯一的要求是必须包含一个next的属性,用于接收被推过来的数据(这个例子本质上是自己创造的,不是被推的)
  3. const onSubscribe = (observer) => {
  4. observer.next(1)
  5. observer.error('some error happen')
  6. console.log('Em, I am done')
  7. observer.complete()
  8. }
  9. // 创建了一个发布者
  10. const source$ = new Observable(onSubscribe)
  11. // 观察者
  12. const theObserver = {
  13. // 数据流有数据到来的处理
  14. next: (item) => console.log(item),
  15. error: (err) => {
  16. console.log(err)
  17. },
  18. // 数据流关闭的处理
  19. complete: () => {
  20. console.log('no more data')
  21. },
  22. }
  23. // subscribe将发布者 观察者 联系起来,发布者产生数据时,观察者就会获得通知。
  24. source$.subscribe(theObserver)

注:上述代码中我们给传递给source$.subscribe的对象,实际应用中可以简化直接出传递一个3个参数分别代表next、error、compete的处理,如果中间的某个参数为空,可以直接传递null。

3. 退订Observable

下面的例子在调用source$.subscribe后得到了一个对象,对象上有一个unsubscribe方法,执行效果可以看到,在输出了 1 2 3 后不再输出数字(complete不在接收到数据),但是console.log的输出仍旧在继续。因此,subscribe返回的对象中,调用了它的unsubscribe方法后,会断开Observable对象和Observer的联系。

  1. import { Observable } from 'rxjs/Observable'
  2. const onSubscribe = (observer) => {
  3. let number = 1
  4. setInterval(() => {
  5. console.log('number is ' + number)
  6. observer.next(number++)
  7. }, 1000)
  8. return {
  9. unsubscribe: () => {},
  10. }
  11. }
  12. const source$ = new Observable(onSubscribe)
  13. const theObserver = {
  14. next: (item) => console.log(item),
  15. }
  16. const subscriber = source$.subscribe(theObserver)
  17. setTimeout(() => {
  18. subscriber.unsubscribe()
  19. }, 3500)

4. Hot Observable 和 Cold Observable

对于Observable对象,可能有不同的Observer订阅了它的内容,但是订阅时机不同。我们是期望只接收到从订阅时刻开始的数据,还是希望所有历史数据都接收到,这是不同的两种策略。订阅时刻开始的叫Hot Observable,历史数据也包含的叫Cold Observable,在第10章会有详细介绍。

5. 操作符简介

前面的例子中,我们通过new Observable来构造了一个Observable对象,然后调用它 的subscribe来连接Observer。但是实际场景中,我们会往往会对数据流进行一些处理,从而得到全新的一个Observable对象(符合纯函数的特点)。这里概括下:操作符是用来产生新的Observable对象的函数,玩RxJS玩的就是操作符。

  1. import { Observable } from 'rxjs/Observable'
  2. import 'rxjs/add/operator/map';
  3. const onSubscribe = (observer) => {
  4. observer.next(1)
  5. observer.next(2)
  6. observer.next(3)
  7. }
  8. const source$ = new Observable(onSubscribe)
  9. source$.map((d) => d * d).subscribe(console.log)

注:map方法需要单独引入。

6. 弹珠图

Observable代表一个数据流,但是有多个数据流的时候大脑不容易抽象,我们借助 被线条连接起来的弹珠 来理解数据流,弹珠代表数据,弹珠距离代表时间间隔,|代表数据流结束,X代表出现异常。

有一些网站(https://rxmarbles.com/) 列举了常见操作符的弹珠图,有一些网站(https://rxviz.com/)能根据代码生成弹珠图,在本书的后续章节,会大量使用这种工具来描述数据流。