Redux 数据更新

  /**
   * Builds a minimal observable view over the store's state.
   *
   * Relies on `subscribe`, `getState`, `$$observable` and the `Observer<S>`
   * type from the enclosing scope (not shown in this excerpt).
   *
   * @returns An object with a `subscribe` method and a `[$$observable]`
   * method that returns that same object.
   */
  function observable() {
    // The returned object defines its own `subscribe` method, so keep the
    // store-level function under a distinct local name.
    const subscribeToStore = subscribe

    return {
      /**
       * Subscribes an observer to state updates.
       *
       * The observer's `next` method (when present) is invoked once
       * immediately with the current state, and again every time the
       * store-level listener fires.
       *
       * @param observer Any object; it should expose a `next` method.
       * @returns An object whose `unsubscribe` method detaches the observer
       * from the store.
       * @throws TypeError when `observer` is `null` or not an object.
       */
      subscribe(observer: unknown) {
        if (observer === null || typeof observer !== 'object') {
          throw new TypeError('Expected the observer to be an object.')
        }

        // Pushes the latest state to the observer, skipping observers that
        // have no truthy `next` member.
        const pushState = () => {
          const target = observer as Observer<S>
          if (target.next) {
            target.next(getState())
          }
        }

        // Emit the current state right away, then keep emitting on updates.
        pushState()
        return { unsubscribe: subscribeToStore(pushState) }
      },

      [$$observable]() {
        return this
      }
    }
  }
  /**
   * Appends `listener` to `nextListeners` and hands back a function that
   * removes it again.
   *
   * Relies on `isDispatching`, `nextListeners`, `currentListeners` and
   * `ensureCanMutateNextListeners` from the enclosing scope (not shown in
   * this excerpt).
   *
   * @param listener Callback to register.
   * @returns An `unsubscribe` function; calling it more than once is a no-op
   * after the first successful call.
   * @throws Error when `listener` is not a function, or when `isDispatching`
   * is set at the time of subscribing or unsubscribing.
   */
  function subscribe(listener: () => void) {
    if (typeof listener !== 'function') {
      throw new Error('Expected the listener to be a function.')
    }

    if (isDispatching) {
      const message = [
        'You may not call store.subscribe() while the reducer is executing. ',
        'If you would like to be notified after the store has been updated, subscribe from a ',
        'component and invoke store.getState() in the callback to access the latest state. ',
        'See https://redux.js.org/api/store#subscribelistener for more details.'
      ].join('')
      throw new Error(message)
    }

    // Tracks whether this particular subscription is still registered.
    let active = true

    ensureCanMutateNextListeners()
    nextListeners.push(listener)

    return function unsubscribe() {
      // Already removed: nothing left to do.
      if (!active) {
        return
      }

      if (isDispatching) {
        const message = [
          'You may not unsubscribe from a store listener while the reducer is executing. ',
          'See https://redux.js.org/api/store#subscribelistener for more details.'
        ].join('')
        throw new Error(message)
      }

      active = false

      ensureCanMutateNextListeners()
      nextListeners.splice(nextListeners.indexOf(listener), 1)
      // NOTE(review): presumably discards a cached listener snapshot held in
      // `currentListeners` - confirm against the dispatch implementation.
      currentListeners = null
    }
  }

设计模式

  • 观察者模式
  • 迭代器模式

