前一篇文章我們已經完成了基本的 observable 以及 Observer 的簡易實作,這篇文章我們會接續上一篇的內容實作簡易的 Observable 類別,以及一個 creation operator 和一個 transform operator。

建立簡易 Observable 類別

這是我們上一篇文章寫的建立 observable 物件的函式

  1. function create(subscribe) {
  2. const observable = {
  3. subscribe: function() {
  4. const realObserver = new Observer(...arguments);
  5. subscribe(realObserver);
  6. return realObserver;
  7. }
  8. };
  9. return observable;
  10. }

從這個函式可以看出來,回傳的 observable 物件至少會有 subscribe 方法,所以最簡單的 Observable 類別大概會長像下面這樣

  1. class Observable {
  2. subscribe() {
  3. // ...做某些事
  4. }
  5. }

另外 create 的函式在執行時會傳入一個 subscribe 的 function,這個 function 會決定 observable 的行為

  1. var observable = create(function(observer) {
  2. observer.next(1);
  3. observer.next(2);
  4. observer.next(3);
  5. observer.complete();
  6. observer.next('not work');
  7. })

把上面這一段改成下面這樣

  1. var observable = new Observable(function(observer) {
  2. observer.next(1);
  3. observer.next(2);
  4. observer.next(3);
  5. observer.complete();
  6. observer.next('not work');
  7. })

所以我們的 Observable 的建構式應該會接收一個 subscribe function

  1. class Observable {
  2. constructor(subscribe) {
  3. if(subscribe) {
  4. this._subscribe = subscribe; // 把 subscribe 存到屬性中
  5. }
  6. }
  7. subscribe() {
  8. // ...做某些事
  9. }
  10. }

接著我們就能完成 subscribe 要做的事情了

  1. class Observable {
  2. constructor(subscribe) {
  3. if(subscribe) {
  4. this._subscribe = subscribe; // 把 subscribe 存到 _subscribe 屬性中
  5. }
  6. }
  7. subscribe() {
  8. const observer = new Observer(...arguments);
  9. this._subscribe(observer); // 就是執行一個 function 對吧
  10. return observer;
  11. }
  12. }

到這裡我們就成功的把 create 的函式改成 Observable 的類別了,我們可以直接來使用看看

  1. class Observable {
  2. constructor(subscribe) {
  3. if(subscribe) {
  4. this._subscribe = subscribe; // 把 subscribe 存到屬性中
  5. }
  6. }
  7. subscribe() {
  8. const observer = new Observer(...arguments);
  9. this._subscribe(observer);
  10. return observer;
  11. }
  12. }
  13. var observable = new Observable(function(observer) {
  14. observer.next(1);
  15. observer.next(2);
  16. observer.next(3);
  17. observer.complete();
  18. observer.next('not work');
  19. })
  20. var observer = {
  21. next: function(value) {
  22. console.log(value)
  23. },
  24. complete: function() {
  25. console.log('complete!')
  26. }
  27. }
  28. observable.subscribe(observer);

當然我們可以仿 RxJS 在靜態方法中加入 create,如下

  1. class Observable {
  2. constructor(subscribe) {
  3. if(subscribe) {
  4. this._subscribe = subscribe; // 把 subscribe 存到屬性中
  5. }
  6. }
  7. subscribe() {
  8. const observer = new Observer(...arguments);
  9. this._subscribe(observer);
  10. return observer;
  11. }
  12. }
  13. Observable.create = function(subscribe) {
  14. return new Observable(subscribe);
  15. }

這樣一來我們就可以用 Observable.create 建立 observable 物件實例。

  1. var observable = Observable.create(function(observer) {
  2. observer.next(1);
  3. observer.next(2);
  4. observer.next(3);
  5. observer.complete();
  6. observer.next('not work');
  7. });

建立 creation operator - fromArray

