概念

effect

只要是被 yield 的值,都可以被称为 effect
业务代码往往充当 effect 的生产者,生成 effect
effect的类型主要有promise、iterator、take、put ,合理地组合不同类型的 effect 可以表达非常复杂的异步逻辑
effect 状态分为运行中、已完成(正常结束或是抛出错误结束都算完成)、被取消。

saga-middleware

yield 语句将 effect 传给 saga-middleware,saga-middleware 充当 effect 的消费者
获取 effect,根据 effect 的类型解释该 effect,然后将结果返回给生产者

模型

「请求-响应」模型,业务代码生产 effect 以发起「请求」,而 saga-middleware 负责消费并将响应返回给业务代码

saga

作为参数传入函数 proc 或 sagaMiddleware.run 执行的生成器函数
saga 实例指的是调用 saga 函数得到的迭代器对象

task

task 指的是 saga 实例运行状态的描述对象

  1. interface Task {
  2. cancel(): void
  3. toPromise(): Promise<any>
  4. result: any
  5. error: Error
  6. isRunning: boolean
  7. isCancelled: boolean
  8. isAborted: boolean
  9. }

fork effect

saga 实例在运行的时候会多次 yield fork effect,那么一个 parent-saga 实例就会有多个 child-saga
rootSaga 通过 sagaMiddleware.run() 开始运行 fork 得到若干个 child-saga => saga 树

saga 树

迭代器自身的语句执行完成
所有的 child-saga 进入完成状态
迭代器自身执行时抛出了异常/其中一个 child-saga 抛出了错误=>saga 实例会中断并抛出错误
当一个节点发生错误时,错误会沿着树向根节点向上传播,直到某个节点捕获该错误
取消一个节点时,该节点对应的整个子树都将被取消

effect-runner

使用递归函数来消费迭代器 特点可以传入effect参数,可以同步或者异步调用next消费effect
通过驱动函数next消费effect

