背景是在项目中需要选定一个处理redux异步数据流控制库,redux-saga相对来说更受欢迎,使用库的过程中,如果能知道它的实现方式,对于更优雅,更有效率的使用库本身有比较大的好处,所以抽空看了它的源码实现,有所收获,记录下来分享给大家!

假设不使用redux-thunk或者redux-saga

使用async await等简单方案 获取用户id数据,发出action将数据保存到store

  1. function fetchData(userId) {
  2. // 返回一个promise, 内容是数据
  3. }
  4. function fetchUser() {
  5. // 返回一个promise, 内容是用户信息
  6. }
  7. class Component {
  8. componentDidMount() {
  9. const { userInfo: { userId } } = await fetchUser();
  10. store.dispatch({type: 'UPDATE_USER_ID', payload: userId});
  11. const { data } = await fetchData(userId);
  12. store.dispatch({type: 'UPDATE_DATA', payload: data});
  13. }
  14. }

需要扩展怎么办,相对来说好一点的方案如下

  1. class DataHandler {
  2. static fetchData() {
  3. const { userInfo: { userId } } = await fetchUser();
  4. store.dispatch({type: 'UPDATE_USER_ID', payload: userId});
  5. const { data } = await fetchData(userId);
  6. store.dispatch({type: 'UPDATE_DATA', payload: data});
  7. }
  8. }
  9. class ComponentA {
  10. componentDidMount() {
  11. DataHandler.fetchData();
  12. }
  13. }
  14. class ComponentB {
  15. componentDidMount() {
  16. DataHandler.fetchData();
  17. }
  18. }

可是如果需要定制数据处理逻辑,有的组件获取到数据含有其他额外处理就没法完成复用,需要额外开发逻辑处理,最好的方法是,把异步数据获取和操作独立出来,通过指令压缩或扩展逻辑

使用redux-saga之后

异步逻辑抽离到了saga文件,可以构建统一的 集中的异步处理中心,可以直接分发action触发对应逻辑,复用也简单,分发同一个action即可,如果有特殊定制,只需另加saga方法特殊处理,是store reducer专注处理state,接口与实现 分离

  1. // store
  2. import rootSaga from './sagas'
  3. const sagaMiddleware = createSagaMiddleware()
  4. const store = ...
  5. sagaMiddleware.run(rootSaga)
  6. // saga.js
  7. import { delay } from 'redux-saga'
  8. import { put, takeEvery, all } from 'redux-saga/effects'
  9. function* incrementAsync() {
  10. yield delay(1000)
  11. //...可以加额外逻辑
  12. yield put({ type: 'INCREMENT1' })
  13. // ...handler
  14. }
  15. function* incrementAsync2() {
  16. yield delay(1000)
  17. //...可以加额外逻辑
  18. yield put({ type: 'INCREMENT2' })
  19. // ...handler
  20. }
  21. function* incrementAsync3() {
  22. yield delay(1000)
  23. //...可以加额外逻辑
  24. yield put({ type: 'INCREMENT3' })
  25. // ...handler
  26. }
  27. function* watchIncrementAsync() {
  28. yield takeEvery('INCREMENT_ASYNC1', incrementAsync)
  29. yield takeEvery('INCREMENT_ASYNC2', incrementAsync2)
  30. yield takeEvery('INCREMENT_ASYNC', incrementAsync3)
  31. }
  32. function* watchIncrementAsync2() {
  33. yield takeEvery('INCREMENT_ASYNC2', incrementAsync2)
  34. yield takeEvery('INCREMENT_ASYNC3', incrementAsync3)
  35. }
  36. export default function* rootSaga() {
  37. yield all([
  38. helloSaga(),
  39. watchIncrementAsync(),
  40. watchIncrementAsync2()
  41. ])
  42. }
  43. // render
  44. function render() {
  45. ReactDOM.render(
  46. <Counter
  47. value={store.getState()}
  48. onIncrement={() => action('INCREMENT')}
  49. onDecrement={() => action('DECREMENT')}
  50. // onIncrementAsync={() => action('INCREMENT_ASYNC1')}
  51. // onIncrementAsync={() => action('INCREMENT_ASYNC2')}
  52. onIncrementAsync={() => action('INCREMENT_ASYNC3')} />,
  53. document.getElementById('root')
  54. )
  55. }

