Subscription

Subscription 就是表示 Observable 的执行,可以被清理。这个对象最常用的方法就是 unsubscribe 方法,它不需要任何参数,只是用来清理由 Subscription 占用的资源。同时,它还有 add 方法可以使我们取消多个订阅。

  1. const myObservable = Rx.Observable.create(observer => {
  2. observer.next('foo');
  3. setTimeout(() => observer.next('bar'), 1000);
  4. });
  5. const subscription = myObservable.subscribe(x => console.log(x));
  6. // 稍后:
  7. subscription.unsubscribe();
  8. // 这会取消正在进行中的 Observable 执行
  9. // Observable 执行是通过使用观察者调用 subscribe 方法启动的

Subject (主体)

它是一个代理对象,既是一个 Observable 又是一个 Observer,它可以同时接受 Observable 发射出的数据,也可以向订阅了它的 observer 发射数据,同时,Subject 会对内部的 observers 清单进行多播 (multicast)

Subscription 与 Subject - 图1

单播与多播

Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式


这个时候眼尖的读者会发现,这里产生了一个新概念——多播。

  • 那么多播又是什么呢?
  • 有了多播是不是还有单播?
  • 他们的区别又是什么呢?

接下来就让笔者给大家好好分析这两个概念吧。
Subscription 与 Subject - 图2

单播

普通的 Observable 是单播的,那么什么是单播呢?
单播的意思是,每个普通的 Observables 实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables 被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。

const Rx = require('rxjs/Rx')
const source = Rx.Observable.interval(1000).take(3);

source.subscribe((value) => console.log('A ' + value)).

setTimeout(() => {  
  source.subscribe((value) => console.log('B ' + value))
}, 1000)

// A 0
// A 1
// B 0
// A 2
// B 1
// B 2

看到陌生的调用不要慌,后面会进行详细解析,这里的 source 你可以理解为就是一个每隔一秒发送一个从 0 开始递增整数的 Observable 就行了,且只会发送三次(take 操作符其实也就是限定拿多少个数就不在发送数据了。)。

从这里我们可以看出两个不同观察者订阅了同一个源(source),一个是直接订阅,另一个延时一秒之后再订阅。

从打印的结果来看,A 从 0 开始每隔一秒打印一个递增的数,而 B 延时了一秒,然后再从 0 开始打印,由此可见,A 与 B 的执行是完全分开的,也就是每次订阅都创建了一个新的实例。
在许多场景下,我们可能会希望 B 能够不从最初始开始接受数据,而是接受在订阅的那一刻开始接受当前正在发送的数据,这就需要用到多播能力了。

多播

那么如果实现多播能力呢,也就是实现我们不论什么时候订阅只会接收到实时的数据的功能。
可能这个时候会有小伙伴跳出来了,直接给个中间人来订阅这个源,然后将数据转发给 A 和 B 不就行了?

const source = Rx.Observable.interval(1000).take(3);

const subject = {
  observers: [],
  subscribe(target) { 
    this.observers.push(target);
  },
  next: function(value) {
    this.observers.forEach((next) => next(value))
  }
}
source.subscribe(subject);

subject.subscribe((value) => console.log('A ' + value))

setTimeout(() => { 
  subject.subscribe((value) => console.log('B ' + value))
}, 1000)

// A 0
// A 1
// B 1
// A 2
// B 2

先分析一下代码,A 和 B 的订阅和单播里代码并无差别,唯一变化的是他们订阅的对象由 source 变成了 subject,然后再看看这个 subject 包含了什么,这里做了一些简化,移除了 error、complete 这样的处理函数,只保留了 next,然后内部含有一个 observers 数组,这里包含了所有的订阅者,暴露一个 subscribe 用于观察者对其进行订阅。
在使用过程中,让这个中间商 subject 来订阅 source,这样便做到了统一管理,以及保证数据的实时性,因为本质上对于 source 来说只有一个订阅者。

这里主要是方便理解,简易实现了 RxJS 中的 Subject 的实例,这里的中间人可以直接换成 RxJS 的 Subject 类实例,效果是一样的

const source = Rx.Observable.interval(1000).take(3);

const subject = new Rx.Subject();
source.subscribe(subject);

subject.subscribe((value) => console.log('A ' + value))

setTimeout(() => {
  subject.subscribe((value) => console.log('B ' + value))
}, 1000)

同样先来看看打印的结果是否符合预期,首先 A 的打印结果并无变化,B 首次打印的数字现在是从 1 开始了,也就当前正在传输的数据,这下满足了我们需要获取实时数据的需求了。

不同于单播订阅者总是需要从头开始获取数据,多播模式能够保证数据的实时性。

Subject变体

除了以上这些,RxJS 还提供了 Subject 的三个变体:

  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

