RxJS 非常强大,各种操作符连接在一起便能让数据流动到需要用到它的地方,有人甚至觉得 RxJS 是魔法;
这篇文章会通过 10 个有趣的小 demo 渐进式的实现 RxJS 的核心功能,其中包括:
- 类:Observable 的实现
- 类的方法:subscribe,pipe 的实现
- 创建类操作符:of,from,fromEvent,interval,timer 的实现
- 过滤类操作符:filter,take 的实现
- 工具类操作符:tap 的实现
- 组合类操作符:merge 的实现
实现一个 Observable
Observable 表示一个可观察对象,他表示一个可调用的未来值或事件的集合。
比如有以下代码: ```javascript import { Observable } from ‘rxjs’;
const dataStream$ = new Observable(observer => { observer.next(1); setTimeout(() => { observer.next(2); observer.complete(); }, 1000) observer.next(3); });
const observer = { next: x => console.log(x), error: err => console.error(err), complete: () => console.log(‘done’), }
dataStream$.subscribe(observer);
这段代码引用的是官方的 Observable, 它在运行后会首先打印一个 1,接着打印一个 3,隔一秒后会再打印一个 2,最后运行结束<br />仔细观察 Observable 方法,他会接受一个方法传进它的构造函数,这个方法接受一个对象,对象上有 next, error, complete 等属性,但是这个对象是 Observable 实例在调用 subscribe 方法时才传进去的:<br />有了上面的思路,我们可以大胆的构造出自己的 Observable, 如下:
```javascript
export class Observable {
_subscribe;
constructor(subscribe) {
this._subscribe = subscribe;
}
subscribe(observer) {
this._subscribe(observer);
}
}
把官方的 Observable 替换成自己的 Observable 会发现输出没什么差异。
实现创建类操作符 of
创建类操作符中,最容易理解的莫过于 of,那么我们就先实现 of 操作符。
比如有如下代码:
import { of } from 'rxjs';
const dataStream$ = of(1, 2, 3)
const observer = {
next: x => console.log(x),
error: err => console.error(err),
complete: () => console.log('done'),
}
dataStream$.subscribe(observer);
它在运行后会首先打印一个 1,接着打印一个 2,再会打印一个 3,最后运行结束。
有了前面自己实现的 Observable, of 的实现就会变得非常简单,它实际上只是 Observable 外套了一层包装,本质上还是 Observable,实现如下:
export function of(...args) {
return new Observable(observer => {
args.forEach(arg => {
observer.next(arg);
})
observer.complete();
})
}
把官方的 of 替换成自己的 of ,再配上自己实现的 Observable,我们会发现输出和官方一致。
Observable.subscribe 可以传人一个方法作为参数
官方 Observable 的 subscribe 可以传入一个函数进去,这样的话写起来会清爽很多,如下:
import { of } from 'rxjs';
const dataStream$ = of(1, 2, 3)
dataStream$.subscribe(console.log);
为了我们的 Observable 也能这样的好用,我们可以将 subscribe 适当的改造一下,如下:
export class Observable {
_subscribe;
constructor(subscribe) {
this._subscribe = subscribe;
}
subscribe(observer) {
const defaultObserver = {
next: () => { },
error: () => { },
complete: () => { }
}
if (typeof observer === 'function') {
return this._subscribe({ ...defaultObserver, next: observer });
} else {
return this._subscribe({ ...defaultObserver, ...observer });
}
}
}
实现创建类操作符 fromEvent
但是,Rxjs 核心要解决的是数据流传输的问题,很多时候,我们的数据源头来自用户的人机交互,比如说点击按钮,这样的话就不得不用到 fromEvent,比如如下代码:
import { fromEvent } from 'rxjs';
import { JSDOM } from 'jsdom';
const element = new JSDOM(`<div>Hello world</div>`).window.document.querySelector('div');
const source$ = fromEvent(element, 'click');
source$.subscribe(console.log);
setTimeout(() => {
element.click()
}, 1000)
为了方便对比和测试,我们引用了 jsdom,它的作用是在 node 端可以做一些 dom 的相关操作。
以上代码渲染了一个 Hello world 的元素盒子,并且在一秒钟之后会点击这个盒子。与此同时,我们又使用了 Rxjs 中的 fromEvent 来监听盒子的事件。
为了实现自己的fromEvent,我们来分析一下 fromEvent 所需要的参数,第一个传的是 dom 元素的实例,第二个则是事件的类型,于是可以猜到, fromEvent 内部本质上还是通过原生的 addEventListener 来实现的。而且需要注意到,除非自己手动取消订阅,否则fromEvent创造的对象永远不会结束,根据这个推测,我们能猜到它的内部很有可能只有 next 方法。
有了上述的推断,我们很容易就写出了一个 fromEvent 方法,如下:
export function fromEvent(element, event) {
return new Observable(observer => {
const handler = e => observer.next(e);
element.addEventListener(event, handler);
});
}
5、实现创建类操作符 from、interval、timer
例如,interval 操作符可以这样实现:
export function interval(delay) {
return new Observable(observer => {
let index = 0;
setInterval((() => {
observer.next(index++)
}), delay)
})
}
timer 操作符可以这样实现:
export function timer(delay) {
return new Observable(observer => {
setTimeout((() => {
observer.next(0)
}), delay)
})
}
from 操作符的实现稍微比较复杂,因为它可以接受 Array 或者 Promise 类型的参数:
export function from(param) {
if (Array.isArray(param)) {
return new Observable(observer => {
param.forEach(val => observer.next(val));
observer.complete();
});
}
return new Observable(observer => {
Promise.resolve(param)
.then(val => {
observer.next(val);
observer.complete();
})
.catch(e => {
observer.error(e);
});
})
}
用我们自己实现的 from、interval、timer 操作符来替换官方的操作符号,会发现执行结果和官方的表现的一致
import { from, timer, interval } from 'rxjs';
const dataStream1$ = from([1, 2, 3]);
const dataPromise = new Promise((res) => {
setTimeout(() => {
res('dataPromise');
}, 1500)
})
const dataStream2$ = from(dataPromise);
const dataStream3$ = timer(1000);
const dataStream4$ = interval(1000);
setTimeout(() => {
console.log('===== test from =====');
dataStream1$.subscribe(console.log);
dataStream2$.subscribe(console.log);
}, 1000)
setTimeout(() => {
console.log('===== test timer =====');
dataStream3$.subscribe(console.log);
}, 3000)
setTimeout(() => {
console.log('===== test interval =====');
dataStream4$.subscribe(console.log);
}, 5000)
6、为创建类操作符添加取消订阅功能
上面自己实现的操作符,我无法取消订阅它们,处理不好这会造成严重的内存泄漏。
于是我们着手改造,以 fromEvent 为例,只需要给 Observable 构造函数传入的方法一个返回值,在这个返回值加一个 unsubscribe 属性,然后在这个属性中写入取消订阅的操作。
export function fromEvent(element, event) {
return new Observable(observer => {
const handler = e => observer.next(e);
element.addEventListener(event, handler);
return {
unsubscribe: () => element.removeEventListener(event, handler)
};
});
}
将上面的操作符都整理一下加上返回值
export class Observable {
_subscribe;
constructor(subscribe) {
this._subscribe = subscribe;
}
subscribe(observer) {
const defaultObserver = {
next: () => { },
error: () => { },
complete: () => { }
}
if (typeof observer === 'function') {
return this._subscribe({ ...defaultObserver, next: observer });
} else {
return this._subscribe({ ...defaultObserver, ...observer });
}
}
}
export function of(...args) {
return new Observable(observer => {
args.forEach(arg => {
observer.next(arg);
})
observer.complete();
return {
unsubscribe: () => { }
}
})
}
export function fromEvent(element, event) {
return new Observable(observer => {
const handler = e => observer.next(e);
element.addEventListener(event, handler);
return {
unsubscribe: () => element.removeEventListener(event, handler)
};
});
}
export function from(param) {
if (Array.isArray(param)) {
return new Observable(observer => {
param.forEach(val => observer.next(val));
observer.complete();
return {
unsubscribe: () => { }
}
});
}
return new Observable(observer => {
let canceld = false;
Promise.resolve(param)
.then(val => {
if (!canceld) {
observer.next(val);
observer.complete();
}
})
.catch(e => {
observer.error(e);
});
return {
unsubscribe: () => { canceld = true }
}
})
}
export function interval(delay) {
return new Observable(observer => {
let index = 0;
const time = setInterval((() => {
observer.next(index++)
}), delay)
return {
unsubscribe: () => clearInterval(time)
}
})
}
export function timer(delay) {
return new Observable(observer => {
const time = setTimeout((() => {
observer.next(0)
}), delay)
return {
unsubscribe: () => clearTimeout(time)
}
})
}
7、实现转换类操作符 map 和过滤类操作符 filter(链式调用实现)
实现链式调用的关键是 map 和 filter 的返回值必须也要是 Observable 实例,而且 map 和 filter 需要挂载到实例对象上。们尝试写出了 rxjs5 中 map 、filter的实现,如下:
export class Observable {
_subscribe;
constructor(subscribe) {
this._subscribe = subscribe;
}
map(fn) {
return new Observable(observer => {
this.subscribe({
next: val => observer.next(fn(val)),
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
}
filter(fn) {
return new Observable(observer => {
this.subscribe({
next: val => fn(val) ? observer.next(val) : () => { },
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
}
}
const dataStream1$ = of(1, 2, 3);
dataStream1$
.map(data => data * 2)
.filter(data => data > 3)
.map(data => data + 1)
.subscribe(console.log)
非常完美,之前写的代码片段可以顺利执行,打印出了 5 和 7。
但是我们也发现的了链式调用的缺陷 —— 方法都在实例上。
这也就意味着哪怕仅仅用了一个转换操作符,也将会加载全部操作符。如果实例中的方法比较少,这还能忍受。但是像 rxjs 这样的库,转换类操作符和过滤类操作符加起来有几十种,这样的性能影响是无法忽略的。
而且这样的代码组织方式,打包工具的 tree shaking 将无法起作用,于是有了以下的 pipe 实现。
8、实现转换类操作符 map 和过滤类操作符 filter(pipe 调用实现)
pipe 的思想出现的是间比链式调用还要早,早在 Unix 中就有了 pipe,在 Linux 中,我们也能经常看到 | 符号。和前端相关的,有经验的开发同学会知道在 Gulp 也使用了大量的 pipe 的思想。
以一段 Gulp 的脚本为例:
const { src, dest } = require('gulp');
const babel = require('gulp-babel');
exports.default = function() {
return src('src/*.js')
.pipe(babel())
.pipe(dest('output/'));
}
乍一看和链式调用很像,但是由于 pipe 在中间做了一层隔离,实例对象和具体的转换方法解耦,所以完全没有了上面链式调用出现的问题。
rxjs 中的 pipe 实现更进一步,它能接受多个参数,参考如下写法:
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const dataStream1$ = of(1, 2, 3);
const dataStream2$ = dataStream1$
.pipe(map(data => data * 2))
.pipe(filter(data => data > 3))
.pipe(map(data => data + 1))
const dataStream3$ = dataStream1$.pipe(
map(data => data * 2),
filter(data => data > 3),
map(data => data + 1),
)
上面的代码中 dataStream2$ 和 dataStream3$ 实现的效果完全一致,所以想要 pipe 调用,核心在于如何实现这个加强版 pipe 方法。
一次性实现会有难度,我们可以尝试把这个问题分开,先实现只接受一个参数的 pipe,如下:
仔细观察 pipe, 我们会发现它接收一个 RxJS 操作符的运行结果作为参数,并返回一个 Observable 实例。
而且 map 和 filter 不像之前链式调用那样挂载在 Observable 实例上,而仅仅是一个纯函数。
而且这个纯函数会在传入 pipe 的时候执行一次,目的是把具体 map 的逻辑传进去。然后在 pipe 方法里再执行一次,目的是吐出一个 Observable 以供后续使用。我们可以猜到 map 、 filter 这样的操作符实际上是包了两层 “纸” 的 Observable 实例。
有了上面的思路,我们简单推断便可以写出如下的代码(完整的代码查看demo08):
export class Observable {
_subscribe;
constructor(subscribe) {
this._subscribe = subscribe;
}
// 接受单个参数的 pipe 实现
pipe(operation) {
return operation(this);
}
}
export function filter(fn) {
return (observable) => (
new Observable(observer => {
observable.subscribe({
next: val => fn(val) ? observer.next(val) : () => { },
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
)
}
export function map(fn) {
return (observable) => (
new Observable(observer => {
observable.subscribe({
next: val => observer.next(fn(val)),
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
)
}
我们在 pipe 中将 当前的 Observable 实例传递给操作符以生成一个新的 Observable 实例。经过测试发现,可以完美的运行。就这样,我们实现了可以接收单个参数的 pipe。
接下来我们来 实现可以接受多个参数的 pipe,如下:
import { of } from ‘rxjs’;import { map, filter } from ‘rxjs/operators’;
const dataStream1$ = of(1, 2, 3);
const dataStream2$ = dataStream1$.pipe( map(data => data * 2), filter(data => data > 3), map(data => data + 1),)
复制代码
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const dataStream1$ = of(1, 2, 3);
const dataStream2$ = dataStream1$.pipe(
map(data => data * 2),
filter(data => data > 3),
map(data => data + 1),
)
// 尝试将上面的 pipe 方法做一些改造成
pipe(...operations) {
return operations.reduce((prev, fn) => fn(prev), this);
}
tap 、 take 以及 merge 的实现
最后还有 tap 、 take 以及 merge 的实现,实现的原理和前面提到的操作符大同小异。感兴趣的读者可以继续查阅 demo09 和 demo10](https://github.com/WangYuLue/simple-rxjs/tree/master/demo10)),由于篇幅原因,这里就不展开讲了。
10、完整的核心代码
最后,完整的核心代码如下:
export class Observable {
_subscribe;
constructor(subscribe) {
this._subscribe = subscribe;
}
pipe(...operations) {
return operations.reduce((prev, fn) => fn(prev), this);
}
subscribe(observer) {
const defaultObserver = {
next: () => { },
error: () => { },
complete: () => { }
}
if (typeof observer === 'function') {
return this._subscribe({ ...defaultObserver, next: observer });
} else {
return this._subscribe({ ...defaultObserver, ...observer });
}
}
}
export function of(...args) {
return new Observable(observer => {
args.forEach(arg => {
observer.next(arg);
})
observer.complete();
return {
unsubscribe: () => { }
}
})
}
export function fromEvent(element, event) {
return new Observable(observer => {
const handler = e => observer.next(e);
element.addEventListener(event, handler);
return {
unsubscribe: () => element.removeEventListener(event, handler)
};
});
}
/**
* @param param array or promise
*/
export function from(param) {
if (Array.isArray(param)) {
return new Observable(observer => {
param.forEach(val => observer.next(val));
observer.complete();
return {
unsubscribe: () => { }
}
});
}
return new Observable(observer => {
let canceld = false;
Promise.resolve(param)
.then(val => {
if (!canceld) {
observer.next(val);
observer.complete();
}
})
.catch(e => {
observer.error(e);
});
return {
unsubscribe: () => { canceld = true }
}
})
}
export function interval(delay) {
return new Observable(observer => {
let index = 0;
const time = setInterval((() => {
observer.next(index++)
}), delay)
return {
unsubscribe: () => clearInterval(time)
}
})
}
export function timer(delay) {
return new Observable(observer => {
const time = setTimeout((() => {
observer.next(0)
}), delay)
return {
unsubscribe: () => clearTimeout(time)
}
})
}
export function filter(fn) {
return (observable) => (
new Observable(observer => {
observable.subscribe({
next: val => fn(val) ? observer.next(val) : () => { },
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
)
}
export function map(fn) {
return (observable) => (
new Observable(observer => {
observable.subscribe({
next: val => observer.next(fn(val)),
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
)
}
export function take(num) {
return (observable) => (
new Observable(observer => {
let times = 0;
const subscription = observable.subscribe({
next: val => {
times++;
if (num >= times) {
observer.next(val)
} else {
observer.complete()
subscription.unsubscribe()
}
},
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
)
}
export function tap(fn) {
return (observable) => (
new Observable(observer => {
observable.subscribe({
next: val => {
fn(val);
observer.next(val);
},
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
)
}
export function merge(...observables) {
return (observable) => {
let completeNum = 0;
if (observable) {
observables = [observable, ...observables];
}
return new Observable(observer => {
observables.forEach(observable => {
observable.subscribe({
next: val => observer.next(val),
error: err => {
observables.forEach(observable.unsubscribe());
observer.error(err)
},
complete: () => {
completeNum++;
if (completeNum === observables.length) {
observer.complete()
}
},
})
})
})
}
}