image.png
RxJS 非常强大,各种操作符连接在一起便能让数据流动到需要用到它的地方,有人甚至觉得 RxJS 是魔法;
这篇文章会通过 10 个有趣的小 demo 渐进式的实现 RxJS 的核心功能,其中包括:

  • 类:Observable 的实现
  • 类的方法:subscribe,pipe 的实现
  • 创建类操作符:of,from,fromEvent,interval,timer 的实现
  • 过滤类操作符:filter,take 的实现
  • 工具类操作符:tap 的实现
  • 组合类操作符:merge 的实现

    实现一个 Observable

    Observable 表示一个可观察对象,他表示一个可调用的未来值或事件的集合。
    比如有以下代码: ```javascript import { Observable } from ‘rxjs’;

const dataStream$ = new Observable(observer => { observer.next(1); setTimeout(() => { observer.next(2); observer.complete(); }, 1000) observer.next(3); });

const observer = { next: x => console.log(x), error: err => console.error(err), complete: () => console.log(‘done’), }

dataStream$.subscribe(observer);

  1. 这段代码引用的是官方的 Observable, 它在运行后会首先打印一个 1,接着打印一个 3,隔一秒后会再打印一个 2,最后运行结束<br />仔细观察 Observable 方法,他会接受一个方法传进它的构造函数,这个方法接受一个对象,对象上有 next, error, complete 等属性,但是这个对象是 Observable 实例在调用 subscribe 方法时才传进去的:<br />有了上面的思路,我们可以大胆的构造出自己的 Observable 如下:
  2. ```javascript
  3. export class Observable {
  4. _subscribe;
  5. constructor(subscribe) {
  6. this._subscribe = subscribe;
  7. }
  8. subscribe(observer) {
  9. this._subscribe(observer);
  10. }
  11. }

把官方的 Observable 替换成自己的 Observable 会发现输出没什么差异。

实现创建类操作符 of

创建类操作符中,最容易理解的莫过于 of,那么我们就先实现 of 操作符。
比如有如下代码:

  1. import { of } from 'rxjs';
  2. const dataStream$ = of(1, 2, 3)
  3. const observer = {
  4. next: x => console.log(x),
  5. error: err => console.error(err),
  6. complete: () => console.log('done'),
  7. }
  8. dataStream$.subscribe(observer);

它在运行后会首先打印一个 1,接着打印一个 2,再会打印一个 3,最后运行结束。
有了前面自己实现的 Observable, of 的实现就会变得非常简单,它实际上只是 Observable 外套了一层包装,本质上还是 Observable,实现如下:

  1. export function of(...args) {
  2. return new Observable(observer => {
  3. args.forEach(arg => {
  4. observer.next(arg);
  5. })
  6. observer.complete();
  7. })
  8. }

把官方的 of 替换成自己的 of ,再配上自己实现的 Observable,我们会发现输出和官方一致。

Observable.subscribe 可以传人一个方法作为参数

官方 Observable 的 subscribe 可以传入一个函数进去,这样的话写起来会清爽很多,如下:

  1. import { of } from 'rxjs';
  2. const dataStream$ = of(1, 2, 3)
  3. dataStream$.subscribe(console.log);

为了我们的 Observable 也能这样的好用,我们可以将 subscribe 适当的改造一下,如下:

  1. export class Observable {
  2. _subscribe;
  3. constructor(subscribe) {
  4. this._subscribe = subscribe;
  5. }
  6. subscribe(observer) {
  7. const defaultObserver = {
  8. next: () => { },
  9. error: () => { },
  10. complete: () => { }
  11. }
  12. if (typeof observer === 'function') {
  13. return this._subscribe({ ...defaultObserver, next: observer });
  14. } else {
  15. return this._subscribe({ ...defaultObserver, ...observer });
  16. }
  17. }
  18. }

实现创建类操作符 fromEvent

但是,Rxjs 核心要解决的是数据流传输的问题,很多时候,我们的数据源头来自用户的人机交互,比如说点击按钮,这样的话就不得不用到 fromEvent,比如如下代码:

  1. import { fromEvent } from 'rxjs';
  2. import { JSDOM } from 'jsdom';
  3. const element = new JSDOM(`<div>Hello world</div>`).window.document.querySelector('div');
  4. const source$ = fromEvent(element, 'click');
  5. source$.subscribe(console.log);
  6. setTimeout(() => {
  7. element.click()
  8. }, 1000)

