Redux 数据更新
function observable() { const outerSubscribe = subscribe return { /** * The minimal observable subscription method. * @param observer Any object that can be used as an observer. * The observer object should have a `next` method. * @returns An object with an `unsubscribe` method that can * be used to unsubscribe the observable from the store, and prevent further * emission of values from the observable. */ subscribe(observer: unknown) { if (typeof observer !== 'object' || observer === null) { throw new TypeError('Expected the observer to be an object.') } function observeState() { const observerAsObserver = observer as Observer<S> if (observerAsObserver.next) { observerAsObserver.next(getState()) } } observeState() const unsubscribe = outerSubscribe(observeState) return { unsubscribe } }, [$$observable]() { return this } }}function subscribe(listener: () => void) { if (typeof listener !== 'function') { throw new Error('Expected the listener to be a function.') } if (isDispatching) { throw new Error( 'You may not call store.subscribe() while the reducer is executing. ' + 'If you would like to be notified after the store has been updated, subscribe from a ' + 'component and invoke store.getState() in the callback to access the latest state. ' + 'See https://redux.js.org/api/store#subscribelistener for more details.' ) } let isSubscribed = true ensureCanMutateNextListeners() nextListeners.push(listener) return function unsubscribe() { if (!isSubscribed) { return } if (isDispatching) { throw new Error( 'You may not unsubscribe from a store listener while the reducer is executing. ' + 'See https://redux.js.org/api/store#subscribelistener for more details.' ) } isSubscribed = false ensureCanMutateNextListeners() const index = nextListeners.indexOf(listener) nextListeners.splice(index, 1) currentListeners = null }}
设计模式
实现
class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe(observer) { const safeObserver = new SafeObserver(observer); safeObserver.unsub = this._subscribe(safeObserver); return safeObserver.unsubscribe.bind(safeObserver); }}Observable.prototype.map = function (project) { return new Observable((observer) => { const mapObserver = { next: (x) => observer.next(project(x)), error: (err) => observer.error(err), complete: () => observer.complete() }; return this.subscribe(mapObserver); });};
class SafeObserver { constructor(destination) { if (typeof destination !== 'object' || destination === null) { throw new TypeError('Expected the observer to be an object.') } this.destination = destination; } next(value) { // 尚未取消订阅,且包含next方法 if (!this.isUnsubscribed && this.destination.next) { try { this.destination.next(value); } catch (err) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw err; } } } error(err) { // 尚未取消订阅,且包含error方法 if (!this.isUnsubscribed && this.destination.error) { try { this.destination.error(err); } catch (e2) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw e2; } this.unsubscribe(); } } complete() { // 尚未取消订阅,且包含complete方法 if (!this.isUnsubscribed && this.destination.complete) { try { this.destination.complete(); } catch (err) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw err; } this.unsubscribe(); } } unsubscribe() { // 用于取消订阅 this.isUnsubscribed = true; if (this.unsub) { this.unsub(); } }}
Rx.js 响应式编程