redux-saga的本质,集中处理副作用

只要是跟函数外部环境发生的交互就都属于副作用

包括但不限于

  • 发送一个 http 请求
  • 更改文件系统
  • 往数据库插入记录
  • 使用LocalStorage进行本地存储
  • 打印/log
  • 获取用户输入
  • DOM 查询
  • 访问系统状态

redux是一个同步的树形state系统,没有考虑异步的处理,把这些脏活交给了用户,redux-saga就是尽量保持redux的纯粹,自己接管异步脏活,让redux只维护state树

纯粹的state树

  1. reducers: {
  2. update: state => {
  3. state.value -= 1
  4. }
  5. }

掺杂了异步脏活的state树

  1. reducers: {
  2. update: state => {
  3. await api()
  4. // ...
  5. // ...
  6. // ...
  7. state.value -= 1
  8. }
  9. }
  1. // 可以当成一个handler
  2. function* incrementAsync() {
  3. yield delay(1000)
  4. yield put({ type: 'INCREMENT' })
  5. }
  6. function* watchIncrementAsync() {
  7. yield takeEvery('INCREMENT_ASYNC', incrementAsync)
  8. }
  9. channel = [
  10. {
  11. effect: takeEvery
  12. // 注册的type
  13. type: 'INCREMENT_ASYNC',
  14. cb: incrementAsync
  15. }
  16. ]

saga的优点

  • 更容易管理副作用
  • 程序更高效执行
  • 易于测试
  • 易于处理错误

javascript的Generator生成器

saga就是一个中间件,实现异步流程控制管理,异步流程管理,ES6特性Generator,调用next()方法推进流程执行,对生成器函数概念陌生的可以移步https://www.infoq.cn/article/es6-in-depth-iterators-and-the-for-of-loop/

  1. function* someSaga() {
  2. // yield 一个 promise 应该返回 promise resolve 的值
  3. const response = yield fetch('https://example.com/')
  4. // yield 一个 take effect 应该返回一个 Action
  5. const action = yield take('SOME_ACTION')
  6. // yield 一个 all effect 应该返回一个数组,该数组记录了 effect1 或 effect2 的执行结果
  7. const allResult = yield all([effect1, effect2])
  8. }
  9. someSaga()

在我们选定用生成器函数来控制异步流程后,怎么有序的,自动的触发next()很关键

for循环消费迭代器

for循环是一个迭代器, 本质上for循环是一个迭代器语法糖,底层调用iterator.next()实现迭代,但是功能有限,不能传参

  1. function* range(start, end) {
  2. for (let i = start; i < end; i++) {
  3. yield i
  4. }
  5. }
  6. for (let x of range(1, 10)) {
  7. console.log(x)
  8. }
  9. // 输出 1, 2, 3 ... 8, 9
  10. //

while (true)消费

同步操作,无法控制等待异步完成继续执行接下来的iterator.next()

  1. const iterator = range(1, 10)
  2. while (true) {
  3. const { done, value } = iterator.next(/* 我们可以决定这里的参数 */)
  4. if (done) {
  5. break
  6. }
  7. if (value === 5) {
  8. iterator.throw(new Error('5 is bad input'))
  9. }
  10. console.log(value)
  11. }
  12. // 输出 1, 2, 3, 4,然后抛出异常 '5 is bad input'

saga的解决方案-递归方法,next中嵌套next

  1. function* range2(start, end) {
  2. for (let i = start; i < end; i++) {
  3. const response = yield i
  4. console.log(`response of ${i} is ${response}`)
  5. }
  6. }
  7. const iterator = range2(1, 10)
  8. function next(arg, isErr) {
  9. // 注意驱动函数多了参数 arg 和 isErr
  10. let result
  11. if (isErr) {
  12. result = iterator.throw(arg)
  13. } else {
  14. // 这里我们将 arg 作为参数传递给 iterator.next,作为 effect-producer 中 yield 语句的返回值
  15. result = iterator.next(arg)
  16. }
  17. const { done, value } = result
  18. if (done) {
  19. return
  20. }
  21. console.log('getting:', value)
  22. if (value === 5) {
  23. // 将 isErr 置为 true,就能用递归的方式调用 iterator.throw 方法
  24. next(new Error('5 is bad input'), true)
  25. } else {
  26. // 延迟调用驱动函数;「响应」是「请求」的两倍
  27. setTimeout(() => next(value * 2), value * 1000)
  28. }
  29. }
  30. next()
  31. // 输出
  32. // getting: 1
  33. // response of 1 is 2
  34. // getting: 2
  35. // response of 2 is 4
  36. // getting: 3
  37. // response of 3 is 6
  38. // getting: 4
  39. // response of 4 is 8
  40. // getting: 5
  41. // Uncaught Error: 5 is bad input
  42. // 输出 getting: x 之后,输出会暂停一段时间

