概念
effect
只要是被 yield 的值,都可以被称为 effect
业务代码往往充当 effect 的生产者,生成 effect
effect的类型主要有promise、iterator、take、put ,合理地组合不同类型的 effect 可以表达非常复杂的异步逻辑
effect 状态分为运行中、已完成(正常结束或是抛出错误结束都算完成)、被取消。
saga-middleware
yield 语句将 effect 传给 saga-middleware,saga-middleware 充当 effect 的消费者
获取 effect,根据 effect 的类型解释该 effect,然后将结果返回给生产者
模型
「请求-响应」模型,业务代码生产 effect 以发起「请求」,而 saga-middleware 负责消费并将响应返回给业务代码
saga
作为参数传入函数 proc 或 sagaMiddleware.run 执行的生成器函数
saga 实例指的是调用 saga 函数得到的迭代器对象
task
task 指的是 saga 实例运行状态的描述对象
interface Task {cancel(): voidtoPromise(): Promise<any>result: anyerror: ErrorisRunning: booleanisCancelled: booleanisAborted: boolean}
fork effect
saga 实例在运行的时候会多次 yield fork effect,那么一个 parent-saga 实例就会有多个 child-saga
rootSaga 通过 sagaMiddleware.run() 开始运行 fork 得到若干个 child-saga => saga 树
saga 树
迭代器自身的语句执行完成
所有的 child-saga 进入完成状态
迭代器自身执行时抛出了异常/其中一个 child-saga 抛出了错误=>saga 实例会中断并抛出错误
当一个节点发生错误时,错误会沿着树向根节点向上传播,直到某个节点捕获该错误
取消一个节点时,该节点对应的整个子树都将被取消
effect-runner
使用递归函数来消费迭代器 特点可以传入effect参数,可以同步或者异步调用next消费effect
通过驱动函数next消费effect
代码分析
const iterator = range2(1, 10)// 驱动函数function next(arg, isErr) {let resultif (isErr) {result = iterator.throw(arg)} else {result = iterator.next(arg)}const { done, value } = resultif (done) {return}// 根据effect的类型执行相应的处理逻辑if (value[0] === "promise") {const promise = value[1]promise.then(resolvedValue => next(resolvedValue),error => next(error, true))} else if (value[0] === "delay") {const timeout = value[1]setTimeout(() => next(`${timeout}ms elapsed`), timeout)} else if (value[0] === "ping") {next("pong")} else {iterator.throw(new Error("无法识别的 effect"))}}next()// cancellation// effects 是可取消的// effect规范化处理// 每一个 effect 在运行之前都会通过函数 digestEffect 的处理。该函数用变量 effectSettled 记录了一个 effect 是否已经完成function digestEffect(rawEffect, cb) {let effectSettled = false// 通过callback函数及时得到effect执行状态,调用该函数,表明一旦完成或是被取消,就不能再调用cb改变状态function currCb(res, isErr) {if (effectSettled) {return}effectSettled = truecb.cancel = noopcb(res, isErr)}currCb.cancel = noop// 通过cb注册cancel函数,改变标识符的值,可以达到同样的目的,不再调用cb改变状态,实现effect取消功能cb.cancel = () => {if (effectSettled) {return}effectSettled = truetry {currCb.cancel()} catch (err) {console.error(err)}currCb.cancel = noop}// normalizeEffect用来内省识别effect类型,不需要开发者手动去标识effect类型runEffect(normalizeEffect(rawEffect), currCb)}//************************************************ */// redux-sagaconst TASK_CANCEL = Symbol("TASK_CANCEL")const CANCEL = Symbol("CANCEL")// 所有 saga 实例都是通过该函数启动的,返回 task// 建立对象之间的后继关系(cont)和取消关系(cancellation)function proc(iterator, parentContext, cont) {// 初始化当前 task 的 contextconst ctx = Object.create(parentContext)// mainTask 用来跟踪当前迭代器的语句执行状态const mainTask = {// cont: **will be set when passed to ForkQueue**isRunning: true,isCancelled: false,cancel() {if (mainTask.isRunning && !mainTask.isCancelled) {mainTask.isCancelled = truenext(TASK_CANCEL)}},}// 实际上task并不是这么构造的,不过在初步实现中,暂时先这样吧// const task = {// cancel: () => next(TASK_CANCEL),// }// 创建 ForkQueue 对象和 Task 对象,这两个类的代码在后面会写出来const taskQueue = new ForkQueue(mainTask)const task = new Task(taskQueue)// 设置后继关系taskQueue.cont = task.endtask.cont = cont// 设置取消关系cont.cancel = task.cancelnext()return task// 在图中驱动函数只和 mainTask 有联系// 然后我们也可以发现下面 next 函数的代码中,也只调用了 mainTask 的接口// 即 next 函数中的代码不会引用 task 和 taskQueue 对象function next(arg, isErr) {try {let resultif (isErr) {result = iterator.throw(arg)} else if (arg === TASK_CANCEL) {// next.cancel 由当前正在执行的 effectRunner 所设置next.cancel()// 跳转到迭代器的 finally block,执行清理逻辑result = iterator.return(TASK_CANCEL)} else {result = iterator.next(arg)}if (!result.done) {digestEffect(result.value, next)} else {// 迭代器执行完毕,调用cont将结果返回给上层mainTask.isRunning = falsemainTask.cont(result.value)}} catch (error) {if (!mainTask.isRunning) {throw error}if (mainTask.isCancelled) {// 在执行 cancel 逻辑时发生错误,在 3.4 其他问题与细节 中说明console.error(error)}mainTask.isRunning = falsemainTask.cont(error, true)}}// function digestEffect(rawEffect, cb) { /* ...... */ }// 执行effect,根据effect的类型调用不同的effectRunnerfunction runEffect(effect, currCb) {const effectType = effect[0]if (effectType === "promise") {resolvePromise(effect, ctx, currCb)} else if (effectType === "iterator") {resolveIterator(iterator, ctx, currCb)} else {// 拓展这里的 if-else 便可以拓展新的effect类型throw new Error("Unknown effect type")}}function resolvePromise([effectType, promise], ctx, cb) {const cancelPromise = promise[CANCEL]if (is.func(cancelPromise)) {// 设置promise的cancel逻辑cb.cancel = cancelPromise}promise.then(cb, error => cb(error, true))}function resolveIterator([effectType, iterator], ctx, cb) {proc(iterator, ctx, cb)}}// https://zhuanlan.zhihu.com/p/37356948// fork effectfunction runForkEffect([effectType, fn, ...args], ctx, cb) {const iterator = createTaskIterator(fn, args)try {suspend() // 见 3.4 schedulerconst subTask = proc(iterator, ctx, noop)if (subTask.isRunning) {task.taskQueue.addTask(subTask)cb(subTask)} else if (subTask.error) {task.taskQueue.abort(subTask.error)} else {cb(subTask)}} finally {flush() // 见 3.4 scheduler}}interface ForkQueue {// MainTask 该参数代表当前迭代器自身代码的执行状态constructor(mainTask: MainTask)// 在 ForkQueue 被构造之后进行设置。当所有的 child-task 以及 mainTask 都完成时,我们需要调用 forkQueue.cont 来通知其 parent-saga// cont: Callback 这是一个私有的字段// 添加新的 child-task ,赋值task.cont函数,task.cont调用实例方法cont返回结果值addTask(task: Task): voidcancelAll(): void// 取消所有的 child-task,还会调用 forkQueue.cont 向 parent-task 通知错误abort(err: Error): void}// task context// 通过原型链的方式访问父节点的上下文// 利用 context 机制,并使用该机制实现「effect 类型拓展」、「连接 redux store」等功能// /src/core/Task.jsclass Task {isRunning = trueisCancelled = falseisAborted = falseresult = undefinederror = undefinedjoiners = []// cont will be set after calling constructor()cont = undefinedconstructor(taskQueue) {this.taskQueue = taskQueue}// 调用 cancel 函数来取消该 Task,这将取消所有当前正在执行的 child-task 和 mainTask// cancellation 会向下传播,意味着该 Task 对应的 saga-tree 子树都将会被取消// 同时 cancellation 也会传递给该 Task 的所有 joinerscancel = () => {// 如果该 Task 已经完成或是已经被取消,则跳过if (this.isRunning && !this.isCancelled) {this.isCancelled = truethis.taskQueue.cancelAll()// 将 TASK_CANCEL 传递给所有 joinersthis.end(TASK_CANCEL)}}// 结束当前 Task// 设置 Task 的 result/error,然后调用 task.cont,最后将结果传递给 joiners// 当该 Task 的 child-task 和 mainTask 都完成时(即 fork-queue 完成时),该函数将被调用end = (result, isErr) => {this.isRunning = falseif (!isErr) {this.result = result} else {this.error = resultthis.isAborted = true}this.cont(result, isErr)this.joiners.forEach(j => j.cb(result, isErr))this.joiners = null}toPromise() {// 获取 task 对应的 promise 对象,这里省略了代码}}function createSagaMiddleware(cont) {function middleware({ dispatch, getState }) {let channelPutconst env = new Env(cont).use(commonEffects).use(channelEffects).use(ctx => {// 记录「真实」的 channel.putchannelPut = ctx.channel.put// 使用 dispatch 替换掉 channel 上的 put 方法ctx.channel.put = action => {action[SAGA_ACTION] = truedispatch(action)}// 使用 def 方法来定义 select 类型的 effect-runnerdef(ctx, 'select', ([_effectType, selector = identity, ...args], _ctx, cb) =>cb(selector(getState(), ...args)),)})// 当 middleware 函数执行时,说明 store 正在创建// 此时我们给 middleware.run 设置正确的函数middleware.run = (...args) => env.run(...args)return next => action => {const result = next(action) // hit reducers// 下面的 if-else 主要是为了保证 channelPut(action) 恰好被包裹在一层 asap 中// asap 的介绍见 3.4if (action[SAGA_ACTION]) {// SAGA_ACTION 字段为 true 表示该 action 来自 saga// 而在 saga 中,我们在 put 的时候已经使用了函数asap// 所以在这里就不需要再次调用 asap 了channelPut(action)} else {// 表示该 action 来自 store.dispatch// 例如某个 React 组件的 onClick 中调用了 dispatch 方法asap(() => channelPut(action))}return result}}middleware.run = (...args) => {throw new Error('运行 Saga 函数之前,必须使用 applyMiddleware 将 Saga 中间件加载到 Store 中')}return middleware}