代码分析

  1. const iterator = range2(1, 10)
  2. // 驱动函数
  3. function next(arg, isErr) {
  4. let result
  5. if (isErr) {
  6. result = iterator.throw(arg)
  7. } else {
  8. result = iterator.next(arg)
  9. }
  10. const { done, value } = result
  11. if (done) {
  12. return
  13. }
  14. // 根据effect的类型执行相应的处理逻辑
  15. if (value[0] === "promise") {
  16. const promise = value[1]
  17. promise.then(
  18. resolvedValue => next(resolvedValue),
  19. error => next(error, true)
  20. )
  21. } else if (value[0] === "delay") {
  22. const timeout = value[1]
  23. setTimeout(() => next(`${timeout}ms elapsed`), timeout)
  24. } else if (value[0] === "ping") {
  25. next("pong")
  26. } else {
  27. iterator.throw(new Error("无法识别的 effect"))
  28. }
  29. }
  30. next()
  31. // cancellation
  32. // effects 是可取消的
  33. // effect规范化处理
  34. // 每一个 effect 在运行之前都会通过函数 digestEffect 的处理。该函数用变量 effectSettled 记录了一个 effect 是否已经完成
  35. function digestEffect(rawEffect, cb) {
  36. let effectSettled = false
  37. // 通过callback函数及时得到effect执行状态,调用该函数,表明一旦完成或是被取消,就不能再调用cb改变状态
  38. function currCb(res, isErr) {
  39. if (effectSettled) {
  40. return
  41. }
  42. effectSettled = true
  43. cb.cancel = noop
  44. cb(res, isErr)
  45. }
  46. currCb.cancel = noop
  47. // 通过cb注册cancel函数,改变标识符的值,可以达到同样的目的,不再调用cb改变状态,实现effect取消功能
  48. cb.cancel = () => {
  49. if (effectSettled) {
  50. return
  51. }
  52. effectSettled = true
  53. try {
  54. currCb.cancel()
  55. } catch (err) {
  56. console.error(err)
  57. }
  58. currCb.cancel = noop
  59. }
  60. // normalizeEffect用来内省识别effect类型,不需要开发者手动去标识effect类型
  61. runEffect(normalizeEffect(rawEffect), currCb)
  62. }
  63. //************************************************ */
  64. // redux-saga
  65. const TASK_CANCEL = Symbol("TASK_CANCEL")
  66. const CANCEL = Symbol("CANCEL")
  67. // 所有 saga 实例都是通过该函数启动的,返回 task
  68. // 建立对象之间的后继关系(cont)和取消关系(cancellation)
  69. function proc(iterator, parentContext, cont) {
  70. // 初始化当前 task 的 context
  71. const ctx = Object.create(parentContext)
  72. // mainTask 用来跟踪当前迭代器的语句执行状态
  73. const mainTask = {
  74. // cont: **will be set when passed to ForkQueue**
  75. isRunning: true,
  76. isCancelled: false,
  77. cancel() {
  78. if (mainTask.isRunning && !mainTask.isCancelled) {
  79. mainTask.isCancelled = true
  80. next(TASK_CANCEL)
  81. }
  82. },
  83. }
  84. // 实际上task并不是这么构造的,不过在初步实现中,暂时先这样吧
  85. // const task = {
  86. // cancel: () => next(TASK_CANCEL),
  87. // }
  88. // 创建 ForkQueue 对象和 Task 对象,这两个类的代码在后面会写出来
  89. const taskQueue = new ForkQueue(mainTask)
  90. const task = new Task(taskQueue)
  91. // 设置后继关系
  92. taskQueue.cont = task.end
  93. task.cont = cont
  94. // 设置取消关系
  95. cont.cancel = task.cancel
  96. next()
  97. return task
  98. // 在图中驱动函数只和 mainTask 有联系
  99. // 然后我们也可以发现下面 next 函数的代码中,也只调用了 mainTask 的接口
  100. // 即 next 函数中的代码不会引用 task 和 taskQueue 对象
  101. function next(arg, isErr) {
  102. try {
  103. let result
  104. if (isErr) {
  105. result = iterator.throw(arg)
  106. } else if (arg === TASK_CANCEL) {
  107. // next.cancel 由当前正在执行的 effectRunner 所设置
  108. next.cancel()
  109. // 跳转到迭代器的 finally block,执行清理逻辑
  110. result = iterator.return(TASK_CANCEL)
  111. } else {
  112. result = iterator.next(arg)
  113. }
  114. if (!result.done) {
  115. digestEffect(result.value, next)
  116. } else {
  117. // 迭代器执行完毕,调用cont将结果返回给上层
  118. mainTask.isRunning = false
  119. mainTask.cont(result.value)
  120. }
  121. } catch (error) {
  122. if (!mainTask.isRunning) {
  123. throw error
  124. }
  125. if (mainTask.isCancelled) {
  126. // 在执行 cancel 逻辑时发生错误,在 3.4 其他问题与细节 中说明
  127. console.error(error)
  128. }
  129. mainTask.isRunning = false
  130. mainTask.cont(error, true)
  131. }
  132. }
  133. // function digestEffect(rawEffect, cb) { /* ...... */ }
  134. // 执行effect,根据effect的类型调用不同的effectRunner
  135. function runEffect(effect, currCb) {
  136. const effectType = effect[0]
  137. if (effectType === "promise") {
  138. resolvePromise(effect, ctx, currCb)
  139. } else if (effectType === "iterator") {
  140. resolveIterator(iterator, ctx, currCb)
  141. } else {
  142. // 拓展这里的 if-else 便可以拓展新的effect类型
  143. throw new Error("Unknown effect type")
  144. }
  145. }
  146. function resolvePromise([effectType, promise], ctx, cb) {
  147. const cancelPromise = promise[CANCEL]
  148. if (is.func(cancelPromise)) {
  149. // 设置promise的cancel逻辑
  150. cb.cancel = cancelPromise
  151. }
  152. promise.then(cb, error => cb(error, true))
  153. }
  154. function resolveIterator([effectType, iterator], ctx, cb) {
  155. proc(iterator, ctx, cb)
  156. }
  157. }
  158. // https://zhuanlan.zhihu.com/p/37356948
  159. // fork effect
  160. function runForkEffect([effectType, fn, ...args], ctx, cb) {
  161. const iterator = createTaskIterator(fn, args)
  162. try {
  163. suspend() // 见 3.4 scheduler
  164. const subTask = proc(iterator, ctx, noop)
  165. if (subTask.isRunning) {
  166. task.taskQueue.addTask(subTask)
  167. cb(subTask)
  168. } else if (subTask.error) {
  169. task.taskQueue.abort(subTask.error)
  170. } else {
  171. cb(subTask)
  172. }
  173. } finally {
  174. flush() // 见 3.4 scheduler
  175. }
  176. }
  177. interface ForkQueue {
  178. // MainTask 该参数代表当前迭代器自身代码的执行状态
  179. constructor(mainTask: MainTask)
  180. // 在 ForkQueue 被构造之后进行设置。当所有的 child-task 以及 mainTask 都完成时,我们需要调用 forkQueue.cont 来通知其 parent-saga
  181. // cont: Callback 这是一个私有的字段
  182. // 添加新的 child-task ,赋值task.cont函数,task.cont调用实例方法cont返回结果值
  183. addTask(task: Task): void
  184. cancelAll(): void
  185. // 取消所有的 child-task,还会调用 forkQueue.cont 向 parent-task 通知错误
  186. abort(err: Error): void
  187. }
  188. // task context
  189. // 通过原型链的方式访问父节点的上下文
  190. // 利用 context 机制,并使用该机制实现「effect 类型拓展」、「连接 redux store」等功能
  191. // /src/core/Task.js
  192. class Task {
  193. isRunning = true
  194. isCancelled = false
  195. isAborted = false
  196. result = undefined
  197. error = undefined
  198. joiners = []
  199. // cont will be set after calling constructor()
  200. cont = undefined
  201. constructor(taskQueue) {
  202. this.taskQueue = taskQueue
  203. }
  204. // 调用 cancel 函数来取消该 Task,这将取消所有当前正在执行的 child-task 和 mainTask
  205. // cancellation 会向下传播,意味着该 Task 对应的 saga-tree 子树都将会被取消
  206. // 同时 cancellation 也会传递给该 Task 的所有 joiners
  207. cancel = () => {
  208. // 如果该 Task 已经完成或是已经被取消,则跳过
  209. if (this.isRunning && !this.isCancelled) {
  210. this.isCancelled = true
  211. this.taskQueue.cancelAll()
  212. // 将 TASK_CANCEL 传递给所有 joiners
  213. this.end(TASK_CANCEL)
  214. }
  215. }
  216. // 结束当前 Task
  217. // 设置 Task 的 result/error,然后调用 task.cont,最后将结果传递给 joiners
  218. // 当该 Task 的 child-task 和 mainTask 都完成时(即 fork-queue 完成时),该函数将被调用
  219. end = (result, isErr) => {
  220. this.isRunning = false
  221. if (!isErr) {
  222. this.result = result
  223. } else {
  224. this.error = result
  225. this.isAborted = true
  226. }
  227. this.cont(result, isErr)
  228. this.joiners.forEach(j => j.cb(result, isErr))
  229. this.joiners = null
  230. }
  231. toPromise() {
  232. // 获取 task 对应的 promise 对象,这里省略了代码
  233. }
  234. }
  235. function createSagaMiddleware(cont) {
  236. function middleware({ dispatch, getState }) {
  237. let channelPut
  238. const env = new Env(cont)
  239. .use(commonEffects)
  240. .use(channelEffects)
  241. .use(ctx => {
  242. // 记录「真实」的 channel.put
  243. channelPut = ctx.channel.put
  244. // 使用 dispatch 替换掉 channel 上的 put 方法
  245. ctx.channel.put = action => {
  246. action[SAGA_ACTION] = true
  247. dispatch(action)
  248. }
  249. // 使用 def 方法来定义 select 类型的 effect-runner
  250. def(ctx, 'select', ([_effectType, selector = identity, ...args], _ctx, cb) =>
  251. cb(selector(getState(), ...args)),
  252. )
  253. })
  254. // 当 middleware 函数执行时,说明 store 正在创建
  255. // 此时我们给 middleware.run 设置正确的函数
  256. middleware.run = (...args) => env.run(...args)
  257. return next => action => {
  258. const result = next(action) // hit reducers
  259. // 下面的 if-else 主要是为了保证 channelPut(action) 恰好被包裹在一层 asap 中
  260. // asap 的介绍见 3.4
  261. if (action[SAGA_ACTION]) {
  262. // SAGA_ACTION 字段为 true 表示该 action 来自 saga
  263. // 而在 saga 中,我们在 put 的时候已经使用了函数asap
  264. // 所以在这里就不需要再次调用 asap 了
  265. channelPut(action)
  266. } else {
  267. // 表示该 action 来自 store.dispatch
  268. // 例如某个 React 组件的 onClick 中调用了 dispatch 方法
  269. asap(() => channelPut(action))
  270. }
  271. return result
  272. }
  273. }
  274. middleware.run = (...args) => {
  275. throw new Error('运行 Saga 函数之前,必须使用 applyMiddleware 将 Saga 中间件加载到 Store 中')
  276. }
  277. return middleware
  278. }