注册saga中间件

saga本质上是一个redux中间件,如果对redux不熟悉,请移步https://redux.js.org/

redux中间件的经典使用姿势,高阶函数

  1. function sagaMiddleware({ getState, dispatch }) {
  2. boundRunSaga = runSaga.bind(null, {
  3. ...options,
  4. context,
  5. channel,
  6. dispatch,
  7. getState,
  8. sagaMonitor,
  9. })
  10. return next => action => {
  11. if (sagaMonitor && sagaMonitor.actionDispatched) {
  12. sagaMonitor.actionDispatched(action)
  13. }
  14. const result = next(action) // hit reducers
  15. channel.put(action) // 通过saga处理逻辑
  16. return result
  17. }
  18. }

从这边可以看到一个action是有会经过两个通道,稍微注意action commit type的唯一性,基本上也不会有冲突,示例如下

  1. // saga
  2. const initialState: WorkflowState = {
  3. // 点击某一条记录时 推入该条记录
  4. approve: null,
  5. permission: null,
  6. records: [],
  7. userinfo: {},
  8. };
  9. export function makeFetchRecordsCommit(row: WorkflowRecord): RecordsUpdateCommit {
  10. return {
  11. type: WorkflowActionType.updateRecords,
  12. payload: row,
  13. };
  14. }
  15. export default {
  16. namespace: 'workflow',
  17. state: initialState,
  18. effects: {
  19. *fetchRecords(action: StringPayloadAction, { call, put }: EffectsCommandMap) {
  20. const formNo = action.payload;
  21. const res: Response = yield call(fetchRecordsByFormNo, formNo);
  22. if (!res.empty) {
  23. const records = res.content;
  24. yield put(makeFetchRecordsCommit(records));
  25. }
  26. },
  27. },
  28. reducers: {
  29. updateRecords(state: WorkflowState, { payload }: ApproveUpdateCommit) {
  30. return {
  31. ...state,
  32. records: payload,
  33. };
  34. },
  35. },
  36. };

启动saga注册

  1. sagaMiddleware.run = (...args) => {
  2. if (process.env.NODE_ENV !== 'production' && !boundRunSaga) {
  3. throw new Error('Before running a Saga, you must mount the Saga middleware on the Store using applyMiddleware')
  4. }
  5. return boundRunSaga(...args)
  6. }
  7. // store上注册
  8. const sagaMiddleware = createSagaMiddleware()
  9. const store = createStore(reducer, applyMiddleware(sagaMiddleware))
  10. sagaMiddleware.run(rootSaga)

runSaga

  1. export function runSaga(
  2. { channel = stdChannel(), dispatch, getState, context = {}, sagaMonitor, effectMiddlewares, onError = logError },
  3. saga,
  4. ...args
  5. ) {
  6. const iterator = saga(...args) // rootSaga执行获取了迭代器
  7. const env = {
  8. channel,
  9. dispatch: wrapSagaDispatch(dispatch),
  10. getState,
  11. sagaMonitor,
  12. onError,
  13. finalizeRunEffect,
  14. }
  15. // 开启task
  16. return immediately(() => {
  17. const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)
  18. if (sagaMonitor) {
  19. sagaMonitor.effectResolved(effectId, task)
  20. }
  21. return task
  22. })
  23. }

redux-saga之take

