手动建立Subject

有些案例下我們會希望第二次訂閱 source 不會從頭開始接收元素,而是從第一次訂閱到當前處理的元素開始發送,我們把這種處理方式稱為組播(multicast)

  1. var source = Rx.Observable.interval(1000).take(3);
  2. var observerA = {
  3. next: value => console.log('A next: ' + value),
  4. error: error => console.log('A error: ' + error),
  5. complete: () => console.log('A complete!')
  6. }
  7. var observerB = {
  8. next: value => console.log('B next: ' + value),
  9. error: error => console.log('B error: ' + error),
  10. complete: () => console.log('B complete!')
  11. }
  12. var subject = {
  13. observers: [],
  14. addObserver: function(observer) {
  15. this.observers.push(observer)
  16. },
  17. next: function(value) {
  18. this.observers.forEach(o => o.next(value))
  19. },
  20. error: function(error){
  21. this.observers.forEach(o => o.error(error))
  22. },
  23. complete: function() {
  24. this.observers.forEach(o => o.complete())
  25. }
  26. }
  27. subject.addObserver(observerA) // 类似观察者模式加入把observerA加入内部清单
  28. source.subscribe(subject); // 用 subject 訂閱 source (真正订阅source就这一次)
  29. setTimeout(() => {
  30. subject.addObserver(observerB); // 一秒後再把 observerB 加到 subject,這時就可以看到 observerB 是直接收 1 開始,這就是組播(multicast)的行為
  31. }, 1000);
  32. // "A next: 0"
  33. // "A next: 1"
  34. // "B next: 1"
  35. // "A next: 2"
  36. // "B next: 2"
  37. // "A complete!"
  38. // "B complete!"

什么是Subject

首先 Subject 可以拿去訂閱 Observable(source) 代表他是一個 Observer,

  1. source.subscribe(subject);

同時 Subject 又可以被 Observer(observerA, observerB) 訂閱,代表他是一個 Observable。

  1. subject.addObserver(observerA)
  2. subject.addObserver(observerB)

30 天精通 RxJS (23):Subject, BehaviorSubject, ReplaySubject, AsyncSubject

Subject

這裡我們可以直接用 subject 的 next 方法傳送值,所有訂閱的 observer 就會接收到。
即:Subject作为Observer,主动用其next方法(参考上面手动建立的Subject)发出值,然后Subject作为Observable被observerA和observerB订阅

  1. var subject = new Rx.Subject();
  2. var observerA = {
  3. next: value => console.log('A next: ' + value),
  4. error: error => console.log('A error: ' + error),
  5. complete: () => console.log('A complete!')
  6. }
  7. var observerB = {
  8. next: value => console.log('B next: ' + value),
  9. error: error => console.log('B error: ' + error),
  10. complete: () => console.log('B complete!')
  11. }
  12. subject.subscribe(observerA);
  13. subject.subscribe(observerB);
  14. subject.next(1);
  15. // "A next: 1"
  16. // "B next: 1"
  17. subject.next(2);
  18. // "A next: 2"
  19. // "B next: 2"

BehaviorSubject

在一訂閱時就能收到最新的狀態是什麼,而不是訂閱後要等到有變動才能接收到新的狀態
從上面這個範例可以看得出來 BehaviorSubject 在建立時就需要給定一個狀態,並在之後任何一次訂閱,就會先送出最新的狀態。其實這種行為就是一種狀態的表達而非單存的事件,就像是年齡跟生日一樣,年齡是一種狀態而生日就是事件;所以當我們想要用一個 stream 來表達年齡時,就應該用 BehaviorSubject。

  1. var subject = new Rx.BehaviorSubject(0); // 0 為起始值
  2. var observerA = {
  3. next: value => console.log('A next: ' + value),
  4. error: error => console.log('A error: ' + error),
  5. complete: () => console.log('A complete!')
  6. }
  7. var observerB = {
  8. next: value => console.log('B next: ' + value),
  9. error: error => console.log('B error: ' + error),
  10. complete: () => console.log('B complete!')
  11. }
  12. subject.subscribe(observerA);
  13. // "A next: 0"
  14. subject.next(1);
  15. // "A next: 1"
  16. subject.next(2);
  17. // "A next: 2"
  18. subject.next(3);
  19. // "A next: 3"
  20. setTimeout(() => {
  21. subject.subscribe(observerB);
  22. // "B next: 3"
  23. },3000)