手动建立Subject
有些案例下我們會希望第二次訂閱 source 不會從頭開始接收元素,而是從第一次訂閱到當前處理的元素開始發送,我們把這種處理方式稱為組播(multicast)
var source = Rx.Observable.interval(1000).take(3);var observerA = {next: value => console.log('A next: ' + value),error: error => console.log('A error: ' + error),complete: () => console.log('A complete!')}var observerB = {next: value => console.log('B next: ' + value),error: error => console.log('B error: ' + error),complete: () => console.log('B complete!')}var subject = {observers: [],addObserver: function(observer) {this.observers.push(observer)},next: function(value) {this.observers.forEach(o => o.next(value))},error: function(error){this.observers.forEach(o => o.error(error))},complete: function() {this.observers.forEach(o => o.complete())}}subject.addObserver(observerA) // 类似观察者模式加入把observerA加入内部清单source.subscribe(subject); // 用 subject 訂閱 source (真正订阅source就这一次)setTimeout(() => {subject.addObserver(observerB); // 一秒後再把 observerB 加到 subject,這時就可以看到 observerB 是直接收 1 開始,這就是組播(multicast)的行為}, 1000);// "A next: 0"// "A next: 1"// "B next: 1"// "A next: 2"// "B next: 2"// "A complete!"// "B complete!"
什么是Subject
首先 Subject 可以拿去訂閱 Observable(source) 代表他是一個 Observer,
source.subscribe(subject);
同時 Subject 又可以被 Observer(observerA, observerB) 訂閱,代表他是一個 Observable。
subject.addObserver(observerA)subject.addObserver(observerB)
30 天精通 RxJS (23):Subject, BehaviorSubject, ReplaySubject, AsyncSubject
Subject
這裡我們可以直接用 subject 的 next 方法傳送值,所有訂閱的 observer 就會接收到。
即:Subject作为Observer,主动用其next方法(参考上面手动建立的Subject)发出值,然后Subject作为Observable被observerA和observerB订阅
var subject = new Rx.Subject();var observerA = {next: value => console.log('A next: ' + value),error: error => console.log('A error: ' + error),complete: () => console.log('A complete!')}var observerB = {next: value => console.log('B next: ' + value),error: error => console.log('B error: ' + error),complete: () => console.log('B complete!')}subject.subscribe(observerA);subject.subscribe(observerB);subject.next(1);// "A next: 1"// "B next: 1"subject.next(2);// "A next: 2"// "B next: 2"
BehaviorSubject
在一訂閱時就能收到最新的狀態是什麼,而不是訂閱後要等到有變動才能接收到新的狀態
從上面這個範例可以看得出來 BehaviorSubject 在建立時就需要給定一個狀態,並在之後任何一次訂閱,就會先送出最新的狀態。其實這種行為就是一種狀態的表達而非單存的事件,就像是年齡跟生日一樣,年齡是一種狀態而生日就是事件;所以當我們想要用一個 stream 來表達年齡時,就應該用 BehaviorSubject。
var subject = new Rx.BehaviorSubject(0); // 0 為起始值var observerA = {next: value => console.log('A next: ' + value),error: error => console.log('A error: ' + error),complete: () => console.log('A complete!')}var observerB = {next: value => console.log('B next: ' + value),error: error => console.log('B error: ' + error),complete: () => console.log('B complete!')}subject.subscribe(observerA);// "A next: 0"subject.next(1);// "A next: 1"subject.next(2);// "A next: 2"subject.next(3);// "A next: 3"setTimeout(() => {subject.subscribe(observerB);// "B next: 3"},3000)