相对最常用的api,捕捉未来的action并作出对应操作

  1. // rootSaga
  2. export function* rootSaga () {
  3. yield take(actions.FETCH_USERINFO)
  4. // 当dispatch action.type命中actions.FETCH_USERINFO
  5. // 执行下列逻辑
  6. // ... handlerTake
  7. handlerTake
  8. }
  • 第一步,执行rootSata,得到一个迭代器
  1. import { take } from "./internal/io"
  2. // rootSaga()
  3. const iterator = saga(...args)
  4. // 执行proc
  5. const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)
  6. // 构造一个next方法 = currcb
  7. function next (arg, isErr) {
  8. try {
  9. let result = iterator.next(arg)
  10. if (!result.done) {
  11. digestEffect(result.value, parentEffectId, next)
  12. }
  13. } catch (error) {
  14. ...
  15. }
  16. }
  17. // 构造完next方法后直接调用
  18. next()
  19. // 执行 构造一个effect 本质上是一个纯对象
  20. take(actions.FETCH_USERINFO)
  21. // patternOrChannel = actions.FETCH_USERINFO
  22. makeEffect(effectTypes.TAKE, { pattern: patternOrChannel })
  23. // result = { value: ..., done: false }
  24. // payload = { pattern: patternOrChannel }
  25. value = {
  26. [IO]: true, // 标记 判断是否属于saga
  27. combinator: false,
  28. type, // effect类型 Take
  29. payload,
  30. }
  31. // result.done === false
  32. // 进入代码分支 digestEffect 且传递了next作为回调函数
  33. // cb = next
  34. function digestEffect(effect, parentEffectId, cb, label = '') {
  35. let effectSettled // 标记当前effect处理进度
  36. function currCb(res, isErr) {
  37. if (effectSettled) {
  38. return
  39. }
  40. effectSettled = true // 表示当前effect处理过了 适用于竞赛请求的情况
  41. cb(res, isErr)
  42. }
  43. // currCb = () => { next() } 包裹一层next
  44. finalRunEffect(effect, effectId, currCb)
  45. }
  46. // 开始执行runEffect
  47. function runEffect(effect, effectId, currCb) {
  48. if (is.promise(effect)) {
  49. // 非promise
  50. resolvePromise(effect, currCb)
  51. } else if (is.iterator(effect)) {
  52. // 这边当前实例不会进入
  53. proc(env, effect, task.context, effectId, meta, /* isRoot */ false, currCb)
  54. } else if (effect && effect[IO]) {
  55. // 符合当前分支
  56. const effectRunner = effectRunnerMap[effect.type]
  57. effectRunner(env, effect.payload, currCb, executingContext)
  58. } else {
  59. // 如果不是saga effect 照常进行 此时将会调用顶层next,rootSaga方法体将继续执行
  60. currCb(effect)
  61. }
  62. }
  63. // 执行effectRunner
  64. // 匹配到的是runTakeEffect take类型的runner
  65. function runTakeEffect(env, { channel = env.channel, pattern, maybe }, cb) {
  66. // 构造回调函数 作为后续响应当前actions.FETCH_USERINFO要执行的注册handler
  67. const takeCb = input => {
  68. if (input instanceof Error) {
  69. cb(input, true)
  70. return
  71. }
  72. if (isEnd(input) && !maybe) {
  73. cb(TERMINATE)
  74. return
  75. }
  76. cb(input)
  77. }
  78. try {
  79. // 注册channel
  80. // 注意tabkeCb是包裹了一层上层传递下来的next 也就是说 程序卡在
  81. // 卡在准备执行handlerTake的地方
  82. // 因为需要iterator.next()才能继续执行handlerTake,暂时没有触发因子
  83. channel.take(takeCb, is.notUndef(pattern) ? matcher(pattern) : null)
  84. // matcher处理通配符等情况
  85. } catch (err) {
  86. cb(err, true)
  87. return
  88. }
  89. }
  90. // channel.take
  91. take(cb, matcher = matchers.wildcard) {
  92. if (closed) {
  93. cb(END)
  94. return
  95. }
  96. cb[MATCH] = matcher // 正则生成 根据dispatch的action.type来匹配触发
  97. nextTakers.push(cb) // buffer数组保存cb 包含match信息,也就是actions.FETCH_USERINFO这类信息
  98. }
  99. // 至此saga take注册完成
  100. // 触发 在注册saga的地方提过,action.type会进入两个分支 一个是redux分支,一个是saga分支
  101. return next => action => {
  102. if (sagaMonitor && sagaMonitor.actionDispatched) {
  103. sagaMonitor.actionDispatched(action)
  104. }
  105. const result = next(action)
  106. channel.put(action) // action
  107. return result
  108. }
  109. // 看看put做了啥
  110. put(input) {
  111. if (closed) {
  112. return
  113. }
  114. if (isEnd(input)) {
  115. close()
  116. return
  117. }
  118. // 当前维护的所有actions相关takecb
  119. const takers = (currentTakers = nextTakers)
  120. for (let i = 0, len = takers.length; i < len; i++) {
  121. const taker = takers[i]
  122. // 找出正则匹配上的action type
  123. if (taker[MATCH](input)) {
  124. taker.cancel()
  125. // takecb = 顶层next() 触发iterator.next() 执行handlerTake
  126. taker(input)
  127. }
  128. }
  129. },

