Preface

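@ngrx/effects is Angular's side-effect handling library, and its implementation inevitably builds on @ngrx/store. If you are not familiar with how @ngrx/store works, please read this article first.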

Part 1 - Dependency Injection

    static forRoot(
      rootEffects: Type<any>[] = []
    ): ModuleWithProviders<EffectsRootModule> {
      return {
        ngModule: EffectsRootModule,
        providers: [
          {
            provide: _ROOT_EFFECTS_GUARD,
            useFactory: _provideForRootGuard,
            deps: [[EffectsRunner, new Optional(), new SkipSelf()]],
          },
          {
            provide: EFFECTS_ERROR_HANDLER,
            useValue: defaultEffectsErrorHandler,
          },
          // A
          EffectsRunner,
          // B
          EffectSources,
          // C
          Actions,
          // D1
          rootEffects,
          // D2
          {
            provide: _ROOT_EFFECTS,
            useValue: [rootEffects],
          },
          {
            provide: USER_PROVIDED_EFFECTS,
            multi: true,
            useValue: [],
          },
          // E
          {
            provide: ROOT_EFFECTS,
            useFactory: createEffects,
            deps: [Injector, _ROOT_EFFECTS, USER_PROVIDED_EFFECTS],
          },
        ],
      };
    }
  • A, EffectsRunner
    Depends on B (EffectSources); its start method is the key to connecting effects to the store
    constructor(
      private effectSources: EffectSources,
      private store: Store<any>
    ) {}

    start() {
      if (!this.effectsSubscription) {
        this.effectsSubscription = this.effectSources
          .toActions()
          .subscribe(this.store);
      }
    }
  • B, EffectSources
    It has a toActions method that converts the stream of effect instances into an action stream for the store to subscribe to
    constructor(
      private errorHandler: ErrorHandler,
      @Inject(EFFECTS_ERROR_HANDLER)
      private effectsErrorHandler: EffectsErrorHandler
    ) {
      super();
    }
  • C, Actions
    It injects ScannedActionsSubject as its source. As the previous article showed, every time the store dispatches an action, ScannedActionsSubject receives that action, so Actions is effectively the store's action stream
    constructor(@Inject(ScannedActionsSubject) source?: Observable<V>) {
      super();
      if (source) {
        this.source = source;
      }
    }
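  • D1 and D2 both take the effects passed in as input: D1 registers the effect classes as providers, and D2 exposes the same list under the _ROOT_EFFECTS token
  • E, createEffects(injector, effects, user_effects)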
    export function createEffects(
      injector: Injector,
      effectGroups: Type<any>[][],
      userProvidedEffectGroups: Type<any>[][]
    ): any[] {
      const mergedEffects: Type<any>[] = [];
      for (let effectGroup of effectGroups) {
        mergedEffects.push(...effectGroup);
      }
      for (let userProvidedEffectGroup of userProvidedEffectGroups) {
        mergedEffects.push(...userProvidedEffectGroup);
      }
      return createEffectInstances(injector, mergedEffects);
    }

Once the groups have been merged, the effect instances are created:

    export function createEffectInstances(
      injector: Injector,
      effects: Type<any>[]
    ): any[] {
      return effects.map((effect) => injector.get(effect));
    }
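The merge-then-instantiate step can be tried without Angular. The following is a minimal sketch, not the library's code: ToyInjector, flattenAndInstantiate and the three effect classes are hypothetical names, and the toy injector only mimics the "one instance per class" behaviour that injector.get gives for a provided class.

```typescript
// Hypothetical stand-in for Angular's Injector: caches one instance per class.
type Ctor<T extends object = object> = new () => T;

class ToyInjector {
  private instances = new Map<Ctor, object>();

  get<T extends object>(token: Ctor<T>): T {
    let instance = this.instances.get(token);
    if (!instance) {
      instance = new token();
      this.instances.set(token, instance);
    }
    return instance as T;
  }
}

// Hypothetical effect classes used only for this illustration.
class UserEffects {}
class OrderEffects {}
class AuditEffects {}

// Mirrors the shape of createEffects: flatten both groups, then resolve each class.
function flattenAndInstantiate(
  injector: ToyInjector,
  rootGroups: Ctor[][],
  userProvidedGroups: Ctor[][]
): object[] {
  const merged: Ctor[] = [];
  for (const group of rootGroups) {
    merged.push(...group);
  }
  for (const group of userProvidedGroups) {
    merged.push(...group);
  }
  return merged.map((effect) => injector.get(effect));
}

const toyInjector = new ToyInjector();
const effectInstances = flattenAndInstantiate(
  toyInjector,
  [[UserEffects, OrderEffects]],
  [[AuditEffects], []]
);
```

Because the toy injector caches instances, asking it for UserEffects again returns the very object that sits in effectInstances, which is the property the module constructors rely on later.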

In other words, E = [the root effect instances].
That concludes the dependency-injection part.

Part 2 - EffectsRootModule

    constructor(
      private sources: EffectSources,
      runner: EffectsRunner,
      store: Store<any>,
      @Inject(ROOT_EFFECTS) rootEffects: any[],
      @Optional() storeRootModule: StoreRootModule,
      @Optional() storeFeatureModule: StoreFeatureModule,
      @Optional()
      @Inject(_ROOT_EFFECTS_GUARD)
      guard: any
    ) {
      runner.start();
      rootEffects.forEach(effectSourceInstance =>
        sources.addEffects(effectSourceInstance)
      );
      store.dispatch({ type: ROOT_EFFECTS_INIT });
    }

    addEffects(effectSourceInstance: any) {
      this.sources.addEffects(effectSourceInstance);
    }

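The runner.start call is crucial: it takes every observable property on the effect instances that was created via this.actions$.pipe(), turns it into an action stream, and has the store subscribe to that stream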

    start() {
      if (!this.effectsSubscription) {
        this.effectsSubscription = this.effectSources
          .toActions()
          .subscribe(this.store);
      }
    }
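To make the wiring in start concrete, here is a dependency-free sketch. MiniSubject, ToyStore and ToyRunner are hypothetical stand-ins rather than the RxJS or NgRx classes. The point it illustrates is that the store is handed over as the observer itself, so each action coming out of the effect side turns into a dispatch, and that the guard makes a second start call harmless.

```typescript
// Hypothetical stand-ins for illustration only; not the RxJS/NgRx classes.
interface ToyAction {
  type: string;
}

interface ToyObserver<T> {
  next(value: T): void;
}

class MiniSubject<T> {
  private observers: ToyObserver<T>[] = [];

  subscribe(observer: ToyObserver<T>): () => void {
    this.observers.push(observer);
    // The returned function plays the role of Subscription.unsubscribe().
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  next(value: T): void {
    for (const observer of [...this.observers]) {
      observer.next(value);
    }
  }
}

// The store is itself an observer: next() is simply a dispatch.
class ToyStore implements ToyObserver<ToyAction> {
  readonly dispatched: string[] = [];

  dispatch(action: ToyAction): void {
    this.dispatched.push(action.type);
  }

  next(action: ToyAction): void {
    this.dispatch(action);
  }
}

class ToyRunner {
  private unsubscribe: (() => void) | null = null;
  private effectActions: MiniSubject<ToyAction>;
  private store: ToyStore;

  constructor(effectActions: MiniSubject<ToyAction>, store: ToyStore) {
    this.effectActions = effectActions;
    this.store = store;
  }

  start(): void {
    // Same guard as in start() above: subscribe only once.
    if (!this.unsubscribe) {
      this.unsubscribe = this.effectActions.subscribe(this.store);
    }
  }
}

const effectActions = new MiniSubject<ToyAction>();
const toyStore = new ToyStore();
const toyRunner = new ToyRunner(effectActions, toyStore);

toyRunner.start();
toyRunner.start(); // no second subscription is created
effectActions.next({ type: '[Demo] Loaded' });
```

If the guard were missing, the second start call would subscribe the store twice and every effect output would be dispatched twice.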

The toActions method builds the stream on effectSources to which the store is attached as the subscriber

    toActions(): Observable<Action> {
      return this.pipe(
        groupBy(getSourceForInstance),
        mergeMap((source$) => {
          return source$.pipe(groupBy(effectsInstance));
        }),
        mergeMap((source$) => {
          const effect$ = source$.pipe(
            exhaustMap((sourceInstance) => {
              return resolveEffectSource(
                this.errorHandler,
                this.effectsErrorHandler
              )(sourceInstance);
            }),
            map((output) => {
              reportInvalidActions(output, this.errorHandler);
              return output.notification;
            }),
            filter(
              (notification): notification is Notification<Action> =>
                notification.kind === 'N'
            ),
            dematerialize()
          );
          // start the stream with an INIT action
          // do this only for the first Effect instance
          const init$ = source$.pipe(
            take(1),
            filter(isOnInitEffects),
            map((instance) => instance.ngrxOnInitEffects())
          );
          return merge(effect$, init$);
        })
      );
    }
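One consequence of the two groupBy calls followed by exhaustMap is worth spelling out: as I read the operator chain, instances are grouped first by class and then by identifier, and within a group only the first instance is run while its effect stream is still alive. The sketch below models that outcome with plain maps instead of RxJS operators. ToyEffectSources and the effect classes are hypothetical, and the model assumes effect streams never complete, which is the usual case for streams derived from actions$.

```typescript
// A simplified, synchronous model of the grouping; not the library's code.
interface Identifiable {
  ngrxOnIdentifyEffects(): string;
}

function hasIdentifier(instance: object): instance is Identifiable {
  return (
    typeof (instance as Partial<Identifiable>).ngrxOnIdentifyEffects ===
    'function'
  );
}

class ToyEffectSources {
  // Outer key: the class prototype (first groupBy).
  // Inner key: the identifier, or '' when none is provided (second groupBy).
  private running = new Map<object, Set<string>>();
  readonly started: object[] = [];

  addEffects(instance: object): void {
    const proto = Object.getPrototypeOf(instance) as object;
    const id = hasIdentifier(instance) ? instance.ngrxOnIdentifyEffects() : '';
    const ids = this.running.get(proto) ?? new Set<string>();
    this.running.set(proto, ids);
    // Models exhaustMap: a group that already runs an instance ignores later ones.
    if (ids.has(id)) {
      return;
    }
    ids.add(id);
    this.started.push(instance);
  }
}

// Hypothetical effect classes used only for this illustration.
class BookEffects {}

class TenantEffects implements Identifiable {
  private tenant: string;

  constructor(tenant: string) {
    this.tenant = tenant;
  }

  ngrxOnIdentifyEffects(): string {
    return this.tenant;
  }
}

const toySources = new ToyEffectSources();
toySources.addEffects(new BookEffects());
toySources.addEffects(new BookEffects()); // same class, same identifier: ignored
toySources.addEffects(new TenantEffects('a'));
toySources.addEffects(new TenantEffects('b')); // different identifier: started
toySources.addEffects(new TenantEffects('a')); // ignored
```

This is why registering the same effect class in several modules does not make its effects run several times, unless the class opts in by returning distinct identifiers.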

The sources.addEffects(effectSourceInstance) calls that follow push every effect instance into effectSources, so each instance flows through the toActions pipeline. Next, look at resolveEffectSource and mergeEffects:

    function resolveEffectSource(
      errorHandler: ErrorHandler,
      effectsErrorHandler: EffectsErrorHandler
    ): (sourceInstance: any) => Observable<EffectNotification> {
      return (sourceInstance) => {
        const mergedEffects$ = mergeEffects(
          sourceInstance,
          errorHandler,
          effectsErrorHandler
        );
        if (isOnRunEffects(sourceInstance)) {
          return sourceInstance.ngrxOnRunEffects(mergedEffects$);
        }
        return mergedEffects$;
      };
    }

    export function mergeEffects(
      sourceInstance: any,
      globalErrorHandler: ErrorHandler,
      effectsErrorHandler: EffectsErrorHandler
    ): Observable<EffectNotification> {
      const sourceName = getSourceForInstance(sourceInstance).constructor.name;
      const observables$: Observable<any>[] = getSourceMetadata(sourceInstance).map(
        ({
          propertyName,
          dispatch,
          useEffectsErrorHandler,
        }): Observable<EffectNotification> => {
          const observable$: Observable<any> =
            typeof sourceInstance[propertyName] === 'function'
              ? sourceInstance[propertyName]()
              : sourceInstance[propertyName];
          const effectAction$ = useEffectsErrorHandler
            ? effectsErrorHandler(observable$, globalErrorHandler)
            : observable$;
          if (dispatch === false) {
            return effectAction$.pipe(ignoreElements());
          }
          const materialized$ = effectAction$.pipe(materialize());
          return materialized$.pipe(
            map(
              (notification: Notification<Action>): EffectNotification => ({
                effect: sourceInstance[propertyName],
                notification,
                propertyName,
                sourceName,
                sourceInstance,
              })
            )
          );
        }
      );
      return merge(...observables$);
    }
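The metadata-driven part of mergeEffects can be isolated in a small sketch. Arrays stand in for observables here, and every name (DemoSource, DemoMetadata, mergeDemoEffects) is hypothetical. It only illustrates two decisions visible in the code above: an effect member may be a property or a factory function, and dispatch: false drops whatever the effect emits.

```typescript
// Arrays stand in for observables; all names below are hypothetical.
interface DemoAction {
  type: string;
}

interface DemoMetadata {
  propertyName: string;
  dispatch: boolean;
}

type DemoSource = Record<string, DemoAction[] | (() => DemoAction[])>;

function mergeDemoEffects(
  source: DemoSource,
  metadata: DemoMetadata[]
): DemoAction[] {
  const merged: DemoAction[] = [];
  for (const { propertyName, dispatch } of metadata) {
    const member = source[propertyName];
    if (member === undefined) {
      continue; // metadata without a matching member is skipped in this sketch
    }
    // An effect can be declared as a property or as a function returning the stream.
    const emitted = typeof member === 'function' ? member() : member;
    // dispatch: false keeps the side effect but lets nothing reach the store.
    if (dispatch) {
      merged.push(...emitted);
    }
  }
  return merged;
}

const demoSource: DemoSource = {
  load$: [{ type: '[Demo] Load Success' }],
  log$: [{ type: '[Demo] Should Not Reach Store' }],
  refresh$: () => [{ type: '[Demo] Refreshed' }],
};

const demoMetadata: DemoMetadata[] = [
  { propertyName: 'load$', dispatch: true },
  { propertyName: 'log$', dispatch: false },
  { propertyName: 'refresh$', dispatch: true },
];

const mergedTypes = mergeDemoEffects(demoSource, demoMetadata).map(
  (action) => action.type
);
```

With this input only the outputs of load$ and refresh$ survive the merge; the output of log$ is discarded, which is the array analogue of ignoreElements().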

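mergeEffects pulls every child observable out of an effect class and merges them into the stream the store subscribes to, which completes the whole flow. First the store dispatches an action. The reducer then updates the state, and the action is passed on to ScannedActionsSubject. The effects side receives the action through Actions, and the child observables that are interested in it run their logic, usually producing a new action. That new action flows back to the store, which dispatches it in turn, and the loop is closed. Be careful to avoid infinite loops here: effects whose output should not go back to the store must be declared with dispatch: false.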
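The loop described above can be traced with a synchronous toy model. This is a sketch under simplifying assumptions, not how NgRx is implemented: LoopStore, loopReducer and the two effects are hypothetical, and real effects are asynchronous streams rather than direct function calls.

```typescript
// A synchronous toy model of the dispatch -> reducer -> effect -> dispatch loop.
interface LoopAction {
  type: string;
}

interface LoopState {
  loading: boolean;
  loads: number;
}

interface LoopEffect {
  dispatch: boolean;
  handle(action: LoopAction): LoopAction | null;
}

function loopReducer(state: LoopState, action: LoopAction): LoopState {
  switch (action.type) {
    case '[Items] Load':
      return { ...state, loading: true };
    case '[Items] Load Success':
      return { loading: false, loads: state.loads + 1 };
    default:
      return state;
  }
}

class LoopStore {
  state: LoopState = { loading: false, loads: 0 };
  readonly seen: string[] = [];
  private effects: LoopEffect[] = [];

  register(effect: LoopEffect): void {
    this.effects.push(effect);
  }

  dispatch(action: LoopAction): void {
    this.seen.push(action.type);
    // 1. The reducer updates the state first.
    this.state = loopReducer(this.state, action);
    // 2. Only then do the effects see the action (the ScannedActionsSubject role).
    for (const effect of this.effects) {
      const output = effect.handle(action);
      // 3. Output of dispatching effects goes back into the store.
      if (output !== null && effect.dispatch) {
        this.dispatch(output);
      }
    }
  }
}

const loopStore = new LoopStore();

// Maps a load request to a success action, which is dispatched again.
loopStore.register({
  dispatch: true,
  handle: (action) =>
    action.type === '[Items] Load' ? { type: '[Items] Load Success' } : null,
});

// A logging-style effect: it passes every action through unchanged, so
// dispatching its output would recurse forever. dispatch: false prevents that.
loopStore.register({
  dispatch: false,
  handle: (action) => action,
});

loopStore.dispatch({ type: '[Items] Load' });
```

Flipping the second effect to dispatch: true makes the toy store recurse until the call stack overflows, which is the synchronous counterpart of the infinite loop the paragraph above warns about.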

Part 3 - EffectsFeatureModule

This one is easier to follow:

    constructor(
      root: EffectsRootModule,
      @Inject(FEATURE_EFFECTS) effectSourceGroups: any[][],
      @Optional() storeRootModule: StoreRootModule,
      @Optional() storeFeatureModule: StoreFeatureModule
    ) {
      effectSourceGroups.forEach((group) =>
        group.forEach((effectSourceInstance) =>
          root.addEffects(effectSourceInstance)
        )
      );
    }

It simply calls EffectsRootModule's addEffects method to add the feature effects to the EffectsModule system:

    addEffects(effectSourceInstance: any) {
      this.sources.addEffects(effectSourceInstance);
    }

This makes the source observable inside toActions emit, and from there the logic is the same as in Part 2
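The reason a feature module can join late is that the root module subscribed once, up front, to a shared source. The sketch below shows that ordering with hypothetical classes (LazySources, LazyRootModule, LazyFeatureModule); the callback list is a stand-in for the Subject that EffectSources extends.

```typescript
// Hypothetical stand-ins showing why late registration still works.
class LazySources {
  private listeners: Array<(instance: object) => void> = [];

  subscribe(listener: (instance: object) => void): void {
    this.listeners.push(listener);
  }

  addEffects(instance: object): void {
    for (const listener of this.listeners) {
      listener(instance);
    }
  }
}

class LazyRootModule {
  readonly registered: string[] = [];
  private sources: LazySources;

  constructor(sources: LazySources, rootEffects: object[]) {
    this.sources = sources;
    // Subscribe first (the runner.start() step), then push the root effects.
    sources.subscribe((instance) =>
      this.registered.push(instance.constructor.name)
    );
    rootEffects.forEach((instance) => this.addEffects(instance));
  }

  addEffects(instance: object): void {
    this.sources.addEffects(instance);
  }
}

class LazyFeatureModule {
  constructor(root: LazyRootModule, groups: object[][]) {
    // Same double loop as the EffectsFeatureModule constructor.
    groups.forEach((group) =>
      group.forEach((instance) => root.addEffects(instance))
    );
  }
}

// Hypothetical effect classes used only for this illustration.
class AppEffects {}
class CartEffects {}
class CouponEffects {}

const lazyRoot = new LazyRootModule(new LazySources(), [new AppEffects()]);
// Created later, for example when a lazy route loads.
new LazyFeatureModule(lazyRoot, [[new CartEffects()], [new CouponEffects()]]);
```

The feature module never subscribes to anything itself; it only feeds instances into the pipeline that the root module already started.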

Afterword

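The repository contains several techniques worth noting, such as using the Effect decorator to collect basic metadata up front for later use. It also makes heavy use of advanced RxJS operators, which makes it well worth studying.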