因為實在太多讀者在問要如何實作 Observable,所以特別調整了本系列文章最後幾篇的內容,空出一天的位置來寫如何簡易實作 Observable。
為什麼是簡易實作而不完整實作呢? 當然這個系列的文章是希望讀者能學會如何使用 RxJS,而 實作 Observable 其實只是幫助我們理解 Observable 的運作方式,所以這篇文章會盡可能地簡單,一來讓讀者容易理解及吸收,二來有興趣的讀者可以再沿著這篇文章的內容去完整的實作。

重點觀念

Observable 跟 Observer Pattern 是不同的,Observable 內部並沒有管理一份訂閱清單,訂閱 Observable 就像是執行一個 function 一樣
所以實作過程的重點

  • 訂閱就是執行一個 funciton
  • 訂閱接收的物件具備 next, error, complete 三個方法
  • 訂閱會返回一個可退訂(unsubscribe)的物件

基本 observable 實作

先用最簡單的 function 來建立 observable 物件

  1. function create(subscriber) {
  2. var observable = {
  3. subscribe: function(observer) {
  4. subscriber(observer)
  5. }
  6. };
  7. return observable;
  8. }

上面這段程式碼就可以做最簡單的訂閱,像下面這樣

  1. function create(subscriber) {
  2. var observable = {
  3. subscribe: function(observer) {
  4. subscriber(observer)
  5. }
  6. };
  7. return observable;
  8. }
  9. var observable = create(function(observer) {
  10. observer.next(1);
  11. observer.next(2);
  12. observer.next(3);
  13. })
  14. var observer = {
  15. next: function(value) {
  16. console.log(value)
  17. }
  18. }
  19. observable.subscribe(observer)
  20. // 1
  21. // 2
  22. // 3

這時我們已經有最簡單的功能了,但這裡有一個大問題,就是 observable 在結束(complete)就不應該再發送元素

  1. var observable = create(function(observer) {
  2. observer.next(1);
  3. observer.next(2);
  4. observer.next(3);
  5. observer.complete();
  6. observer.next('still work');
  7. })
  8. var observer = {
  9. next: function(value) {
  10. console.log(value)
  11. },
  12. complete: function() {
  13. console.log('complete!')
  14. }
  15. }
  16. observable.subscribe(observer)
  17. // 1
  18. // 2
  19. // 3
  20. // "complete!"
  21. // "still work"

從上面的程式碼可以看到 complete 之後還是能送元素出來,另外還有一個問題就是 observer,如果是不完整的就會出錯,這也不是我們希望看到的。

  1. var observable = create(function(observer) {
  2. observer.next(1);
  3. observer.next(2);
  4. observer.next(3);
  5. observer.complete(); // error: complete is not a function
  6. })
  7. var observer = {
  8. next: function(value) {
  9. console.log(value)
  10. }
  11. }
  12. observable.subscribe(observer)
  13. // 1
  14. // 2
  15. // 3

上面這段程式碼可以看出來,當使用者 observer 物件沒有 complete 方法時,就會報錯。 我們應該修正這兩個問題!

實作簡易 Observer

要修正這兩個問題其實並不難,我們只要實作一個 Observer 的類別,每次使用者傳入的 observer 都會利用這個類別轉乘我們想要 Observer 物件。
首先訂閱時有可能傳入一個 observer 物件,或是一到三個 function(next, error, complete),所以我們要建立一個類別可以接受各種可能的參數

  1. class Observer {
  2. constructor(destinationOrNext, error, complete) {
  3. switch (arguments.length) {
  4. case 0:
  5. // 空的 observer
  6. case 1:
  7. if (!destinationOrNext) {
  8. // 空的 observer
  9. }
  10. if (typeof destinationOrNext === 'object') {
  11. // 傳入了 observer 物件
  12. }
  13. default:
  14. // 如果上面都不是,代表應該是傳入了一到三個 function
  15. break;
  16. }
  17. }
  18. }

寫一個方法(safeObserver)來回傳正常的 observer

  1. class Observer {
  2. constructor(destinationOrNext, error, complete) {
  3. // ... 一些程式碼
  4. }
  5. safeObserver(observerOrNext, error, complete) {
  6. let next;
  7. if (typeof (observerOrNext) === 'function') {
  8. // observerOrNext 是 next function
  9. next = observerOrNext;
  10. } else if (observerOrNext) {
  11. // observerOrNext 是 observer 物件
  12. next = observerOrNext.next || () => {};
  13. error = observerOrNext.error || function(err) {
  14. throw err
  15. };
  16. complete = observerOrNext.complete || () => {};
  17. }
  18. // 最後回傳我們預期的 observer 物件
  19. return {
  20. next: next,
  21. error: error,
  22. complete: complete
  23. };
  24. }
  25. }