當我們有 Observable 類別後要建立 creation operator 就不難了,這裡我們建立一個 fromArray 的方法,可以接收 array 來建立 observable,算是 Rx 的 Observable.from 的簡化版本,記得 creation operators 都屬於 static 方法

  1. class Observable {
  2. constructor(subscribe) {
  3. if(subscribe) {
  4. this._subscribe = subscribe; // 把 subscribe 存到屬性中
  5. }
  6. }
  7. subscribe() {
  8. const observer = new Observer(...arguments);
  9. this._subscribe(observer);
  10. return observer;
  11. }
  12. }
  13. // 建立靜態方法
  14. Observable.fromArray = function(array) {
  15. if(!Array.isArray(array)) {
  16. // 如果傳入的參數不是陣列,則拋出例外
  17. throw new Error('params need to be an array');
  18. }
  19. return new Observable(function(observer) {
  20. try{
  21. // 遍歷每個元素並送出
  22. array.forEach(value => observer.next(value))
  23. observer.complete()
  24. } catch(err) {
  25. observer.error(err)
  26. }
  27. });
  28. }
  29. var observable = Observable.fromArray([1,2,3,4,5]);

上面的程式碼我們只是簡單的用 new Observable 就可以輕鬆地實現我們要的功能,之後就可以用 fromArray 來建立 observable 物件。
相信讀者到這之前應該都不會有太大的問題,接下來這個部份就困難的多,請讀者們一定要搞懂前面的各個實作再接著往下看。

建立 transform operator - map

相信很多人在實作 Observable 都是卡在這個階段,因為 operators 都是回傳一個新的 observable 這中間有很多細節需要注意,並且有些小技巧才能比較好的實現,在開始實作之前先讓我們釐清幾個重點

  • operators(transform, filter, conditional…) 都是回傳一個新個 observable
  • 大部分的 operator 其實就是在原本 observer 外包裹一層物件,讓執行 next 方法前先把元素做一次處理
  • operator 回傳的 observable 訂閱時,還是需要執行原本的 observable(資料源),也就說我們要想辦法保留原本的 observable

讓我們一步一步來,首先 operators 執行完會回傳一個新的 observable,這個 observable 在訂閱時會先去執行 operator 的行為再發送元素,所以 observable 的訂閱方法就不能像現在這樣直接把 observer 傳給 subscribe 執行

  1. class Observable {
  2. constructor(subscribe) {
  3. if(subscribe) {
  4. this._subscribe = subscribe; // 把 subscribe 存到屬性中
  5. }
  6. }
  7. subscribe() {
  8. const observer = new Observer(...arguments);
  9. // 先做某個判斷是否當前的 observable 是具有 operator 的
  10. if(??) {
  11. // 用 operator 的操作
  12. } else {
  13. // 如果沒有 operator 再直接把 observer 丟給 _subscribe
  14. this._subscribe(observer);
  15. }
  16. return observer;
  17. }
  18. }

以我們的 Observable 實作為例,這裡最重要的就是 this._subscribe 執行,每當執行時就是開始發送元素。