实现

  /**
   * A minimal observable: wraps a subscribe function that is run once per
   * subscriber.
   *
   * The subscribe function receives a `SafeObserver` and may return a
   * teardown function used to release whatever it set up.
   */
  class Observable {
    /** The producer function supplied at construction time. */
    _subscribe;

    /**
     * @param _subscribe Function invoked with a `SafeObserver` for every
     * subscription; may return a teardown function.
     */
    constructor(_subscribe) {
      this._subscribe = _subscribe;
    }

    /**
     * Starts delivering notifications to `observer`.
     *
     * @param observer Object with optional `next`, `error` and `complete`
     * methods.
     * @returns A function that cancels the subscription.
     */
    subscribe(observer) {
      const safeObserver = new SafeObserver(observer);
      const teardown = this._subscribe(safeObserver);

      if (safeObserver.isUnsubscribed) {
        // The producer terminated synchronously, i.e. before a teardown was
        // attached to the observer, so the termination path could not run
        // it. Run it here exactly once and do not store it, so that a later
        // unsubscribe call cannot run it a second time.
        if (teardown) {
          teardown();
        }
      } else {
        safeObserver.unsub = teardown;
      }

      return safeObserver.unsubscribe.bind(safeObserver);
    }
  }
  /**
   * Derives a new Observable whose values are the source's values passed
   * through `project`; `error` and `complete` notifications are forwarded
   * unchanged.
   *
   * @param project Function applied to every value emitted by the source.
   * @returns A new Observable; subscribing to it subscribes to the source
   * and returns the source subscription's cancel function as its teardown.
   */
  Observable.prototype.map = function (project) {
    const source = this;

    return new Observable(function (observer) {
      // Intermediate observer sitting between the source and the subscriber.
      const relay = {
        next(value) {
          return observer.next(project(value));
        },
        error(err) {
          return observer.error(err);
        },
        complete() {
          return observer.complete();
        }
      };

      return source.subscribe(relay);
    });
  };
  /**
   * Wraps a user-supplied observer and enforces the subscription lifecycle:
   * nothing is delivered after the subscription ends, `error` and `complete`
   * always end it, and the teardown runs at most once.
   */
  class SafeObserver {
    /** The wrapped observer. */
    destination;
    /** True once the subscription has ended, for any reason. */
    isUnsubscribed = false;
    /** Teardown function attached by the subscriber machinery, if any. */
    unsub;

    /**
     * @param destination Object with optional `next`, `error` and
     * `complete` methods.
     * @throws TypeError when `destination` is `null` or not an object.
     */
    constructor(destination) {
      if (typeof destination !== 'object' || destination === null) {
        throw new TypeError('Expected the observer to be an object.');
      }
      this.destination = destination;
    }

    /**
     * Delivers a value, provided the subscription is still open and the
     * destination has a `next` method.
     *
     * @param value Value to deliver.
     * @throws Whatever the destination's `next` throws; the subscription is
     * ended before the exception is rethrown.
     */
    next(value) {
      if (this.isUnsubscribed || !this.destination.next) {
        return;
      }
      try {
        this.destination.next(value);
      } catch (err) {
        // Release resources first, then surface the failure.
        this.unsubscribe();
        throw err;
      }
    }

    /**
     * Delivers an error notification and ends the subscription.
     *
     * The subscription ends even when the destination has no `error`
     * method, so no further values can reach it afterwards.
     *
     * @param err The error to report.
     * @throws Whatever the destination's `error` throws.
     */
    error(err) {
      if (this.isUnsubscribed) {
        return;
      }
      try {
        if (this.destination.error) {
          this.destination.error(err);
        }
      } finally {
        // Runs on success, on a missing handler and on a throwing handler.
        this.unsubscribe();
      }
    }

    /**
     * Delivers a completion notification and ends the subscription.
     *
     * The subscription ends even when the destination has no `complete`
     * method, so no further values can reach it afterwards.
     *
     * @throws Whatever the destination's `complete` throws.
     */
    complete() {
      if (this.isUnsubscribed) {
        return;
      }
      try {
        if (this.destination.complete) {
          this.destination.complete();
        }
      } finally {
        // Runs on success, on a missing handler and on a throwing handler.
        this.unsubscribe();
      }
    }

    /**
     * Ends the subscription and runs the attached teardown, if there is one.
     *
     * Safe to call repeatedly: the teardown is detached before it is
     * invoked, so it runs at most once. A teardown attached after the
     * subscription has already ended is still run by the next call.
     */
    unsubscribe() {
      this.isUnsubscribed = true;
      const teardown = this.unsub;
      if (teardown) {
        this.unsub = undefined;
        teardown();
      }
    }
  }

Rx.js 响应式编程