Cold/Hot Observables - 图1

Cold Observables

Cold Observables 只有被 observers 订阅的时候,才会开始产生值。是单播的,有多少个订阅就会生成多少个订阅实例,每个订阅都是从第一个产生的值开始接收值,所以每个订阅接收到的值都是一样的。

如果大家想要参考 Cold Observables 相关代码,直接看前面的单播示例就行了。

正如单播描述的能力,不管观察者们什么时候开始订阅,源对象都会从初始值开始把所有的数都发给该观察者。

Hot Observables

Hot Observables 不管有没有被订阅都会产生值。是多播的,多个订阅共享同一个实例,是从订阅开始接受到值,每个订阅接收到的值是不同的,取决于它们是从什么时候开始订阅。
这里有几种场景,我们可以逐一分析一下便于理解:

“加热”

首先可以忽略代码中出现的陌生的函数,后面会细说。

  1. const source = Rx.Observable.of(1, 2).publish();
  2. source.connect();
  3. source.subscribe((value) => console.log('A:' + value));
  4. setTimeout(() => {
  5. source.subscribe((value) => console.log('B:' + value));
  6. }, 1000);

这里首先用 Rx 的操作符 of 创建了一个 Observable,并且后面跟上了一个 publish 函数,在创建完之后调用 connect 函数进行开始数据发送。
最终代码的执行结果就是没有任何数据打印出来,分析一下原因其实也比较好理解,由于开启数据发送的时候还没有订阅,并且这是一个 Hot Observables,它是不会理会你是否有没有订阅它,开启之后就会直接发送数据,所以 A 和 B 都没有接收到数据。

当然你这里如果把 connect 方法放到最后,那么最终的结果就是 A 接收到了,B 还是接不到,因为 A 在开启发数据之前就订阅了,而 B 还要等一秒。

更直观的场景

正如上述多播所描述的,其实我们更多想看到的现象是能够 A 和 B 两个观察者能够都有接收到数据,然后观察数据的差别,这样会方便理解。
这里直接换一个发射源:

const source = Rx.Observable.interval(1000).take(3).publish();

source.subscribe((value: number) => console.log('A:' + value));

setTimeout(() => { 
  source.subscribe((value: number) => console.log('B:' + value));
}, 3000);

source.connect();

// A:0
// A:1
// A:2
// B:2

这里我们利用 interval 配合 take 操作符每秒发射一个递增的数,最多三个,然后这个时候的打印结果就更清晰了,A 正常接收到了三个数,B 三秒之后才订阅,所以只接收到了最后一个数 2,这种方式就是上述多播所描述的并无一二。

两者对比

  • Cold Observables:举个栗子会比较好理解一点:比如我们上 B 站看番,更新了新番,我们不论什么时候去看,都能从头开始看到完整的剧集,与其他人看不看毫无关联,互不干扰。
  • Hot Observables:这就好比我们上 B 站看直播,直播开始之后就直接开始播放了,不管是否有没有订阅者,也就是说如果你没有一开始就订阅它,那么你过一段时候后再去看,是不知道前面直播的内容的。

    上述代码中出现的操作符解析

    在创建 Hot Observables 时我们用到了 publish 与 connect 函数的结合,其实调用了 publish 操作符之后返回的结果是一个 ConnectableObservable,然后该对象上提供了 connect 方法让我们控制发送数据的时间。

  • publish:这个操作符把正常的 ObservableCold Observables)转换成 ConnectableObservable

  • ConnectableObservableConnectableObservable 是多播的共享 Observable,可以同时被多个 observers共享订阅,它是 Hot ObservablesConnectableObservable 是订阅者和真正的源头 Observables(上面例子中的 interval,每隔一秒发送一个值,就是源头 Observables)的中间人,ConnectableObservable 从源头 Observables 接收到值然后再把值转发给订阅者。
  • connect()ConnectableObservable 并不会主动发送值,它有个 connect方法,通过调用 connect 方法,可以启动共享 ConnectableObservable 发送值。当我们调用 ConnectableObservable.prototype.connect 方法,不管有没有被订阅,都会发送值。订阅者共享同一个实例,订阅者接收到的值取决于它们何时开始订阅。

其实这种手动控制的方式还挺麻烦的,有没有什么更加方便的操作方式呢,比如监听到有订阅者订阅了才开始发送数据,一旦所有订阅者都取消了,就停止发送数据?其实也是有的,让我们看看引用计数(refCount):

引用计数

这里主要用到了 publish 结合 refCount 实现一个 “自动挡” 的效果。

const source = Rx.Observable.interval(1000).take(3).publish().refCount();

setTimeout(() => { 
  source.subscribe(data => { console.log("A:" + data) }); 
  setTimeout(() => { 
    source.subscribe(data => { console.log("B:" + data) });
  }, 1000);
}, 2000);

// A:0
// A:1
// B:1
// A:2
// B:2

我们透过结果看本质,能够很轻松的发现,只有当 A 订阅的时候才开始发送数据(A 拿到的数据是从 0 开始的),并且当 B 订阅时,也是只能获取到当前发送的数据,而不能获取到之前的数据。
不仅如此,这种 “自动挡” 当所有订阅者都取消订阅的时候它就会停止再发送数据了。