为了方便对比和测试,我们引用了 jsdom,它的作用是在 node 端可以做一些 dom 的相关操作。
以上代码渲染了一个 Hello world 的元素盒子,并且在一秒钟之后会点击这个盒子。与此同时,我们又使用了 Rxjs 中的 fromEvent 来监听盒子的事件。
为了实现自己的fromEvent,我们来分析一下 fromEvent 所需要的参数,第一个传的是 dom 元素的实例,第二个则是事件的类型,于是可以猜到, fromEvent 内部本质上还是通过原生的 addEventListener 来实现的。而且需要注意到,除非自己手动取消订阅,否则fromEvent创造的对象永远不会结束,根据这个推测,我们能猜到它的内部很有可能只有 next 方法。
有了上述的推断,我们很容易就写出了一个 fromEvent 方法,如下:

  1. export function fromEvent(element, event) {
  2. return new Observable(observer => {
  3. const handler = e => observer.next(e);
  4. element.addEventListener(event, handler);
  5. });
  6. }

5、实现创建类操作符 from、interval、timer

例如,interval 操作符可以这样实现:

  1. export function interval(delay) {
  2. return new Observable(observer => {
  3. let index = 0;
  4. setInterval((() => {
  5. observer.next(index++)
  6. }), delay)
  7. })
  8. }

timer 操作符可以这样实现:

  1. export function timer(delay) {
  2. return new Observable(observer => {
  3. setTimeout((() => {
  4. observer.next(0)
  5. }), delay)
  6. })
  7. }

from 操作符的实现稍微比较复杂,因为它可以接受 Array 或者 Promise 类型的参数:

  1. export function from(param) {
  2. if (Array.isArray(param)) {
  3. return new Observable(observer => {
  4. param.forEach(val => observer.next(val));
  5. observer.complete();
  6. });
  7. }
  8. return new Observable(observer => {
  9. Promise.resolve(param)
  10. .then(val => {
  11. observer.next(val);
  12. observer.complete();
  13. })
  14. .catch(e => {
  15. observer.error(e);
  16. });
  17. })
  18. }

用我们自己实现的 from、interval、timer 操作符来替换官方的操作符号,会发现执行结果和官方的表现的一致

  1. import { from, timer, interval } from 'rxjs';
  2. const dataStream1$ = from([1, 2, 3]);
  3. const dataPromise = new Promise((res) => {
  4. setTimeout(() => {
  5. res('dataPromise');
  6. }, 1500)
  7. })
  8. const dataStream2$ = from(dataPromise);
  9. const dataStream3$ = timer(1000);
  10. const dataStream4$ = interval(1000);
  11. setTimeout(() => {
  12. console.log('===== test from =====');
  13. dataStream1$.subscribe(console.log);
  14. dataStream2$.subscribe(console.log);
  15. }, 1000)
  16. setTimeout(() => {
  17. console.log('===== test timer =====');
  18. dataStream3$.subscribe(console.log);
  19. }, 3000)
  20. setTimeout(() => {
  21. console.log('===== test interval =====');
  22. dataStream4$.subscribe(console.log);
  23. }, 5000)

6、为创建类操作符添加取消订阅功能

上面自己实现的操作符,我无法取消订阅它们,处理不好这会造成严重的内存泄漏。
于是我们着手改造,以 fromEvent 为例,只需要给 Observable 构造函数传入的方法一个返回值,在这个返回值加一个 unsubscribe 属性,然后在这个属性中写入取消订阅的操作。

  1. export function fromEvent(element, event) {
  2. return new Observable(observer => {
  3. const handler = e => observer.next(e);
  4. element.addEventListener(event, handler);
  5. return {
  6. unsubscribe: () => element.removeEventListener(event, handler)
  7. };
  8. });
  9. }