redux-saga之takeEvery

发现take有个问题,就是只能触发一个就不再执行,不使用与多次点击
不推荐的解决办法

  1. // rootSaga
  2. export function* rootSaga () {
  3. while (true) {
  4. yield take(actions.FETCH_USERINFO)
  5. // 当dispatch action.type命中actions.FETCH_USERINFO
  6. // 执行下列逻辑
  7. // ... handlerTake
  8. handlerTake
  9. }
  10. }

redux-saga提供了一个工具函数takeEvery,也就是说每次分发的actions.FETCH_USERINFO都能收集且触发继而执行handlerTake

  1. // rootSaga
  2. export function* rootSaga () {
  3. yield takeEvery(actions.FETCH_USERINFO)
  4. // 当dispatch action.type命中actions.FETCH_USERINFO
  5. // 执行下列逻辑
  6. // ... handlerTake
  7. handlerTake
  8. }

解析它的实现,本质上和while…true类似,消费完上一个effect后,生成一个新的take effect

  1. // rootSaga
  2. export function* rootSaga () {
  3. yield takeEvery(actions.FETCH_USERINFO)
  4. // 当dispatch action.type命中actions.FETCH_USERINFO
  5. // 执行下列逻辑
  6. // ... handlerTake
  7. handlerTake
  8. }

构造takeEvery

  1. // 工具方法 构造迭代器 核心是next方法
  2. export function makeIterator(next, thro = kThrow, name = 'iterator') {
  3. const iterator = { meta: { name }, next, throw: thro, return: kReturn, isSagaIterator: true }
  4. return iterator
  5. }
  6. // next方法编写
  7. export default function fsmIterator(fsm, startState, name) {
  8. let stateUpdater,
  9. errorState,
  10. effect,
  11. nextState = startState
  12. // 除非报错的情况 否则nextState在q1, q2间切换
  13. function next(arg, error) {
  14. if (nextState === qEnd) {
  15. return done(arg)
  16. }
  17. if (error && !errorState) {
  18. nextState = qEnd
  19. throw error
  20. } else {
  21. stateUpdater && stateUpdater(arg) // 状态流转方法
  22. const currentState = error ? fsm[errorState](error) : fsm[nextState]()
  23. ;({ nextState, effect, stateUpdater, errorState } = currentState)
  24. return nextState === qEnd ? done(arg) : effect
  25. }
  26. }
  27. return makeIterator(next, error => next(null, error), name)
  28. }
  29. export default function takeEvery(patternOrChannel, worker, ...args) {
  30. // 熟悉的地方 模拟take的处理结果 相当于使用take注册
  31. const yTake = { done: false, value: take(patternOrChannel) }
  32. // fork类型effect
  33. const yFork = ac => ({ done: false, value: fork(worker, ...args, ac) })
  34. // 切换q1, q2
  35. let action,
  36. setAction = ac => (action = ac)
  37. // 原来代码
  38. // return fsmIterator(
  39. // {
  40. // q1() {
  41. // return { nextState: 'q2', effect: yTake, stateUpdater: setAction }
  42. // },
  43. // q2() {
  44. // return { nextState: 'q1', effect: yFork(action) }
  45. // },
  46. // },
  47. // 'q1', // 初始状态q1
  48. // `takeEvery(${safeName(patternOrChannel)}, ${worker.name})`,
  49. // )
  50. // 搬运后代码 可以看到状态在q1,q2轮转
  51. const fsm = {
  52. q1() {
  53. return { nextState: 'q2', effect: yTake, stateUpdater: setAction }
  54. },
  55. q2() {
  56. return { nextState: 'q1', effect: yFork(action) }
  57. },
  58. },
  59. function next(arg, error) {
  60. if (nextState === qEnd) {
  61. return done(arg)
  62. }
  63. if (error && !errorState) {
  64. nextState = qEnd
  65. throw error
  66. } else {
  67. stateUpdater && stateUpdater(arg)
  68. const currentState = error ? fsm[errorState](error) : fsm[nextState]()
  69. ;({ nextState, effect, stateUpdater, errorState } = currentState)
  70. return nextState === qEnd ? done(arg) : effect
  71. }
  72. }
  73. }

