0. 本章包含的内容
- RxJS的版本和运行环境
- Observable 的概念和使用方法
- Hot Observable 和 Cold Observable 的区别
- 陈操作符的概念
- 弹珠图的说明
1. 版本和运行环境
版本和如何安装
被广泛使用的版本有v4和v5,v5无论性能还是可维护性你都优于v4,本书的代码都按照v5的进行讲解。
注:在2021年10月,大版本已经到7,API使用也有变动,为了结合作者的例子进行学习,仍旧安装5.5.2版本。
在项目中引入RxJS有两种方法,分别是 npm安装 和 script标签导入。v4和v5采用不同的Github仓库名,npm包名也不同,v5的安安装为 npm install rxjs@5.5.2
。
用过Webpack应该都清楚tree shaking对于打包的优化是通过静态检查去除不被引用的部分实现的。因为RxJS的核心是Observable,而很多函数都挂在了Observable下面,即直接引用了 Observable 或者 Observable.prototype,这样就被操作符已有被有使用的地方而无法被 tree-shaking。另外一方面,tree-shaking 只对 ES6 适用,对于CommonJS 模块无效(RxJS 是用 CommonJS 模块)。因此对于 RxJS,是无法 tree-shaking 的。
如何运行例子
例子中的 babel 相关包都已过时,笔记 介绍了babel常用的一些工具包和作用。因为我们直接在node中执行,node对于ES6的执行是比较好的,配不配置Babel都不影响我们学习,理论山告知要安装 rxjs
即可。
RxJS v5的代码使用 CJS 模块书写,在一个基于 Webpack 配置的项目应该不存在引用问题,编译器会替你做必要的事情,确保代码可以在浏览器中执行;如果你想省事,直接在命令行中看执行效果,可以通过node直接执行。
但因为例子代码使用 ESM 方式写的(即使用 import 而非 require),因此我们需要在 pacakge.json 中增加 "type": "module",
的配置,并且确保Node的版本至少为14(具体参考 这里),然后就可以直接执行。
2. Observable和Observer
Observable 和 Observer 是最重要的两个概念,RxJS 的运行就是 Observable 和 Observer之间的互动游戏:
Observable,可被观察的对象,RxJS 中的数据流就是 Observable 对象,它实现了观察者模式 和 迭代器模式
Observer,观察者;
Observable.subscribe 将两者连接起来。
观察者模式
它解决的问题:在一个持续产生事件的系统中,让不同模块只需要处理一部分逻辑,实现分而治之。
观察者模式的解决办法是:将逻辑分为 发布者(Publisher)和 观察者(Observer),发布者只管生产事件,它会通知注册的观察者,而不关心观察者如何处理这些事件;观察者可以被注册上发布者,接到事件就处理,而不管数据是如何产生的。在 RxJS 中,Observable 对象就是发布者(下例中为 $source),扮演观察者的是 console.log 函数,它直观把 1、2、3 输出。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
const source$ = Observable.of(1, 2, 3);
source$.subscribe(console.log);
迭代器模式
迭代器模式是指实现了通用接口可以遍历数据集合,通常包含:getCurrent、moveToNext、isDone 这几个函数。
但是在RxJS中,我们看不到类似的代码,因为刚几个函数都属于“拉”式迭代器实现,RxJS实现的是“推”式的迭代器实现。比如,主动 ajax 请求数据是“拉”,服务器通过 Websocket 推送数据到网页则是“推”。
Observable例子
简单的 Observable 用例
参照备注,好好理解这个例子,下例中按顺序输出1、2、3
import { Observable } from 'rxjs/Observable'
// 用于构造 Observer 的函数
// 调用参数 observer 的 next 函数,把数据“推”给observer
// onSubscribe 的唯一的要求是必须包含一个 next 的属性,用于接收被“推”过来的数据
const onSubscribe = (observer) => {
observer.next(1)
observer.next(2)
observer.next(3)
}
// 创建了一个发布者(数据流对象)
// 这时候 onSubscribe 没有被调用,而在等待 subscribe 被调用
const source$ = new Observable(onSubscribe)
// 观察者
const theObserver = { next: (item) => console.log(item) }
// subscribe 将发布者和观察者联系起来,onSubscribe 开始被调用
// onSubscribe 中的 observer 代表的就是 theObserver,但因为经过包装并不完全一样
source$.subscribe(theObserver)
跨越时间的 Observable
下例中展示了 Observable 的一个重要功能,推送数据可以有时间间隔,这让异步操作变得十分容易,因为对于 Observable,只需要被动接收推送数据来数处理,而不需要关心数据何时产生。
间隔1秒输出1 2 3
// 处理Observer的函数,唯一的要求是必须包含一个next的属性,用于接收被推过来的数据(这个例子本质上是自己创造的,不是被推的)
const onSubscribe = (observer) => {
let number = 1;
// 间隔1秒推数
const handle = setInterval(() => {
observer.next(number ++)
if (number > 3) {
clearInterval(handle);
}
}, 1000)
}
// 创建了一个发布者
const source$ = new Observable(onSubscribe)
// 观察者
const theObserver = { next: (item) => console.log(item) }
// subscribe将发布者 和 观察者 联系起来,发布者产生数据时,观察者就会获得通知。
source$.subscribe(theObserver)
Observable 的完结
为了让 Observable 有机会告诉 Observer 已经没有更多数据了,需要有一种通信机制,这就是 complete 函数。下面例子中可以看到输出结果为 1、2、3、no more date。
// 处理Observer的函数,唯一的要求是必须包含一个next的属性,用于接收被推过来的数据(这个例子本质上是自己创造的,不是被推的)
const onSubscribe = (observer) => {
let number = 1;
// 间隔1秒推数
const handle = setInterval(() => {
observer.next(number ++)
if (number > 3) {
clearInterval(handle);
+ observer.complete();
}
}, 1000)
}
// 创建了一个发布者
const source$ = new Observable(onSubscribe)
// 观察者
const theObserver = {
next: (item) => {
console.log(item)
},
+ complete: () => {
+ console.log('no more data')
+ },
}
// subscribe将发布者 和 观察者 联系起来,发布者产生数据时,观察者就会获得通知。
source$.subscribe(theObserver)
Observable的出错处理
下例中,输出1以后,输出 some error happen 和 Em, I am done,但是不会输出complete的处理逻辑。因为Observable对象只有一个终结状态,要么是complete,要么是error,进入了终结状态后,不能再调用next和complete/error。
import { Observable } from 'rxjs/Observable'
// 处理Observer的函数,唯一的要求是必须包含一个next的属性,用于接收被推过来的数据(这个例子本质上是自己创造的,不是被推的)
const onSubscribe = (observer) => {
observer.next(1)
observer.error('some error happen')
console.log('Em, I am done')
observer.complete()
}
// 创建了一个发布者
const source$ = new Observable(onSubscribe)
// 观察者
const theObserver = {
// 数据流有数据到来的处理
next: (item) => console.log(item),
error: (err) => {
console.log(err)
},
// 数据流关闭的处理
complete: () => {
console.log('no more data')
},
}
// subscribe将发布者 和 观察者 联系起来,发布者产生数据时,观察者就会获得通知。
source$.subscribe(theObserver)
注:上述代码中我们给传递给source$.subscribe的对象,实际应用中可以简化直接出传递一个3个参数分别代表next、error、compete的处理,如果中间的某个参数为空,可以直接传递null。
3. 退订Observable
下面的例子在调用source$.subscribe
后得到了一个对象,对象上有一个unsubscribe方法,执行效果可以看到,在输出了 1 2 3 后不再输出数字(complete不在接收到数据),但是console.log的输出仍旧在继续。因此,subscribe返回的对象中,调用了它的unsubscribe方法后,会断开Observable对象和Observer的联系。
import { Observable } from 'rxjs/Observable'
const onSubscribe = (observer) => {
let number = 1
setInterval(() => {
console.log('number is ' + number)
observer.next(number++)
}, 1000)
return {
unsubscribe: () => {},
}
}
const source$ = new Observable(onSubscribe)
const theObserver = {
next: (item) => console.log(item),
}
const subscriber = source$.subscribe(theObserver)
setTimeout(() => {
subscriber.unsubscribe()
}, 3500)
4. Hot Observable 和 Cold Observable
对于Observable对象,可能有不同的Observer订阅了它的内容,但是订阅时机不同。我们是期望只接收到从订阅时刻开始的数据,还是希望所有历史数据都接收到,这是不同的两种策略。订阅时刻开始的叫Hot Observable,历史数据也包含的叫Cold Observable,在第10章会有详细介绍。
5. 操作符简介
前面的例子中,我们通过new Observable来构造了一个Observable对象,然后调用它 的subscribe来连接Observer。但是实际场景中,我们会往往会对数据流进行一些处理,从而得到全新的一个Observable对象(符合纯函数的特点)。这里概括下:操作符是用来产生新的Observable对象的函数,玩RxJS玩的就是操作符。
import { Observable } from 'rxjs/Observable'
import 'rxjs/add/operator/map';
const onSubscribe = (observer) => {
observer.next(1)
observer.next(2)
observer.next(3)
}
const source$ = new Observable(onSubscribe)
source$.map((d) => d * d).subscribe(console.log)
注:map方法需要单独引入。
6. 弹珠图
Observable代表一个数据流,但是有多个数据流的时候大脑不容易抽象,我们借助 被线条连接起来的弹珠 来理解数据流,弹珠代表数据,弹珠距离代表时间间隔,|代表数据流结束,X代表出现异常。
有一些网站(https://rxmarbles.com/) 列举了常见操作符的弹珠图,有一些网站(https://rxviz.com/)能根据代码生成弹珠图,在本书的后续章节,会大量使用这种工具来描述数据流。