将上面的操作符都整理一下加上返回值

  1. export class Observable {
  2. _subscribe;
  3. constructor(subscribe) {
  4. this._subscribe = subscribe;
  5. }
  6. subscribe(observer) {
  7. const defaultObserver = {
  8. next: () => { },
  9. error: () => { },
  10. complete: () => { }
  11. }
  12. if (typeof observer === 'function') {
  13. return this._subscribe({ ...defaultObserver, next: observer });
  14. } else {
  15. return this._subscribe({ ...defaultObserver, ...observer });
  16. }
  17. }
  18. }
  19. export function of(...args) {
  20. return new Observable(observer => {
  21. args.forEach(arg => {
  22. observer.next(arg);
  23. })
  24. observer.complete();
  25. return {
  26. unsubscribe: () => { }
  27. }
  28. })
  29. }
  30. export function fromEvent(element, event) {
  31. return new Observable(observer => {
  32. const handler = e => observer.next(e);
  33. element.addEventListener(event, handler);
  34. return {
  35. unsubscribe: () => element.removeEventListener(event, handler)
  36. };
  37. });
  38. }
  39. export function from(param) {
  40. if (Array.isArray(param)) {
  41. return new Observable(observer => {
  42. param.forEach(val => observer.next(val));
  43. observer.complete();
  44. return {
  45. unsubscribe: () => { }
  46. }
  47. });
  48. }
  49. return new Observable(observer => {
  50. let canceld = false;
  51. Promise.resolve(param)
  52. .then(val => {
  53. if (!canceld) {
  54. observer.next(val);
  55. observer.complete();
  56. }
  57. })
  58. .catch(e => {
  59. observer.error(e);
  60. });
  61. return {
  62. unsubscribe: () => { canceld = true }
  63. }
  64. })
  65. }
  66. export function interval(delay) {
  67. return new Observable(observer => {
  68. let index = 0;
  69. const time = setInterval((() => {
  70. observer.next(index++)
  71. }), delay)
  72. return {
  73. unsubscribe: () => clearInterval(time)
  74. }
  75. })
  76. }
  77. export function timer(delay) {
  78. return new Observable(observer => {
  79. const time = setTimeout((() => {
  80. observer.next(0)
  81. }), delay)
  82. return {
  83. unsubscribe: () => clearTimeout(time)
  84. }
  85. })
  86. }

7、实现转换类操作符 map 和过滤类操作符 filter(链式调用实现)


实现链式调用的关键是 map 和 filter 的返回值必须也要是 Observable 实例,而且 map 和 filter 需要挂载到实例对象上。们尝试写出了 rxjs5 中 map 、filter的实现,如下:

  1. export class Observable {
  2. _subscribe;
  3. constructor(subscribe) {
  4. this._subscribe = subscribe;
  5. }
  6. map(fn) {
  7. return new Observable(observer => {
  8. this.subscribe({
  9. next: val => observer.next(fn(val)),
  10. error: err => observer.error(err),
  11. complete: () => observer.complete(),
  12. })
  13. })
  14. }
  15. filter(fn) {
  16. return new Observable(observer => {
  17. this.subscribe({
  18. next: val => fn(val) ? observer.next(val) : () => { },
  19. error: err => observer.error(err),
  20. complete: () => observer.complete(),
  21. })
  22. })
  23. }
  24. }
  25. const dataStream1$ = of(1, 2, 3);
  26. dataStream1$
  27. .map(data => data * 2)
  28. .filter(data => data > 3)
  29. .map(data => data + 1)
  30. .subscribe(console.log)

非常完美,之前写的代码片段可以顺利执行,打印出了 5 和 7。
但是我们也发现的了链式调用的缺陷 —— 方法都在实例上。
这也就意味着哪怕仅仅用了一个转换操作符,也将会加载全部操作符。如果实例中的方法比较少,这还能忍受。但是像 rxjs 这样的库,转换类操作符和过滤类操作符加起来有几十种,这样的性能影响是无法忽略的。
而且这样的代码组织方式,打包工具的 tree shaking 将无法起作用,于是有了以下的 pipe 实现。

8、实现转换类操作符 map 和过滤类操作符 filter(pipe 调用实现)

pipe 的思想出现的是间比链式调用还要早,早在 Unix 中就有了 pipe,在 Linux 中,我们也能经常看到 | 符号。和前端相关的,有经验的开发同学会知道在 Gulp 也使用了大量的 pipe 的思想。
以一段 Gulp 的脚本为例:

  1. const { src, dest } = require('gulp');
  2. const babel = require('gulp-babel');
  3. exports.default = function() {
  4. return src('src/*.js')
  5. .pipe(babel())
  6. .pipe(dest('output/'));
  7. }