用take类似的分析方法走一遍

  1. // rootSaga
  2. const iterator = saga(...args)
  3. // 开启proc
  4. const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)
  5. // 构造进程next
  6. function next(arg, isErr) {
  7. try {
  8. let result = iterator.next(arg)
  9. if (!result.done) {
  10. digestEffect(result.value, parentEffectId, next)
  11. }
  12. }
  13. }
  14. // 构造完next 立即执行
  15. next()
  16. // iterator.next()得到
  17. // { nextState: 'q2', effect: yTake, stateUpdater: setAction }
  18. // 也就是
  19. function next(arg, error) {
  20. if (nextState === qEnd) {
  21. return done(arg)
  22. }
  23. if (error && !errorState) {
  24. nextState = qEnd
  25. throw error
  26. } else {
  27. stateUpdater && stateUpdater(arg)
  28. const currentState = error ? fsm[errorState](error) : fsm[nextState]()
  29. ;({ nextState, effect, stateUpdater, errorState } = currentState)
  30. // q1.effect
  31. return q1.effect
  32. }
  33. }
  34. // 得到一个take类型effect,参照上面take的处理,直到捕捉到一个actions.FETCH_USERINFO
  35. // 假设捕捉成功,此时执行最外层定义的proc的next中包裹的
  36. result = iterator.next(arg)
  37. // 得到
  38. result = { nextState: 'q1', effect: yFork(action) }
  39. // 因为
  40. result.done !== true
  41. // 继续执行
  42. digestEffect(result.value, .., proc-next)
  43. // 继续执行
  44. effectRunner(env, effect.payload, currCb, executingContext)
  45. // 判断出effect.type === effectTypes.fork
  46. function runForkEffect(env, { context, fn, args, detached }, cb, { task: parent }) {
  47. // 类似rootSaga
  48. const taskIterator = createTaskIterator({ context, fn, args })
  49. const meta = getIteratorMetaInfo(taskIterator, fn)
  50. immediately(() => {
  51. // 开启一个proc,就类似根saga的处理方法 走一遍单个的take注册流程
  52. // 这个开启之后是另一个管理流程了 但是之前我们proc的收尾呢
  53. const child = proc(env, taskIterator, parent.context, currentEffectId, meta, detached, undefined)
  54. if (detached) {
  55. cb(child)
  56. } else {
  57. if (child.isRunning()) {
  58. parent.queue.addTask(child)
  59. cb(child)
  60. } else if (child.isAborted()) {
  61. parent.queue.abort(child.error())
  62. } else {
  63. // cb收尾
  64. // 执行handleTake
  65. cb(child)
  66. }
  67. }
  68. })
  69. }

总结就是,开启生产一个take effect,消费一个的同时再次生产一个take effect待命下一次的action

redux-saga之call

前面类似take,最后的effectRunner不同

  1. function runCallEffect(env, { context, fn, args }, cb, { task }) {
  2. // catch synchronous failures; see #152
  3. try {
  4. const result = fn.apply(context, args)
  5. if (is.promise(result)) {
  6. // 如果是异步请求 result = then
  7. // 只有在then完成后,才会执行cb,也就是proc-next
  8. // 所以call是阻塞的
  9. resolvePromise(result, cb)
  10. return
  11. }
  12. if (is.iterator(result)) {
  13. // resolve iterator
  14. proc(env, result, task.context, currentEffectId, getMetaInfo(fn), /* isRoot */ false, cb)
  15. return
  16. }
  17. cb(result)
  18. } catch (error) {
  19. cb(error, true)
  20. }
  21. }

redux-saga之put

前面类似take,最后的effectRunner不同

  1. function runPutEffect(env, { channel, action, resolve }, cb) {
  2. // 重点是有调度器 调度器阻塞
  3. asap(() => {
  4. let result
  5. try {
  6. // 直接dispatch
  7. result = (channel ? channel.put : env.dispatch)(action)
  8. } catch (error) {
  9. cb(error, true)
  10. return
  11. }
  12. if (resolve && is.promise(result)) {
  13. resolvePromise(result, cb)
  14. } else {
  15. cb(result)
  16. }
  17. })
  18. }