再把 constructor 完成

  1. // 預設空的 observer
  2. const emptyObserver = {
  3. next: () => {},
  4. error: (err) => { throw err; },
  5. complete: () => {}
  6. }
  7. class Observer {
  8. constructor(destinationOrNext, error, complete) {
  9. switch (arguments.length) {
  10. case 0:
  11. // 空的 observer
  12. this.destination = this.safeObserver(emptyObserver);
  13. break;
  14. case 1:
  15. if (!destinationOrNext) {
  16. // 空的 observer
  17. this.destination = this.safeObserver(emptyObserver);
  18. break;
  19. }
  20. if (typeof destinationOrNext === 'object') {
  21. // 傳入了 observer 物件
  22. this.destination = this.safeObserver(destinationOrNext);
  23. break;
  24. }
  25. default:
  26. // 如果上面都不是,代表應該是傳入了一到三個 function
  27. this.destination = this.safeObserver(destinationOrNext, error, complete);
  28. break;
  29. }
  30. }
  31. safeObserver(observerOrNext, error, complete) {
  32. // ... 一些程式碼
  33. }
  34. }

這裡我們把真正的 observer 塞到 this.destination,接著完成 observer 的方法。
Observer 的三個主要的方法(next, error, complete)都應該結束或退訂後不能再被執行,所以我們在物件內部偷塞一個 boolean 值來作為是否曾經結束的依據。

  1. class Observer {
  2. constructor(destinationOrNext, error, complete) {
  3. // ... 一些程式碼
  4. }
  5. safeObserver(observerOrNext, error, complete) {
  6. // ... 一些程式碼
  7. }
  8. unsubscribe() {
  9. this.isStopped = true; // 偷塞一個屬性 isStopped
  10. }
  11. }

接著要實作三個主要的方法就很簡單了,只要先判斷 isStopped 再使用 this.destination 物件來傳送值就可以了

  1. class Observer {
  2. constructor(destinationOrNext, error, complete) {
  3. // ... 一些程式碼
  4. }
  5. safeObserver(observerOrNext, error, complete) {
  6. // ... 一些程式碼
  7. }
  8. next(value) {
  9. if (!this.isStopped && this.next) {
  10. // 先判斷是否停止過
  11. try {
  12. this.destination.next(value); // 傳送值
  13. } catch (err) {
  14. this.unsubscribe();
  15. throw err;
  16. }
  17. }
  18. }
  19. error(err) {
  20. if (!this.isStopped && this.error) {
  21. // 先判斷是否停止過
  22. try {
  23. this.destination.error(err); // 傳送錯誤
  24. } catch (anotherError) {
  25. this.unsubscribe();
  26. throw anotherError;
  27. }
  28. this.unsubscribe();
  29. }
  30. }
  31. complete() {
  32. if (!this.isStopped && this.complete) {
  33. // 先判斷是否停止過
  34. try {
  35. this.destination.complete(); // 發送停止訊息
  36. } catch (err) {
  37. this.unsubscribe();
  38. throw err;
  39. }
  40. this.unsubscribe(); // 發送停止訊息後退訂
  41. }
  42. }
  43. unsubscribe() {
  44. this.isStopped = true;
  45. }
  46. }

到這裡我們就完成基本的 Observer 實作了,接著讓我們拿到基本版的 observable 中使用吧。

  1. function create(subscriber) {
  2. const observable = {
  3. subscribe: function(observerOrNext, error, complete) {
  4. const realObserver = new Observer(observerOrNext, error, complete)
  5. subscriber(realObserver);
  6. return realObserver;
  7. }
  8. };
  9. return observable;
  10. }
  11. var observable = create(function(observer) {
  12. observer.next(1);
  13. observer.next(2);
  14. observer.next(3);
  15. observer.complete();
  16. observer.next('not work');
  17. })
  18. var observer = {
  19. next: function(value) {
  20. console.log(value)
  21. },
  22. complete: function() {
  23. console.log('complete!')
  24. }
  25. }
  26. observable.subscribe(observer);
  27. // 1
  28. // 2
  29. // 3
  30. // complete!

到這裡我們就完成最基本的 observable 了,至少基本的行為都跟我們期望的一致,我知道讀者們仍然不會放過我,你們會希望做出一個 Observable 型別以及至少一個 operator 對吧? 不用擔心,我們下一篇就會講解如何建立一個 Observable 型別和 operator 的方法!

今日小結

今天我們複習了 Observable 的重要概念,並用這些重要的概念實作出了基本的 observable 以及 Observer 的類別。