乍一看和链式调用很像,但是由于 pipe 在中间做了一层隔离,实例对象和具体的转换方法解耦,所以完全没有了上面链式调用出现的问题。
rxjs 中的 pipe 实现更进一步,它能接受多个参数,参考如下写法:

  1. import { of } from 'rxjs';
  2. import { map, filter } from 'rxjs/operators';
  3. const dataStream1$ = of(1, 2, 3);
  4. const dataStream2$ = dataStream1$
  5. .pipe(map(data => data * 2))
  6. .pipe(filter(data => data > 3))
  7. .pipe(map(data => data + 1))
  8. const dataStream3$ = dataStream1$.pipe(
  9. map(data => data * 2),
  10. filter(data => data > 3),
  11. map(data => data + 1),
  12. )

上面的代码中 dataStream2$ 和 dataStream3$ 实现的效果完全一致,所以想要 pipe 调用,核心在于如何实现这个加强版 pipe 方法。
一次性实现会有难度,我们可以尝试把这个问题分开,先实现只接受一个参数的 pipe,如下:
仔细观察 pipe, 我们会发现它接收一个 RxJS 操作符的运行结果作为参数,并返回一个 Observable 实例。
而且 map 和 filter 不像之前链式调用那样挂载在 Observable 实例上,而仅仅是一个纯函数。

而且这个纯函数会在传入 pipe 的时候执行一次,目的是把具体 map 的逻辑传进去。然后在 pipe 方法里再执行一次,目的是吐出一个 Observable 以供后续使用。我们可以猜到 map 、 filter 这样的操作符实际上是包了两层 “纸” 的 Observable 实例。
有了上面的思路,我们简单推断便可以写出如下的代码(完整的代码查看demo08):

  1. export class Observable {
  2. _subscribe;
  3. constructor(subscribe) {
  4. this._subscribe = subscribe;
  5. }
  6. // 接受单个参数的 pipe 实现
  7. pipe(operation) {
  8. return operation(this);
  9. }
  10. }
  11. export function filter(fn) {
  12. return (observable) => (
  13. new Observable(observer => {
  14. observable.subscribe({
  15. next: val => fn(val) ? observer.next(val) : () => { },
  16. error: err => observer.error(err),
  17. complete: () => observer.complete(),
  18. })
  19. })
  20. )
  21. }
  22. export function map(fn) {
  23. return (observable) => (
  24. new Observable(observer => {
  25. observable.subscribe({
  26. next: val => observer.next(fn(val)),
  27. error: err => observer.error(err),
  28. complete: () => observer.complete(),
  29. })
  30. })
  31. )
  32. }

我们在 pipe 中将 当前的 Observable 实例传递给操作符以生成一个新的 Observable 实例。经过测试发现,可以完美的运行。就这样,我们实现了可以接收单个参数的 pipe。
接下来我们来 实现可以接受多个参数的 pipe,如下:

import { of } from ‘rxjs’;import { map, filter } from ‘rxjs/operators’;
const dataStream1$ = of(1, 2, 3);
const dataStream2$ = dataStream1$.pipe( map(data => data * 2), filter(data => data > 3), map(data => data + 1),)
复制代码

  1. import { of } from 'rxjs';
  2. import { map, filter } from 'rxjs/operators';
  3. const dataStream1$ = of(1, 2, 3);
  4. const dataStream2$ = dataStream1$.pipe(
  5. map(data => data * 2),
  6. filter(data => data > 3),
  7. map(data => data + 1),
  8. )
  9. // 尝试将上面的 pipe 方法做一些改造成
  10. pipe(...operations) {
  11. return operations.reduce((prev, fn) => fn(prev), this);
  12. }

我们就实现了可以接受任意多参数的 pipe

tap 、 take 以及 merge 的实现