BehaviorSubject

BehaviorSubject 是一种在有新的订阅时会额外发出最近一次发出的值的 Subject。

Subscription 与 Subject - 图3
]

同样我们结合现实场景来进行理解。假设我们需要使用它来维护一个状态,在它变化之后给所有重新订阅的人都能发送一个当前状态的数据。这就好比我们要实现一个计算属性,我们只关心该计算属性最终的状态,而不关心过程中变化的数,那么又该怎么处理呢?
我们知道普通的 Subject 只会在当前有新数据的时候发送当前的数据,而发送完毕之后就不会再发送已发送过的数据,那么这个时候我们就可以引入 BehaviorSubject 来进行终态维护了,因为订阅了该对象的观察者在订阅的同时能够收到该对象发送的最近一次的值,这样就能满足我们上述的需求了。 :::info 一句话解释:订阅时直接重新计算一次最新值; ::: 然后再结合代码来分析这种 Subject 应用的场景:

const subject = new Rx.Subject();
subject.subscribe((value) => console.log('A:' + value))

subject.next(1);// A:1
subject.next(2);// A:2

setTimeout(() => {
  // 1s后订阅,因此无法收到值
  subject.subscribe(
    (value) => console.log('B:' + value)); 
}, 1000)

首先演示的是采用普通 Subject 来作为订阅的对象,然后观察者 A 在实例对象 subject 调用 next 发送新的值之前订阅的,然后观察者是延时一秒之后订阅的,所以 A 接受数据正常,那么这个时候由于 B 在数据发送的时候还没订阅,所以它并没有收到数据。

那么我们再来看看采用 BehaviorSubject 实现的效果:

const subject = new Rx.BehaviorSubject(0);  // 需要传入初始值
subject.subscribe((value: number) => console.log('A:' + value))
// A:0
subject.next(1);// A:1
subject.next(2);// A:2
setTimeout(() => {
  subject.subscribe((value: number) => console.log('B:' + value)) // B:2
}, 1000)

同样从打印的结果来看,与普通 Subject 的区别在于,在订阅的同时源对象就发送了最近一次改变的值(如果没改变则发送初始值),这个时候我们的 B 也如愿获取到了最新的状态。 :::tips 这里在实例化 BehaviorSubject 的时候需要传入一个初始值。 :::

ReplaySubject

在理解了 BehaviorSubject 之后再来理解 ReplaySubject 就比较轻松了,ReplaySubject 会保存所有值,然后回放给新的订阅者,同时它提供了入参用于控制重放值的数量(默认重放所有)。
Subscription 与 Subject - 图4

什么?还不理解?看码:

const subject = new Rx.ReplaySubject(2);

subject.next(0);
subject.next(1);
subject.next(2);
subject.subscribe((value: number) =>
                  console.log('A:' + value))
// A:1
// A:2
subject.next(3);// A:3
subject.next(4);// A:4
setTimeout(() => {
  subject.subscribe((value: number) => console.log('B:' + value))  // B:3 // B:4
}, 1000)
// 整体打印顺序:
// A:1
// A:2
// A:3
// A:4
// B:3
// B:4

我们先从构造函数传参来看,BehaviorSubject 与 ReplaySubject 都需要传入一个参数,对 BehaviorSubject 来说是初始值,而对于 ReplaySubject 来说就是重放先前多少次的值,如果不传入重放次数,那么它将重放所有发射过的值。

从结果上看,如果你不传入确定的重放次数,那么实现的效果与之前介绍的单播效果几乎没有差别。

所以我们再分析代码可以知道在订阅的那一刻,观察者们就能收到源对象前多少次发送的值。

AsyncSubject

AsyncSubject 只有当 Observable 执行完成时 (执行 complete()),它才会将执行的最后一个值发送给观察者,如果因异常而终止,AsyncSubject 将不会释放任何数据,但是会向 Observer 传递一个异常通知。

Subscription 与 Subject - 图5
]

AsyncSubject 一般用的比较少,更多的还是使用前面三种。

const subject = new Rx.AsyncSubject();

subject.next(1);
subject.subscribe(res => {
  console.log('A:' + res);
});
subject.next(2);
subject.subscribe(res => {
  console.log('B:' + res);
});
subject.next(3);
subject.subscribe(res => {
  console.log('C:' + res);
});
subject.complete();
subject.next(4);

// 整体打印结果:
// A:3
// B:3
// C:3

从打印结果来看其实已经很好理解了,也就是说对于所有的观察者们来说,源对象只会在所有数据发送完毕也就是调用 complete 方法之后才会把最后一个数据返回给观察者们。

这就好比小说里经常有的,当你要放技能的时候,先要打一套起手式,打完之后才会放出你的大招。