因為實在太多讀者在問要如何實作 Observable,所以特別調整了本系列文章最後幾篇的內容,空出一天的位置來寫如何簡易實作 Observable。
為什麼是簡易實作而不完整實作呢? 當然這個系列的文章是希望讀者能學會如何使用 RxJS,而 實作 Observable 其實只是幫助我們理解 Observable 的運作方式,所以這篇文章會盡可能地簡單,一來讓讀者容易理解及吸收,二來有興趣的讀者可以再沿著這篇文章的內容去完整的實作。
重點觀念
Observable 跟 Observer Pattern 是不同的,Observable 內部並沒有管理一份訂閱清單,訂閱 Observable 就像是執行一個 function 一樣!
所以實作過程的重點
- 訂閱就是執行一個 funciton
- 訂閱接收的物件具備 next, error, complete 三個方法
- 訂閱會返回一個可退訂(unsubscribe)的物件
基本 observable 實作
先用最簡單的 function 來建立 observable 物件
function create(subscriber) {var observable = {subscribe: function(observer) {subscriber(observer)}};return observable;}
上面這段程式碼就可以做最簡單的訂閱,像下面這樣
function create(subscriber) {var observable = {subscribe: function(observer) {subscriber(observer)}};return observable;}var observable = create(function(observer) {observer.next(1);observer.next(2);observer.next(3);})var observer = {next: function(value) {console.log(value)}}observable.subscribe(observer)// 1// 2// 3
這時我們已經有最簡單的功能了,但這裡有一個大問題,就是 observable 在結束(complete)就不應該再發送元素
var observable = create(function(observer) {observer.next(1);observer.next(2);observer.next(3);observer.complete();observer.next('still work');})var observer = {next: function(value) {console.log(value)},complete: function() {console.log('complete!')}}observable.subscribe(observer)// 1// 2// 3// "complete!"// "still work"
從上面的程式碼可以看到 complete 之後還是能送元素出來,另外還有一個問題就是 observer,如果是不完整的就會出錯,這也不是我們希望看到的。
var observable = create(function(observer) {observer.next(1);observer.next(2);observer.next(3);observer.complete(); // error: complete is not a function})var observer = {next: function(value) {console.log(value)}}observable.subscribe(observer)// 1// 2// 3
上面這段程式碼可以看出來,當使用者 observer 物件沒有 complete 方法時,就會報錯。 我們應該修正這兩個問題!
實作簡易 Observer
要修正這兩個問題其實並不難,我們只要實作一個 Observer 的類別,每次使用者傳入的 observer 都會利用這個類別轉乘我們想要 Observer 物件。
首先訂閱時有可能傳入一個 observer 物件,或是一到三個 function(next, error, complete),所以我們要建立一個類別可以接受各種可能的參數
class Observer {constructor(destinationOrNext, error, complete) {switch (arguments.length) {case 0:// 空的 observercase 1:if (!destinationOrNext) {// 空的 observer}if (typeof destinationOrNext === 'object') {// 傳入了 observer 物件}default:// 如果上面都不是,代表應該是傳入了一到三個 functionbreak;}}}
寫一個方法(safeObserver)來回傳正常的 observer
class Observer {constructor(destinationOrNext, error, complete) {// ... 一些程式碼}safeObserver(observerOrNext, error, complete) {let next;if (typeof (observerOrNext) === 'function') {// observerOrNext 是 next functionnext = observerOrNext;} else if (observerOrNext) {// observerOrNext 是 observer 物件next = observerOrNext.next || () => {};error = observerOrNext.error || function(err) {throw err};complete = observerOrNext.complete || () => {};}// 最後回傳我們預期的 observer 物件return {next: next,error: error,complete: complete};}}
再把 constructor 完成
// 預設空的 observerconst emptyObserver = {next: () => {},error: (err) => { throw err; },complete: () => {}}class Observer {constructor(destinationOrNext, error, complete) {switch (arguments.length) {case 0:// 空的 observerthis.destination = this.safeObserver(emptyObserver);break;case 1:if (!destinationOrNext) {// 空的 observerthis.destination = this.safeObserver(emptyObserver);break;}if (typeof destinationOrNext === 'object') {// 傳入了 observer 物件this.destination = this.safeObserver(destinationOrNext);break;}default:// 如果上面都不是,代表應該是傳入了一到三個 functionthis.destination = this.safeObserver(destinationOrNext, error, complete);break;}}safeObserver(observerOrNext, error, complete) {// ... 一些程式碼}}
這裡我們把真正的 observer 塞到 this.destination,接著完成 observer 的方法。
Observer 的三個主要的方法(next, error, complete)都應該結束或退訂後不能再被執行,所以我們在物件內部偷塞一個 boolean 值來作為是否曾經結束的依據。
class Observer {constructor(destinationOrNext, error, complete) {// ... 一些程式碼}safeObserver(observerOrNext, error, complete) {// ... 一些程式碼}unsubscribe() {this.isStopped = true; // 偷塞一個屬性 isStopped}}
接著要實作三個主要的方法就很簡單了,只要先判斷 isStopped 在再使用 this.destination 物件來傳送值就可以了
class Observer {constructor(destinationOrNext, error, complete) {// ... 一些程式碼}safeObserver(observerOrNext, error, complete) {// ... 一些程式碼}next(value) {if (!this.isStopped && this.next) {// 先判斷是否停止過try {this.destination.next(value); // 傳送值} catch (err) {this.unsubscribe();throw err;}}}error(err) {if (!this.isStopped && this.error) {// 先判斷是否停止過try {this.destination.error(err); // 傳送錯誤} catch (anotherError) {this.unsubscribe();throw anotherError;}this.unsubscribe();}}complete() {if (!this.isStopped && this.complete) {// 先判斷是否停止過try {this.destination.complete(); // 發送停止訊息} catch (err) {this.unsubscribe();throw err;}this.unsubscribe(); // 發送停止訊息後退訂}}unsubscribe() {this.isStopped = true;}}
到這裡我們就完成基本的 Observer 實作了,接著讓我們拿到基本版的 observable 中使用吧。
function create(subscriber) {const observable = {subscribe: function(observerOrNext, error, complete) {const realObserver = new Observer(observerOrNext, error, complete)subscriber(realObserver);return realObserver;}};return observable;}var observable = create(function(observer) {observer.next(1);observer.next(2);observer.next(3);observer.complete();observer.next('not work');})var observer = {next: function(value) {console.log(value)},complete: function() {console.log('complete!')}}observable.subscribe(observer);// 1// 2// 3// complete!
到這裡我們就完成最基本的 observable 了,至少基本的行為都跟我們期望的一致,我知道讀者們仍然不會放過我,你們會希望做出一個 Observable 型別以及至少一個 operator 對吧? 不用擔心,我們下一篇就會講解如何建立一個 Observable 型別和 operator 的方法!
今日小結
今天我們複習了 Observable 的重要概念,並用這些重要的概念實作出了基本的 observable 以及 Observer 的類別。
