昨天我們介紹完了各種 Subject,不曉得各位讀者還記不記得在一開始講到 Subject 時,是希望能夠讓 Observable 有新訂閱時,可以共用前一個訂閱的執行而不要從頭開始
如下面的例子

  1. var source = Rx.Observable.interval(1000).take(3);
  2. var observerA = {
  3. next: value => console.log('A next: ' + value),
  4. error: error => console.log('A error: ' + error),
  5. complete: () => console.log('A complete!')
  6. }
  7. var observerB = {
  8. next: value => console.log('B next: ' + value),
  9. error: error => console.log('B error: ' + error),
  10. complete: () => console.log('B complete!')
  11. }
  12. var subject = new Rx.Subject()
  13. subject.subscribe(observerA)
  14. source.subscribe(subject);
  15. setTimeout(() => {
  16. subject.subscribe(observerB);
  17. }, 1000);
  18. // "A next: 0"
  19. // "A next: 1"
  20. // "B next: 1"
  21. // "A next: 2"
  22. // "B next: 2"
  23. // "A complete!"
  24. // "B complete!"

上面這段程式碼我們用 subject 訂閱了 source,再把 observerA 跟 observerB 一個個訂閱到 subject,這樣就可以讓 observerA 跟 observerB 共用同一個執行。但這樣的寫法會讓程式碼看起來太過複雜,我們可以用 Observable 的 multicast operator 來簡化這段程式

Operators

multicast

multicast 可以用來掛載 subject 並回傳一個可連結(connectable)的 observable,如下

  1. var source = Rx.Observable.interval(1000)
  2. .take(3)
  3. .multicast(new Rx.Subject());
  4. var observerA = {
  5. next: value => console.log('A next: ' + value),
  6. error: error => console.log('A error: ' + error),
  7. complete: () => console.log('A complete!')
  8. }
  9. var observerB = {
  10. next: value => console.log('B next: ' + value),
  11. error: error => console.log('B error: ' + error),
  12. complete: () => console.log('B complete!')
  13. }
  14. source.subscribe(observerA); // subject.subscribe(observerA)
  15. source.connect(); // source.subscribe(subject)
  16. setTimeout(() => {
  17. source.subscribe(observerB); // subject.subscribe(observerB)
  18. }, 1000);

上面這段程式碼我們透過 multicast 來掛載一個 subject 之後這個 observable(source) 的訂閱其實都是訂閱到 subject 上。

  1. source.subscribe(observerA); // subject.subscribe(observerA)

必須真的等到 執行 connect() 後才會真的用 subject 訂閱 source,並開始送出元素,如果沒有執行 connect() observable 是不會真正執行的。

  1. source.connect();

另外值得注意的是這裡要退訂的話,要把 connect() 回傳的 subscription 退訂才會真正停止 observable 的執行,如下

  1. var source = Rx.Observable.interval(1000)
  2. .do(x => console.log('send: ' + x))
  3. .multicast(new Rx.Subject()); // 無限的 observable
  4. var observerA = {
  5. next: value => console.log('A next: ' + value),
  6. error: error => console.log('A error: ' + error),
  7. complete: () => console.log('A complete!')
  8. }
  9. var observerB = {
  10. next: value => console.log('B next: ' + value),
  11. error: error => console.log('B error: ' + error),
  12. complete: () => console.log('B complete!')
  13. }
  14. var subscriptionA = source.subscribe(observerA);
  15. var realSubscription = source.connect();
  16. var subscriptionB;
  17. setTimeout(() => {
  18. subscriptionB = source.subscribe(observerB);
  19. }, 1000);
  20. setTimeout(() => {
  21. subscriptionA.unsubscribe();
  22. subscriptionB.unsubscribe();
  23. // 這裡雖然 A 跟 B 都退訂了,但 source 還會繼續送元素
  24. }, 5000);
  25. setTimeout(() => {
  26. realSubscription.unsubscribe();
  27. // 這裡 source 才會真正停止送元素
  28. }, 7000);

上面這段的程式碼,必須等到 realSubscription.unsubscribe() 執行完,source 才會真的結束。
雖然用了 multicast 感覺會讓我們處理的對象少一點,但必須搭配 connect 一起使用還是讓程式碼有點複雜,通常我們會希望有 observer 訂閱時,就立即執行並發送元素,而不要再多執行一個方法(connect),這時我們就可以用 refCount。

