image.png

来源

想法

  1. // 观察者接口
  2. interface Observer<T> {
  3. next: (value: T) => void;
  4. error: (value: any) => void;
  5. complete: () => void;
  6. }
  7. // 单元函数 一元函数 类型别名
  8. type UnaryFunction<T, R> = (source: T) => R;
  9. // 操作符函数 类型别名
  10. type OperatorFunction<T, R> = UnaryFunction<Observable<T>, Observable<R>>;
  11. // 拆分逻辑 类型别名
  12. type TeardownLogic = Subscription | Unsubscribable | void | (() => void);
  13. // 不予订阅 接口
  14. interface Unsubscribable {
  15. unsubscribe: () => void;
  16. }
  17. // 从数组开始的管道
  18. function pipeFromArray<T, R>(
  19. /* 数组 元素为单元函数 */
  20. fns: Array<UnaryFunction<T, R>>
  21. ): UnaryFunction<T, R> {
  22. // 如果数组元素为空
  23. if (fns.length === 0) {
  24. // 返回一个匿名箭头函数,此函数返回参数本身
  25. return (input) => input as any as R;
  26. }
  27. // 如果数组元素只有一个
  28. if (fns.length === 1) {
  29. return fns[0];
  30. }
  31. // 排除以上情况
  32. // 返回一个匿名箭头函数,此函数返回以参数位初始值的reduce fns数组
  33. return (input: T): R => {
  34. /*
  35. reduce为数组中的每一个元素依次执行callback函数,不包括数组中被删除或从未被赋值的元素,接受四个参数:
  36. accumulator 累计器
  37. currentValue 当前值
  38. currentIndex 当前索引
  39. array 数组
  40. 回调函数第一次执行时,accumulator 和currentValue的取值有两种情况:
  41. 如果调用reduce()时提供了initialValue,accumulator取值为initialValue,
  42. currentValue取数组中的第一个值;如果没有提供 initialValue,
  43. 那么accumulator取数组中的第一个值,currentValue取数组中的第二个值。
  44. 实例:
  45. const array1 = [1, 2, 3, 4];
  46. const reducer = (previousValue, currentValue) => previousValue + currentValue;
  47. // 5 + 1 + 2 + 3 + 4
  48. console.log(array1.reduce(reducer, 5));
  49. // expected output: 15
  50. */
  51. return fns.reduce((prev: any, fn: any) => fn(prev), input);
  52. };
  53. }
  54. // 订阅类 实现了不予订阅接口
  55. class Subscription implements Unsubscribable {
  56. // 拆解
  57. // private _teardowns: /* 不包含 */Exclude<TeardownLogic, void>[] = []; // 数组第一种写法
  58. private _teardowns: /* 不包含 */Array<Exclude<TeardownLogic, void>> = []; // 数组第二种写法
  59. unsubscribe(): void { // 取消订阅
  60. this._teardowns.forEach((teardown/* 拆卸 */) => {
  61. if (typeof teardown === 'function') { // 如果拆卸是函数
  62. teardown(); // 执行此函数
  63. } else { // 否则
  64. teardown.unsubscribe(); // 执行unsubscribe方法
  65. }
  66. });
  67. }
  68. add(teardown: TeardownLogic): void {
  69. if (teardown /* 拆卸 */) { // 如果teardown非空
  70. this._teardowns.push(teardown); // 将teardown添加到_teardowns
  71. }
  72. }
  73. }
  74. // 订阅者类 继承了订阅类 且实现了观察者接口
  75. class Subscriber<T> extends Subscription implements Observer<T> {
  76. private isStopped = false; // 已停止
  77. type: string;
  78. // 构造函数
  79. constructor(private observer: Partial<Observer<T>>/* 传入观察者实例 */, type: string) {
  80. super(); // 执行父类构造函数
  81. this.type = type;
  82. }
  83. // Observer中的next方法
  84. next(value: T) {
  85. // 传入的观察者实例
  86. if (this.observer.next /* 如果传入观察者实例存在next方法 */ && !this.isStopped/* 没有停止 */) {
  87. this.observer.next(value); // 执行next方法,传入值
  88. }
  89. }
  90. // Observer中的error方法
  91. error(value: any) {
  92. // 有错误,则将isStopped设置为true,停止
  93. this.isStopped = true;
  94. if (this.observer.error) { // 如果存在error方法
  95. this.observer.error(value); // 执行error方法并传入值
  96. }
  97. }
  98. // Observer中的complete方法
  99. complete() {
  100. // 完成,则将isStopped设置为true,停止
  101. this.isStopped = true;
  102. if (this.observer.complete) {// 如果存在complete
  103. this.observer.complete(); // 执行complete方法
  104. }
  105. if (this.unsubscribe) { // 如果存在unsubscribe方法
  106. this.unsubscribe(); // 执行unsubscribe方法
  107. }
  108. }
  109. }
  110. // 可观察类
  111. export class Observable<T> {
  112. type: string;
  113. // 构造函数
  114. constructor(private _subscribe: (observer: Observer<T>) => TeardownLogic, type: string = "user") {
  115. this._subscribe = _subscribe;
  116. this.type = type;
  117. }
  118. // 订阅函数
  119. subscribe(observer: Partial<Observer<T>>): Subscription {
  120. const subscriber = new Subscriber(observer, this.type); // 把观察者传入订阅者
  121. subscriber.add(this._subscribe(subscriber)); // 父类Subscription中的add方法,将订阅者放入_teardowns中
  122. return subscriber; // Subscriber继承了Subscription,所以此处可以返回
  123. }
  124. // 管道
  125. pipe(...operations/* 操作 */: OperatorFunction/* 操作符函数 */<any, any>[]): Observable<any> {
  126. return pipeFromArray(operations)(this); // 执行从数组开始的管道函数
  127. }
  128. }
  129. // map操作符
  130. export function map<T, R>(project: (value: T, index: number) => R) {
  131. return (observable: Observable<T>) =>
  132. new Observable<R>((subscriber) => {
  133. let i = 0;
  134. const subcription = observable.subscribe({
  135. next(value) {
  136. return subscriber.next(project(value, i++));
  137. },
  138. error(err) {
  139. subscriber.error(err);
  140. },
  141. complete() {
  142. subscriber.complete();
  143. },
  144. });
  145. return subcription;
  146. }, "map");
  147. }
  148. // 防抖
  149. // debounceTime 延时发送源 Observable 发送的值,但是会丢弃正在排队的发送如果源 Observable 又发出新值。
  150. export function debounceTime<T, R>(delay: number) {
  151. let time: number = 0;
  152. return (observable: Observable<T>) =>
  153. new Observable<R>((subscriber) => {
  154. const subcription = observable.subscribe({
  155. next(value) {
  156. if (time) {
  157. clearTimeout(time)
  158. }
  159. time = setTimeout(() => {
  160. return subscriber.next(value as any as R);
  161. }, delay)
  162. },
  163. error(err) {
  164. clearTimeout(time)
  165. subscriber.error(err);
  166. },
  167. complete() {
  168. clearTimeout(time)
  169. subscriber.complete();
  170. },
  171. });
  172. return subcription;
  173. }, "debounceTime");
  174. }

源码仓库:

mini-rxjs

RxJS的冷与热

http://jsbin.com/wabuguy/1/edit?js,console,output
https://github.com/RxJS-CN/rxjs-articles-translation/blob/master/articles/Hot-Vs-Cold-Observables.md
https://juejin.cn/post/6959003628816302087
因为它是一个 “冷 “观察变量,所以每次订阅同一个观察变量都会创建一个新的连接。
每次你订阅同一个可观察对象时,都会创建一个新的连接。
(也就是说,每次调用函数。因为可观察变量只是函数)

TL;DR: 当不想一遍又一遍地创建生产者( producer )时,你需要热的 Observable 。

冷(冷清 PS:我的理解)的是指 Observable 创建了生产者

  1. // 冷的
  2. var cold = new Observable((observer) => { var producer = new Producer();
  3. // observer 会监听 producer
  4. });

热(热闹 PS:我的理解)的是指 Observable 复用生产者

  1. // 热的
  2. var producer = new Producer();
  3. var hot = new Observable((observer) => {
  4. // observer 会监听 producer
  5. });

加上了我的理解,否则抽象出来的冷与热属实很抽象!