這裡我們可以想像一下當一個 map 產生的 observable 訂閱時,應該先判斷出有 map 這個 operator 並且傳入原本的資料源以及當前的 observer。也就是說我們的 map 至少有以下這幾件事要做

  • 建立新的 observable
  • 保存原本的 observable(資料源),之後訂閱時才有辦法執行
  • 建立並保存 operator 本身的行為,等到訂閱時執行

    1. class Observable {
    2. constructor(subscribe) {
    3. // 一些程式碼...
    4. }
    5. subscribe() {
    6. // 一些程式碼...
    7. }
    8. map(callback) {
    9. const observable = new Observable(); // 建立新的 observable
    10. observable.source = this; // 保存當前的 observable(資料源)
    11. observable.operator = {
    12. call: (observer, source) => { // 執行這個 operator 的行為 }
    13. }; // 儲存當前 operator 行為,並作為是否有 operator 的依據,
    14. return observable; // 返回這個新的 observable
    15. }
    16. }

    上面這三個步驟都是必要的,特別是用到了 observable.source = this 這個小技巧,來保存原本的 observable。但這裡我們還有一個地方沒完成就是 operator 要做的事,這個部分我們等一下再補,先把 subscribe 寫完

    1. class Observable {
    2. constructor(subscribe) {
    3. // 一些程式碼...
    4. }
    5. subscribe() {
    6. const observer = new Observer(...arguments);
    7. // 先用 this.operator 判斷當前的 observable 是否具有 operator
    8. if(this.operator) {
    9. this.operator.call(observer, this.source)
    10. } else {
    11. // 如果沒有 operator 再直接把 observer 丟給 _subscribe
    12. this._subscribe(observer);
    13. }
    14. return observer;
    15. }
    16. map(callback) {
    17. const observable = new Observable(); // 建立新的 observable
    18. observable.source = this; // 保存當前的 observable(資料源)
    19. observable.operator = {
    20. call: (observer, source) => { // 執行這個 operator 的行為 }
    21. }; // 儲存當前 operator 行為,並作為是否有 operator 的依據,
    22. return observable; // 返回這個新的 observable
    23. }
    24. }

記得這裡補的 subscribe 行為,已經是 map 回傳新 observable 的行為,不是原本的 observable 了。
到這裡我們就幾乎要完成了,接著只要實作 map 這個 operator 的行為就可以囉!記得我們在前面講的 operator 其實就是在原本的 observer 做一層包裹,讓 next 執行前先對元素做處理,所以我們改寫一下 Observer 並建立一個 MapObserver 來做這件事

  1. class Observer {
  2. constructor(destinationOrNext, error, complete) {
  3. switch (arguments.length) {
  4. case 0:
  5. this.destination = this.safeObserver(emptyObserver);
  6. break;
  7. case 1:
  8. if (!destinationOrNext) {
  9. this.destination = this.safeObserver(emptyObserver);
  10. break;
  11. }
  12. // 多一個判斷,是否傳入的 destinationOrNext 原本就是 Observer 的實例,如果是就不用在用執行 `this.safeObserver`
  13. if(destinationOrNext instanceof Observer){
  14. this.destination = destinationOrNext;
  15. break;
  16. }
  17. if (typeof destinationOrNext === 'object') {
  18. this.destination = this.safeObserver(destinationOrNext);
  19. break;
  20. }
  21. default:
  22. this.destination = this.safeObserver(destinationOrNext, error, complete);
  23. break;
  24. }
  25. }
  26. // ...下面都一樣
  27. }
  28. class MapObserver extends Observer {
  29. constructor(observer, callback) {
  30. // 這裡會傳入原本的 observer 跟 map 的 callback
  31. super(observer); // 因為有繼承所以要先執行一次父層的建構式
  32. this.callback = callback; // 保存 callback
  33. this.next = this.next.bind(this); // 確保 next 的 this
  34. }
  35. next(value) {
  36. try {
  37. this.destination.next(this.callback(value));
  38. // this.destination 是父層 Observer 保存的 observer 物件
  39. // 這裡 this.callback(value) 就是 map 的操作
  40. } catch (err) {
  41. this.destination.error(err);
  42. return;
  43. }
  44. }
  45. }

上面這段程式碼就可以讓我們包裹 observer 物件,利用物件的繼承覆寫原本的 next 方法。
最後我們就只要補完 map 方法就可以了

  1. class Observable {
  2. constructor(subscribe) {
  3. // 一些程式碼...
  4. }
  5. subscribe() {
  6. // 一些程式碼...
  7. }
  8. map(callback) {
  9. const observable = new Observable();
  10. observable.source = this;
  11. observable.operator = {
  12. call: (observer, source) => {
  13. // 執行這個 operator 的行為
  14. const newObserver = new MapObserver(observer, callback);
  15. // 建立包裹後的 observer
  16. // 訂閱原本的資料源,並回傳
  17. return source.subscribe(newObserver);
  18. }
  19. };
  20. return observable;
  21. }
  22. }

這裡做的事情就簡單很多,我們只要建立包裹過的 observer,並用這個包裹後的 observer 訂閱原本的 source。(記得這個 function 是在 subscribe 時執行的)
這裡有完整的程式碼,可以讓大家參考。
另外這裡有抽出 lift 方法的實作,其實跟我們現在的版本很接近了,只是把建立新的 observable 封裝到 lift 而已。

今日小結

今天這篇文章介紹了要如何簡易的實作 Observable,雖然說是簡易版本但實際上已經非常非常接近 RxJS 官方的實作了,希望讀者花點耐心一步一步跟著程式碼做,做出來後再慢慢吸收。