refCount

refCount 必須搭配 multicast 一起使用,他可以建立一個只要有訂閱就會自動 connect 的 observable,範例如下

  1. var source = Rx.Observable.interval(1000)
  2. .do(x => console.log('send: ' + x))
  3. .multicast(new Rx.Subject())
  4. .refCount();
  5. var observerA = {
  6. next: value => console.log('A next: ' + value),
  7. error: error => console.log('A error: ' + error),
  8. complete: () => console.log('A complete!')
  9. }
  10. var observerB = {
  11. next: value => console.log('B next: ' + value),
  12. error: error => console.log('B error: ' + error),
  13. complete: () => console.log('B complete!')
  14. }
  15. var subscriptionA = source.subscribe(observerA);
  16. // 訂閱數 0 => 1
  17. var subscriptionB;
  18. setTimeout(() => {
  19. subscriptionB = source.subscribe(observerB);
  20. // 訂閱數 0 => 2
  21. }, 1000);

上面這段程式碼,當 source 一被 observerA 訂閱時(訂閱數從 0 變成 1),就會立即執行並發送元素,我們就不需要再額外執行 connect。
同樣的在退訂時只要訂閱數變成 0 就會自動停止發送

  1. var source = Rx.Observable.interval(1000)
  2. .do(x => console.log('send: ' + x))
  3. .multicast(new Rx.Subject())
  4. .refCount();
  5. var observerA = {
  6. next: value => console.log('A next: ' + value),
  7. error: error => console.log('A error: ' + error),
  8. complete: () => console.log('A complete!')
  9. }
  10. var observerB = {
  11. next: value => console.log('B next: ' + value),
  12. error: error => console.log('B error: ' + error),
  13. complete: () => console.log('B complete!')
  14. }
  15. var subscriptionA = source.subscribe(observerA);
  16. // 訂閱數 0 => 1
  17. var subscriptionB;
  18. setTimeout(() => {
  19. subscriptionB = source.subscribe(observerB);
  20. // 訂閱數 0 => 2
  21. }, 1000);
  22. setTimeout(() => {
  23. subscriptionA.unsubscribe(); // 訂閱數 2 => 1
  24. subscriptionB.unsubscribe(); // 訂閱數 1 => 0,source 停止發送元素
  25. }, 5000);

publish

其實 multicast(new Rx.Subject()) 很常用到,我們有一個簡化的寫法那就是 publish,下面這兩段程式碼是完全等價的

  1. var source = Rx.Observable.interval(1000)
  2. .publish()
  3. .refCount();
  4. // var source = Rx.Observable.interval(1000)
  5. // .multicast(new Rx.Subject())
  6. // .refCount();

加上 Subject 的三種變形

  1. var source = Rx.Observable.interval(1000)
  2. .publishReplay(1)
  3. .refCount();
  4. // var source = Rx.Observable.interval(1000)
  5. // .multicast(new Rx.ReplaySubject(1))
  6. // .refCount();
  1. var source = Rx.Observable.interval(1000)
  2. .publishBehavior(0)
  3. .refCount();
  4. // var source = Rx.Observable.interval(1000)
  5. // .multicast(new Rx.BehaviorSubject(0))
  6. // .refCount();
  1. var source = Rx.Observable.interval(1000)
  2. .publishLast()
  3. .refCount();
  4. // var source = Rx.Observable.interval(1000)
  5. // .multicast(new Rx.AsyncSubject(1))
  6. // .refCount();

share

另外 publish + refCount 可以在簡寫成 share

  1. var source = Rx.Observable.interval(1000)
  2. .share();
  3. // var source = Rx.Observable.interval(1000)
  4. // .publish()
  5. // .refCount();
  6. // var source = Rx.Observable.interval(1000)
  7. // .multicast(new Rx.Subject())
  8. // .refCount();

今日小結

今天主要講解了 multicast 和 refCount 兩個 operators 可以幫助我們既可能的簡化程式碼,並同時達到組播的效果。最後介紹 publish 跟 share 幾個簡化寫法,這幾個簡化的寫法是比較常見的,在理解 multicast 跟 refCount 運作方式後就能直接套用到 publish 跟 share 上。