来源
// 观察者接口interface Observer<T> {next: (value: T) => void;error: (value: any) => void;complete: () => void;}// 单元函数 一元函数 类型别名type UnaryFunction<T, R> = (source: T) => R;// 操作符函数 类型别名type OperatorFunction<T, R> = UnaryFunction<Observable<T>, Observable<R>>;// 拆分逻辑 类型别名type TeardownLogic = Subscription | Unsubscribable | void | (() => void);// 不予订阅 接口interface Unsubscribable {unsubscribe: () => void;}// 从数组开始的管道function pipeFromArray<T, R>(/* 数组 元素为单元函数 */fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {// 如果数组元素为空if (fns.length === 0) {// 返回一个匿名箭头函数,此函数返回参数本身return (input) => input as any as R;}// 如果数组元素只有一个if (fns.length === 1) {return fns[0];}// 排除以上情况// 返回一个匿名箭头函数,此函数返回以参数位初始值的reduce fns数组return (input: T): R => {/*reduce为数组中的每一个元素依次执行callback函数,不包括数组中被删除或从未被赋值的元素,接受四个参数:accumulator 累计器currentValue 当前值currentIndex 当前索引array 数组回调函数第一次执行时,accumulator 和currentValue的取值有两种情况:如果调用reduce()时提供了initialValue,accumulator取值为initialValue,currentValue取数组中的第一个值;如果没有提供 initialValue,那么accumulator取数组中的第一个值,currentValue取数组中的第二个值。实例:const array1 = [1, 2, 3, 4];const reducer = (previousValue, currentValue) => previousValue + currentValue;// 5 + 1 + 2 + 3 + 4console.log(array1.reduce(reducer, 5));// expected output: 15*/return fns.reduce((prev: any, fn: any) => fn(prev), input);};}// 订阅类 实现了不予订阅接口class Subscription implements Unsubscribable {// 拆解// private _teardowns: /* 不包含 */Exclude<TeardownLogic, void>[] = []; // 数组第一种写法private _teardowns: /* 不包含 */Array<Exclude<TeardownLogic, void>> = []; // 数组第二种写法unsubscribe(): void { // 取消订阅this._teardowns.forEach((teardown/* 拆卸 */) => {if (typeof teardown === 'function') { // 如果拆卸是函数teardown(); // 执行此函数} else { // 否则teardown.unsubscribe(); // 执行unsubscribe方法}});}add(teardown: TeardownLogic): void {if (teardown /* 拆卸 */) { // 如果teardown非空this._teardowns.push(teardown); // 将teardown添加到_teardowns}}}// 订阅者类 继承了订阅类 且实现了观察者接口class Subscriber<T> extends Subscription implements Observer<T> {private isStopped = false; // 已停止type: string;// 构造函数constructor(private observer: Partial<Observer<T>>/* 传入观察者实例 */, type: string) {super(); // 执行父类构造函数this.type = type;}// Observer中的next方法next(value: T) {// 传入的观察者实例if (this.observer.next /* 如果传入观察者实例存在next方法 */ && !this.isStopped/* 没有停止 */) {this.observer.next(value); // 执行next方法,传入值}}// Observer中的error方法error(value: any) {// 有错误,则将isStopped设置为true,停止this.isStopped = true;if (this.observer.error) { // 如果存在error方法this.observer.error(value); // 执行error方法并传入值}}// Observer中的complete方法complete() {// 完成,则将isStopped设置为true,停止this.isStopped = true;if (this.observer.complete) {// 如果存在completethis.observer.complete(); // 执行complete方法}if (this.unsubscribe) { // 如果存在unsubscribe方法this.unsubscribe(); // 执行unsubscribe方法}}}// 可观察类export class Observable<T> {type: string;// 构造函数constructor(private _subscribe: (observer: Observer<T>) => TeardownLogic, type: string = "user") {this._subscribe = _subscribe;this.type = type;}// 订阅函数subscribe(observer: Partial<Observer<T>>): Subscription {const subscriber = new Subscriber(observer, this.type); // 把观察者传入订阅者subscriber.add(this._subscribe(subscriber)); // 父类Subscription中的add方法,将订阅者放入_teardowns中return subscriber; // Subscriber继承了Subscription,所以此处可以返回}// 管道pipe(...operations/* 操作 */: OperatorFunction/* 操作符函数 */<any, any>[]): Observable<any> {return pipeFromArray(operations)(this); // 执行从数组开始的管道函数}}// map操作符export function map<T, R>(project: (value: T, index: number) => R) {return (observable: Observable<T>) =>new Observable<R>((subscriber) => {let i = 0;const subcription = observable.subscribe({next(value) {return subscriber.next(project(value, i++));},error(err) {subscriber.error(err);},complete() {subscriber.complete();},});return subcription;}, "map");}// 防抖// debounceTime 延时发送源 Observable 发送的值,但是会丢弃正在排队的发送如果源 Observable 又发出新值。export function debounceTime<T, R>(delay: number) {let time: number = 0;return (observable: Observable<T>) =>new Observable<R>((subscriber) => {const subcription = observable.subscribe({next(value) {if (time) {clearTimeout(time)}time = setTimeout(() => {return subscriber.next(value as any as R);}, delay)},error(err) {clearTimeout(time)subscriber.error(err);},complete() {clearTimeout(time)subscriber.complete();},});return subcription;}, "debounceTime");}
源码仓库:
RxJS的冷与热
http://jsbin.com/wabuguy/1/edit?js,console,output
https://github.com/RxJS-CN/rxjs-articles-translation/blob/master/articles/Hot-Vs-Cold-Observables.md
https://juejin.cn/post/6959003628816302087
 因为它是一个 “冷 “观察变量,所以每次订阅同一个观察变量都会创建一个新的连接。
 每次你订阅同一个可观察对象时,都会创建一个新的连接。
(也就是说,每次调用函数。因为可观察变量只是函数)
TL;DR: 当不想一遍又一遍地创建生产者( producer )时,你需要热的 Observable 。
冷(冷清 PS:我的理解)的是指 Observable 创建了生产者
// 冷的var cold = new Observable((observer) => { var producer = new Producer();// observer 会监听 producer});
热(热闹 PS:我的理解)的是指 Observable 复用生产者
// 热的var producer = new Producer();var hot = new Observable((observer) => {// observer 会监听 producer});
加上了我的理解,否则抽象出来的冷与热属实很抽象!