为什么redux-saga需要调度器

源码阅读的时候对一个schedule文件充满好奇,没有了解到它的用处

假设我们有如下场景

  1. function* rootSaga() {
  2. // next0()
  3. yield fork(genA) // LINE-1
  4. // next1()
  5. yield fork(genB) // LINE-2
  6. }
  7. function* genA() {
  8. // nextA0()
  9. yield put({ type: 'A' })
  10. // nextA1()
  11. yield take('B')
  12. }
  13. function* genB() {
  14. // nextB0()
  15. yield take('A')
  16. // nextB1()
  17. yield put({ type: 'B' })
  18. }
  19. // 按照刚刚的分析,如果没有调度器
  20. // 执行next0(), fork,开启了一个新的proc
  21. // 执行genA, put为同步方法,立即执行 发出一个action
  22. // 此时还没有执行nextA1,没有执行genB,所以有可能遗失一个action 'A'
  23. // 注册完 takeB后,执行genB,分发 action 'B', 成功捕捉
  24. // 有遗失action是不能接受的

调度方案解决action遗失问题

  1. // 任务队列
  2. const queue = []
  3. // 锁
  4. let semaphore = 0
  5. // 执行任务
  6. function exec(task) {
  7. try {
  8. suspend()
  9. task()
  10. } finally {
  11. release()
  12. }
  13. }
  14. // 尽快执行任务
  15. export function asap(task) {
  16. // 压入任务栈
  17. queue.push(task)
  18. if (!semaphore) {
  19. suspend()
  20. flush()
  21. }
  22. }
  23. // 立即执行任务
  24. export function immediately(task) {
  25. try {
  26. suspend()
  27. return task()
  28. } finally {
  29. flush()
  30. }
  31. }
  32. // 上锁
  33. function suspend() {
  34. semaphore++
  35. }
  36. // 解锁
  37. function release() {
  38. semaphore--
  39. }
  40. function flush() {
  41. release()
  42. let task
  43. while (!semaphore && (task = queue.shift()) !== undefined) {
  44. exec(task)
  45. }
  46. }

重新分析上述例子

  1. // runSaga中的immediately 立即执行 rootSaga立即执行
  2. // task1
  3. return immediately(() => {
  4. const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)
  5. if (sagaMonitor) {
  6. sagaMonitor.effectResolved(effectId, task)
  7. }
  8. return task
  9. })
  10. // forkA
  11. function runForkEffect(env, { context, fn, args, detached }, cb, { task: parent }) {
  12. const taskIterator = createTaskIterator({ context, fn, args })
  13. const meta = getIteratorMetaInfo(taskIterator, fn)
  14. // 立即执行 task2
  15. immediately(() => {
  16. // procA
  17. const child = proc(env, taskIterator, parent.context, currentEffectId, meta, detached, undefined)
  18. if (detached) {
  19. cb(child)
  20. } else {
  21. if (child.isRunning()) {
  22. parent.queue.addTask(child)
  23. cb(child)
  24. } else if (child.isAborted()) {
  25. parent.queue.abort(child.error())
  26. } else {
  27. cb(child)
  28. }
  29. }
  30. })
  31. }
  32. // genA put task3, 注意 此时是在procA中,此任务本身处于task2中,而立即执行的任务会上锁
  33. asap(() => {
  34. let result
  35. try {
  36. result = (channel ? channel.put : env.dispatch)(action)
  37. } catch (error) {
  38. cb(error, true)
  39. return
  40. }
  41. if (resolve && is.promise(result)) {
  42. resolvePromise(result, cb)
  43. } else {
  44. cb(result)
  45. }
  46. })
  47. // 此时put堵塞,不是put本身堵塞,是注册put,迭代器没有回调proc-next
  48. // 调起forkB,注册take 'B',没有启用调度器 用不着 注册不影响后续,而且越早越好
  49. // put B同样堵塞,调度器执行flush,put A执行,而后put B执行,
  50. // 两个put都是在take注册完成之后,不再有action遗失问题

总结一下 ,调度器本质就是,注册操作优先,触发操作靠后,理论上最好不要写这种交叉逻辑