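Preface
@ngrx/effects is the side-effect library for Angular, and its implementation is built on top of @ngrx/store. If you are not familiar with how @ngrx/store works internally, please read the earlier article on it first.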
Part 1 - Dependency Injection
static forRoot(
  rootEffects: Type<any>[] = []
): ModuleWithProviders<EffectsRootModule> {
  return {
    ngModule: EffectsRootModule,
    providers: [
      {
        provide: _ROOT_EFFECTS_GUARD,
        useFactory: _provideForRootGuard,
        deps: [[EffectsRunner, new Optional(), new SkipSelf()]],
      },
      {
        provide: EFFECTS_ERROR_HANDLER,
        useValue: defaultEffectsErrorHandler,
      },
      // A
      EffectsRunner,
      // B
      EffectSources,
      // C
      Actions,
      // D1
      rootEffects,
      // D2
      {
        provide: _ROOT_EFFECTS,
        useValue: [rootEffects],
      },
      {
        provide: USER_PROVIDED_EFFECTS,
        multi: true,
        useValue: [],
      },
      // E
      {
        provide: ROOT_EFFECTS,
        useFactory: createEffects,
        deps: [Injector, _ROOT_EFFECTS, USER_PROVIDED_EFFECTS],
      },
    ],
  };
}
- A, EffectsRunner
It depends on B (EffectSources). Its start method is the key step that connects the effects to the store.
constructor(
  private effectSources: EffectSources,
  private store: Store<any>
) {}

start() {
  if (!this.effectsSubscription) {
    this.effectsSubscription = this.effectSources
      .toActions()
      .subscribe(this.store);
  }
}
- B, EffectSources
It has a toActions method, which converts the stream of effects into a stream of actions for the store to subscribe to.
constructor(
  private errorHandler: ErrorHandler,
  @Inject(EFFECTS_ERROR_HANDLER)
  private effectsErrorHandler: EffectsErrorHandler
) {
  super();
}
- C, Actions
It is injected with ScannedActionsSubject as its source. As the previous article showed, ScannedActionsSubject receives every action the store dispatches, so Actions is effectively the action stream of the store.
constructor(@Inject(ScannedActionsSubject) source?: Observable<V>) {
  super();

  if (source) {
    this.source = source;
  }
}
- D1 and D2 both capture the effects passed in as input
- E, createEffects(injector, effects, user_effects)
export function createEffects(
  injector: Injector,
  effectGroups: Type<any>[][],
  userProvidedEffectGroups: Type<any>[][]
): any[] {
  const mergedEffects: Type<any>[] = [];

  for (let effectGroup of effectGroups) {
    mergedEffects.push(...effectGroup);
  }

  for (let userProvidedEffectGroup of userProvidedEffectGroups) {
    mergedEffects.push(...userProvidedEffectGroup);
  }

  return createEffectInstances(injector, mergedEffects);
}
Once the groups have been merged, the effect instances are created:
export function createEffectInstances(
  injector: Injector,
  effects: Type<any>[]
): any[] {
  return effects.map((effect) => injector.get(effect));
}
In other words, E = [instances of the root effects].
That concludes the dependency injection part.
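To make the "merge the groups, then ask the injector for each class" step concrete, here is a minimal sketch that runs without Angular. ToyInjector, createToyEffects, UserEffects and OrderEffects are all hypothetical names invented for this illustration; the real code uses Angular's Injector, which resolves far more than zero-argument classes.

```typescript
// Hypothetical stand-in for Angular's Injector: it only knows how to
// build zero-argument classes and caches one instance per class.
type Ctor<T extends object = object> = new () => T;

class ToyInjector {
  private readonly instances = new Map<Ctor, object>();

  // Return the cached instance for a class, creating it on first request.
  get<T extends object>(token: Ctor<T>): T {
    let instance = this.instances.get(token);
    if (instance === undefined) {
      instance = new token();
      this.instances.set(token, instance);
    }
    return instance as T;
  }
}

// Mirrors the shape of createEffects: flatten both lists of groups,
// then resolve every class through the injector.
function createToyEffects(
  injector: ToyInjector,
  rootGroups: Ctor[][],
  userGroups: Ctor[][]
): object[] {
  const merged: Ctor[] = [];
  for (const group of rootGroups) {
    merged.push(...group);
  }
  for (const group of userGroups) {
    merged.push(...group);
  }
  return merged.map((effect) => injector.get(effect));
}

// Two made-up effect classes.
class UserEffects {
  readonly label = 'user';
}
class OrderEffects {
  readonly label = 'order';
}

const toyInjector = new ToyInjector();

// [[UserEffects, OrderEffects]] plays the role of _ROOT_EFFECTS (useValue: [rootEffects]);
// [[]] plays the role of USER_PROVIDED_EFFECTS (multi: true, useValue: []).
const rootInstances = createToyEffects(
  toyInjector,
  [[UserEffects, OrderEffects]],
  [[]]
);
```

Because the instances come from the injector rather than from `new`, asking the injector for UserEffects again returns the same object that ended up in rootInstances.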
Part 2 - EffectsRootModule
constructor(
  private sources: EffectSources,
  runner: EffectsRunner,
  store: Store<any>,
  @Inject(ROOT_EFFECTS) rootEffects: any[],
  @Optional() storeRootModule: StoreRootModule,
  @Optional() storeFeatureModule: StoreFeatureModule,
  @Optional()
  @Inject(_ROOT_EFFECTS_GUARD)
  guard: any
) {
  runner.start();

  rootEffects.forEach(effectSourceInstance =>
    sources.addEffects(effectSourceInstance)
  );

  store.dispatch({ type: ROOT_EFFECTS_INIT });
}

addEffects(effectSourceInstance: any) {
  this.sources.addEffects(effectSourceInstance);
}
The runner.start call is crucial: it turns every observable property of the effect instances (the ones created with this.actions$.pipe()) into an action stream and subscribes the store to it.
start() {
  if (!this.effectsSubscription) {
    this.effectsSubscription = this.effectSources
      .toActions()
      .subscribe(this.store);
  }
}
toActions builds the action stream out of effectSources, and start() attaches the store to it as the subscriber:
toActions(): Observable<Action> {
  return this.pipe(
    groupBy(getSourceForInstance),
    mergeMap((source$) => {
      return source$.pipe(groupBy(effectsInstance));
    }),
    mergeMap((source$) => {
      const effect$ = source$.pipe(
        exhaustMap((sourceInstance) => {
          return resolveEffectSource(
            this.errorHandler,
            this.effectsErrorHandler
          )(sourceInstance);
        }),
        map((output) => {
          reportInvalidActions(output, this.errorHandler);
          return output.notification;
        }),
        filter(
          (notification): notification is Notification<Action> =>
            notification.kind === 'N'
        ),
        dematerialize()
      );

      // start the stream with an INIT action
      // do this only for the first Effect instance
      const init$ = source$.pipe(
        take(1),
        filter(isOnInitEffects),
        map((instance) => instance.ngrxOnInitEffects())
      );

      return merge(effect$, init$);
    })
  );
}
The sources.addEffects(effectSourceInstance) calls that follow push each effect instance into effectSources, so every effect instance flows through the toActions pipeline. Now look at resolveEffectSource:
function resolveEffectSource(
  errorHandler: ErrorHandler,
  effectsErrorHandler: EffectsErrorHandler
): (sourceInstance: any) => Observable<EffectNotification> {
  return (sourceInstance) => {
    const mergedEffects$ = mergeEffects(
      sourceInstance,
      errorHandler,
      effectsErrorHandler
    );

    if (isOnRunEffects(sourceInstance)) {
      return sourceInstance.ngrxOnRunEffects(mergedEffects$);
    }

    return mergedEffects$;
  };
}

export function mergeEffects(
  sourceInstance: any,
  globalErrorHandler: ErrorHandler,
  effectsErrorHandler: EffectsErrorHandler
): Observable<EffectNotification> {
  const sourceName = getSourceForInstance(sourceInstance).constructor.name;

  const observables$: Observable<any>[] = getSourceMetadata(sourceInstance).map(
    ({
      propertyName,
      dispatch,
      useEffectsErrorHandler,
    }): Observable<EffectNotification> => {
      const observable$: Observable<any> =
        typeof sourceInstance[propertyName] === 'function'
          ? sourceInstance[propertyName]()
          : sourceInstance[propertyName];

      const effectAction$ = useEffectsErrorHandler
        ? effectsErrorHandler(observable$, globalErrorHandler)
        : observable$;

      if (dispatch === false) {
        return effectAction$.pipe(ignoreElements());
      }

      const materialized$ = effectAction$.pipe(materialize());

      return materialized$.pipe(
        map(
          (notification: Notification<Action>): EffectNotification => ({
            effect: sourceInstance[propertyName],
            notification,
            propertyName,
            sourceName,
            sourceInstance,
          })
        )
      );
    }
  );

  return merge(...observables$);
}
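mergeEffects pulls out every individual observable of an effect instance and merges them, so the store ends up subscribed to each of them. That completes the whole flow. First, the store dispatches an action. The reducer then updates the state, and the action is passed on to ScannedActionsSubject, so the effects side receives it. The observables in the effect that are interested in that action run their logic and usually produce a new action. Since the store is subscribed to that output, the new action is dispatched by the store in turn, which closes the loop. Take care to avoid infinite loops here: effects whose output should not go back to the store must be declared with dispatch: false.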
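The loop described above can be sketched without RxJS. The following is a deliberately simplified, synchronous model and not how @ngrx/effects is implemented: TinySubject, TinyStore, ToyEffect and connectEffects are hypothetical names, and an "effect" is reduced to a plain function from an action to an optional action plus a dispatch flag.

```typescript
interface ToyAction {
  type: string;
}

interface ToyState {
  loading: boolean;
  loaded: boolean;
}

// Minimal synchronous subject; a stand-in for an RxJS Subject.
class TinySubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }
}

function reduce(state: ToyState, action: ToyAction): ToyState {
  switch (action.type) {
    case 'load':
      return { ...state, loading: true };
    case 'loadSuccess':
      return { loading: false, loaded: true };
    default:
      return state;
  }
}

class TinyStore {
  state: ToyState = { loading: false, loaded: false };
  readonly dispatched: string[] = [];
  // Plays the role of ScannedActionsSubject: emits after the reducer has run.
  readonly scannedActions$ = new TinySubject<ToyAction>();

  dispatch(action: ToyAction): void {
    this.dispatched.push(action.type);
    this.state = reduce(this.state, action);
    this.scannedActions$.next(action);
  }
}

// A toy effect: reacts to an action and may produce a new one.
interface ToyEffect {
  dispatch: boolean;
  run(action: ToyAction): ToyAction | undefined;
}

// Feed every effect's output back into the store unless dispatch is false.
function connectEffects(store: TinyStore, effects: ToyEffect[]): void {
  for (const effect of effects) {
    store.scannedActions$.subscribe((action) => {
      const output = effect.run(action);
      if (effect.dispatch && output !== undefined) {
        store.dispatch(output);
      }
    });
  }
}

const sideEffectLog: string[] = [];
const toyEffects: ToyEffect[] = [
  // Maps 'load' to 'loadSuccess'; its output goes back to the store.
  {
    dispatch: true,
    run: (action) =>
      action.type === 'load' ? { type: 'loadSuccess' } : undefined,
  },
  // Returns the action it received. Without dispatch: false this would
  // re-dispatch every action forever.
  {
    dispatch: false,
    run: (action) => {
      sideEffectLog.push(`saw ${action.type}`);
      return action;
    },
  },
];

const toyStore = new TinyStore();
connectEffects(toyStore, toyEffects);
toyStore.dispatch({ type: 'load' });
```

After the single dispatch of 'load', the store has processed two actions ('load' and the 'loadSuccess' produced by the first effect), while the logging effect has seen both without feeding anything back.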
Part 3 - EffectsFeatureModule
This one is easier to follow.
constructor(
  root: EffectsRootModule,
  @Inject(FEATURE_EFFECTS) effectSourceGroups: any[][],
  @Optional() storeRootModule: StoreRootModule,
  @Optional() storeFeatureModule: StoreFeatureModule
) {
  effectSourceGroups.forEach((group) =>
    group.forEach((effectSourceInstance) =>
      root.addEffects(effectSourceInstance)
    )
  );
}
It simply calls the addEffects method of EffectsRootModule to add the feature effects to the EffectsModule system:
addEffects(effectSourceInstance: any) {
  this.sources.addEffects(effectSourceInstance);
}
This pushes a new value into the source observable inside toActions, and from there the logic is the same as in Part 2.
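As a rough picture of why feature effects can join after the root module has already started, here is a small sketch. It assumes only what the excerpts show, namely that the runner subscribes first and that instances are pushed in afterwards through addEffects. ToyEffectSources, ToyRootModule, ToyFeatureModule and the three effect classes are hypothetical names, and the real toActions pipeline is replaced by a plain listener.

```typescript
interface LabeledEffects {
  label: string;
}

// Stand-in for EffectSources: a subject whose values are effect instances.
class ToyEffectSources {
  private listeners: Array<(instance: LabeledEffects) => void> = [];

  subscribe(listener: (instance: LabeledEffects) => void): void {
    this.listeners.push(listener);
  }

  // Pushing an instance notifies whoever is already subscribed.
  addEffects(instance: LabeledEffects): void {
    for (const listener of this.listeners) {
      listener(instance);
    }
  }
}

// Stand-in for EffectsRootModule: registers root effects, exposes addEffects.
class ToyRootModule {
  constructor(
    private readonly sources: ToyEffectSources,
    rootEffects: LabeledEffects[]
  ) {
    rootEffects.forEach((instance) => sources.addEffects(instance));
  }

  addEffects(instance: LabeledEffects): void {
    this.sources.addEffects(instance);
  }
}

// Stand-in for EffectsFeatureModule: forwards its groups to the root module.
class ToyFeatureModule {
  constructor(root: ToyRootModule, groups: LabeledEffects[][]) {
    groups.forEach((group) =>
      group.forEach((instance) => root.addEffects(instance))
    );
  }
}

class AppEffects implements LabeledEffects {
  readonly label = 'app';
}
class CartEffects implements LabeledEffects {
  readonly label = 'cart';
}
class CheckoutEffects implements LabeledEffects {
  readonly label = 'checkout';
}

const registered: string[] = [];
const toySources = new ToyEffectSources();

// Plays the role of runner.start(): subscribe before any instance arrives.
toySources.subscribe((instance) => registered.push(instance.label));

const toyRoot = new ToyRootModule(toySources, [new AppEffects()]);
// A lazily loaded feature module would be constructed at some later point.
const toyFeature = new ToyFeatureModule(toyRoot, [
  [new CartEffects(), new CheckoutEffects()],
]);
```

The subscription set up once at startup sees the root instance first and the feature instances afterwards, in the order they were added, which is why the feature module needs no runner of its own.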
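Afterword
The repository contains a number of useful techniques, such as using the Effect decorator to collect metadata up front and consuming it later. It also makes heavy use of advanced RxJS operators, which makes it well worth studying.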