最后还有 tap 、 take 以及 merge 的实现,实现的原理和前面提到的操作符大同小异。感兴趣的读者可以继续查阅 demo09demo10](https://github.com/WangYuLue/simple-rxjs/tree/master/demo10)),由于篇幅原因,这里就不展开讲了。

10、完整的核心代码

最后,完整的核心代码如下:

  1. export class Observable {
  2. _subscribe;
  3. constructor(subscribe) {
  4. this._subscribe = subscribe;
  5. }
  6. pipe(...operations) {
  7. return operations.reduce((prev, fn) => fn(prev), this);
  8. }
  9. subscribe(observer) {
  10. const defaultObserver = {
  11. next: () => { },
  12. error: () => { },
  13. complete: () => { }
  14. }
  15. if (typeof observer === 'function') {
  16. return this._subscribe({ ...defaultObserver, next: observer });
  17. } else {
  18. return this._subscribe({ ...defaultObserver, ...observer });
  19. }
  20. }
  21. }
  22. export function of(...args) {
  23. return new Observable(observer => {
  24. args.forEach(arg => {
  25. observer.next(arg);
  26. })
  27. observer.complete();
  28. return {
  29. unsubscribe: () => { }
  30. }
  31. })
  32. }
  33. export function fromEvent(element, event) {
  34. return new Observable(observer => {
  35. const handler = e => observer.next(e);
  36. element.addEventListener(event, handler);
  37. return {
  38. unsubscribe: () => element.removeEventListener(event, handler)
  39. };
  40. });
  41. }
  42. /**
  43. * @param param array or promise
  44. */
  45. export function from(param) {
  46. if (Array.isArray(param)) {
  47. return new Observable(observer => {
  48. param.forEach(val => observer.next(val));
  49. observer.complete();
  50. return {
  51. unsubscribe: () => { }
  52. }
  53. });
  54. }
  55. return new Observable(observer => {
  56. let canceld = false;
  57. Promise.resolve(param)
  58. .then(val => {
  59. if (!canceld) {
  60. observer.next(val);
  61. observer.complete();
  62. }
  63. })
  64. .catch(e => {
  65. observer.error(e);
  66. });
  67. return {
  68. unsubscribe: () => { canceld = true }
  69. }
  70. })
  71. }
  72. export function interval(delay) {
  73. return new Observable(observer => {
  74. let index = 0;
  75. const time = setInterval((() => {
  76. observer.next(index++)
  77. }), delay)
  78. return {
  79. unsubscribe: () => clearInterval(time)
  80. }
  81. })
  82. }
  83. export function timer(delay) {
  84. return new Observable(observer => {
  85. const time = setTimeout((() => {
  86. observer.next(0)
  87. }), delay)
  88. return {
  89. unsubscribe: () => clearTimeout(time)
  90. }
  91. })
  92. }
  93. export function filter(fn) {
  94. return (observable) => (
  95. new Observable(observer => {
  96. observable.subscribe({
  97. next: val => fn(val) ? observer.next(val) : () => { },
  98. error: err => observer.error(err),
  99. complete: () => observer.complete(),
  100. })
  101. })
  102. )
  103. }
  104. export function map(fn) {
  105. return (observable) => (
  106. new Observable(observer => {
  107. observable.subscribe({
  108. next: val => observer.next(fn(val)),
  109. error: err => observer.error(err),
  110. complete: () => observer.complete(),
  111. })
  112. })
  113. )
  114. }
  115. export function take(num) {
  116. return (observable) => (
  117. new Observable(observer => {
  118. let times = 0;
  119. const subscription = observable.subscribe({
  120. next: val => {
  121. times++;
  122. if (num >= times) {
  123. observer.next(val)
  124. } else {
  125. observer.complete()
  126. subscription.unsubscribe()
  127. }
  128. },
  129. error: err => observer.error(err),
  130. complete: () => observer.complete(),
  131. })
  132. })
  133. )
  134. }
  135. export function tap(fn) {
  136. return (observable) => (
  137. new Observable(observer => {
  138. observable.subscribe({
  139. next: val => {
  140. fn(val);
  141. observer.next(val);
  142. },
  143. error: err => observer.error(err),
  144. complete: () => observer.complete(),
  145. })
  146. })
  147. )
  148. }
  149. export function merge(...observables) {
  150. return (observable) => {
  151. let completeNum = 0;
  152. if (observable) {
  153. observables = [observable, ...observables];
  154. }
  155. return new Observable(observer => {
  156. observables.forEach(observable => {
  157. observable.subscribe({
  158. next: val => observer.next(val),
  159. error: err => {
  160. observables.forEach(observable.unsubscribe());
  161. observer.error(err)
  162. },
  163. complete: () => {
  164. completeNum++;
  165. if (completeNum === observables.length) {
  166. observer.complete()
  167. }
  168. },
  169. })
  170. })
  171. })
  172. }
  173. }

参考
https://xie.infoq.cn/article/d03b3f4274fead911da55b349