实现 take、put

https://github.com/redux-saga/redux-saga/blob/master/packages/core/src/internal/runSaga.js

redux-saga/effectTypes.js

  1. export const TAKE = 'TAKE'; //监听
  2. export const PUT = 'PUT'; //派发

redux-saga/effects.js

  1. import * as effectTypes from './effectTypes';
  2. /** 监听某个动作,生成指令对象的工厂函数
  3. *
  4. * @param {*} actionType
  5. */
  6. export function take(actionType){
  7. // 这是一个用来发送给saga中间件的指令对象
  8. return {type: effectTypes.TAKE, actionType};
  9. }
  10. /** 派发一个真正的动作
  11. *
  12. * @param {*} action
  13. */
  14. export function put(action){
  15. return {type: effectTypes.PUT, action};
  16. }

redux-saga/index.js

  1. import runSaga from './runSaga'; //驱动根saga自动执行,类似co
  2. import stdChannel from './channel'; // 管道 channel 用于实现我们的发布订阅
  3. function createSagaMiddleware(){
  4. const channel = stdChannel();
  5. let boundRunSaga;
  6. // 创建 saga 中间件
  7. function sagaMiddleware({getState, dispatch}){
  8. boundRunSaga = runSaga.bind(null, {channel, dispatch, getState});
  9. return function(next){
  10. return function(action){
  11. const result = next(action);
  12. //把action也给channel一份,如果有channel订阅的动作派发,就会触发相应的监听函数
  13. channel.emit(action);
  14. return result;
  15. }
  16. }
  17. }
  18. sagaMiddleware.run = (saga) => boundRunSaga(saga);
  19. return sagaMiddleware;
  20. }
  21. export default createSagaMiddleware;

redux-saga/runSaga.js

  1. import * as effectTypes from './effectTypes';
  2. /** runSaga
  3. *
  4. * @param {*} env {channel, dispatch, getState}
  5. * @param {*} saga 可能是一个生成器,也可能是一个迭代器
  6. */
  7. function runSaga(env, saga){
  8. let {channel, dispatch, getState} = env;
  9. let it = saga();
  10. function next(val){
  11. let {value: effect, done} = it.next(val);
  12. if (!done){
  13. // 根据不同的effect进行不同的处理
  14. switch (effect.type){
  15. case effectTypes.TAKE:
  16. // effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
  17. // 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next继续走
  18. // 如果没有,就暂停,阻塞在这里
  19. channel.once(effect.actionType, next);
  20. break;
  21. case effectTypes.PUT:
  22. // effect = {type: 'PUT', action: {type: 'ADD'}}
  23. // put 不阻塞,派发action之后就继续向下执行
  24. dispatch(effect.action);
  25. next(effect.action);
  26. default:
  27. break;
  28. }
  29. }
  30. }
  31. next();
  32. }
  33. export default runSaga;

redux-saga/channel.js

  1. function stdChannel(){
  2. let listeners = []; //缓存 监听函数
  3. /** once(在源码中名字叫take) 订阅,保存监听函数
  4. *
  5. * @param {*} actionType 等待的动作类型
  6. * @param {*} listener 等到之后,执行的监听函数
  7. */
  8. function once(actionType, listener){
  9. listener.actionType = actionType;
  10. // 设置取消监听函数,用于单次监听(即只触发一次,触发后自动移除)
  11. listener.cancel = () => {
  12. listeners = listeners.filter(item => item !== listener);
  13. }
  14. listeners.push(listener);
  15. }
  16. /** emit(在源码中名字叫put) 发布,执行监听函数
  17. *
  18. * @param {*} action 派发的动作
  19. */
  20. function emit(action){
  21. listeners.forEach(listener => {
  22. if (listener.actionType === action.type){
  23. // 先取消,再执行,这样就实现了多次发布也只会触发一次监听函数。
  24. // 类似于 eventEmitter.once (单次监听器,触发过一次后,就被自动移除了)
  25. listener.cancel();
  26. listener(action);
  27. }
  28. })
  29. }
  30. return {once, emit};
  31. }
  32. export default stdChannel;
  33. /** 使用示例
  34. let channel = stdChannel(); //先实例化
  35. // 订阅
  36. channel.on('ASYNC_ADD', action => {
  37. console.log('action=>', action);
  38. })
  39. // 发布。不管派发几次,都只会执行一次
  40. channel.emit({type: 'ASYNC_ADD', payload: {a: 1} });
  41. channel.emit({type: 'ASYNC_ADD', payload: {a: 2} });
  42. // action=> { type: 'ASYNC_ADD', payload: { a: 1 } }
  43. */

支持产出iterator

redux-saga/runSaga.js

  1. import * as effectTypes from './effectTypes';
  2. /** runSaga
  3. *
  4. * @param {*} env {channel, dispatch, getState}
  5. * @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
  6. */
  7. function runSaga(env, saga){
  8. let {channel, dispatch, getState} = env;
  9. + let it = typeof saga === 'function' ? saga() : saga;
  10. function next(val){
  11. let {value: effect, done} = it.next(val);
  12. if (!done){
  13. + // 如果 effect产出 是 Generator 迭代器
  14. + if (typeof effect[Symbol.iterator] === 'function'){
  15. + // 开始一个子进程执行 worker saga
  16. + // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
  17. + runSaga(env, effect);
  18. + next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
  19. + break;
  20. + } else {
  21. // 根据不同的effect进行不同的处理
  22. switch (effect.type){
  23. case effectTypes.TAKE:
  24. // effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
  25. // 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
  26. // 如果没有,就暂停,阻塞在这里
  27. channel.once(effect.actionType, next);
  28. break;
  29. case effectTypes.PUT:
  30. // effect = {type: 'PUT', action: {type: 'ADD'}}
  31. // put 派发action之后就继续向下执行
  32. dispatch(effect.action);
  33. next(effect.action);
  34. default:
  35. break;
  36. }
  37. }
  38. }
  39. }
  40. next();
  41. }
  42. export default runSaga;

实现 takeEvery

PS: while 遇到yield 不会阻塞主进程

  1. function* gen(){
  2. while (true){
  3. yield 1;
  4. yield 2;
  5. }
  6. }
  7. setInterval(() => {
  8. console.log(new Date());
  9. }, 1000);
  10. let it = gen();
  11. it.next();
  12. it.next();
  13. it.next();
  14. // 时间依然输出,说明 这里的while不会阻塞主进程。因为它遇到yield,会放弃执行权

redux-saga/effectTypes.js

  1. export const TAKE = 'TAKE'; //监听
  2. export const PUT = 'PUT'; //派发
  3. + export const FORK = 'FORK'; //fork出一个子进程

redux-saga/effects.js

  1. /** 开启一个子进程运行saga,为了不阻塞当前的进程
  2. *
  3. * @param {*} saga Generator生成器
  4. * @param {*} args 额外参数
  5. */
  6. export function fork(saga, ...args){
  7. return {type: effectTypes.FORK, saga, args};
  8. }
  9. /** 当监听到某个动作类型时,开启新的子进程执行saga
  10. *
  11. * @param {*} actionType 动作类型
  12. * @param {*} saga
  13. */
  14. export function takeEvery(actionType, saga){
  15. function* takeEveryHelper(){
  16. // while 不会阻塞主进程,因为它遇到 yield后,就会放弃执行权。
  17. while (true){
  18. const action = yield take(actionType); //监听动作派发
  19. // yield fork(saga, action); //开启一个新的子进程执行saga
  20. yield fork(function* (){
  21. yield saga(action);
  22. })
  23. }
  24. }
  25. return fork(takeEveryHelper);
  26. }

redux-saga/runSaga.js

  1. import * as effectTypes from './effectTypes';
  2. /** runSaga
  3. *
  4. * @param {*} env {channel, dispatch, getState}
  5. * @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
  6. +* @param {*} args 额外参数
  7. */
  8. + function runSaga(env, saga, ...args){
  9. let {channel, dispatch, getState} = env;
  10. + let it = typeof saga === 'function' ? saga() : saga;
  11. function next(val){
  12. let {value: effect, done} = it.next(val);
  13. if (!done){
  14. // 如果 effect产出 是 Generator 迭代器
  15. if (typeof effect[Symbol.iterator] === 'function'){
  16. // 开始一个子进程执行 worker saga
  17. // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
  18. runSaga(env, effect);
  19. next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
  20. } else {
  21. // 根据不同的effect进行不同的处理
  22. switch (effect.type){
  23. case effectTypes.TAKE:
  24. // effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
  25. // 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
  26. // 如果没有,就暂停,阻塞在这里
  27. channel.once(effect.actionType, next);
  28. break;
  29. case effectTypes.PUT:
  30. // effect = {type: 'PUT', action: {type: 'ADD'}}
  31. // put 派发action之后就继续向下执行
  32. dispatch(effect.action);
  33. next(effect.action);
  34. break;
  35. + case effectTypes.FORK:
  36. + runSaga(env, effect.saga, ...effect.args);
  37. + next();
  38. + break;
  39. default:
  40. break;
  41. }
  42. }
  43. }
  44. }
  45. next();
  46. }
  47. export default runSaga;

支持Promise

redux-saga/runSaga.js

  1. import * as effectTypes from './effectTypes';
  2. /** runSaga
  3. *
  4. * @param {*} env {channel, dispatch, getState}
  5. * @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
  6. * @param {*} args 额外参数
  7. */
  8. function runSaga(env, saga, ...args){
  9. let {channel, dispatch, getState} = env;
  10. let it = typeof saga === 'function' ? saga() : saga;
  11. function next(val){
  12. let {value: effect, done} = it.next(val);
  13. if (!done){
  14. // 如果 effect产出 是 Generator 迭代器
  15. if (typeof effect[Symbol.iterator] === 'function'){
  16. // 开始一个子进程执行 worker saga
  17. // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
  18. runSaga(env, effect);
  19. next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
  20. + } else if (typeof effect.then === 'function'){
  21. + effect.then(next);
  22. + } else{
  23. // 根据不同的effect进行不同的处理
  24. switch (effect.type){
  25. case effectTypes.TAKE:
  26. // effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
  27. // 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
  28. // 如果没有,就暂停,阻塞在这里
  29. channel.once(effect.actionType, next);
  30. break;
  31. case effectTypes.PUT:
  32. // effect = {type: 'PUT', action: {type: 'ADD'}}
  33. // put 不阻塞,派发action之后就继续向下执行
  34. dispatch(effect.action);
  35. next(effect.action);
  36. break;
  37. case effectTypes.FORK:
  38. runSaga(env, effect.saga, ...effect.args);
  39. next();
  40. break;
  41. default:
  42. break;
  43. }
  44. }
  45. }
  46. }
  47. next();
  48. }
  49. export default runSaga;

实现 call、delay

redux-saga/effectTypes.js

  1. export const TAKE = 'TAKE'; //监听
  2. export const PUT = 'PUT'; //派发
  3. export const FORK = 'FORK'; //fork出一个子进程
  4. + export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn

redux-saga/effects.js

  1. /** 处理一个 返回值为Promise的函数fn
  2. *
  3. * @param {*} fn 函数
  4. * @param {...args} args 函数参数
  5. */
  6. export function call(fn, ...args){
  7. return {type: effectTypes.CALL, fn, args};
  8. }
  9. // 延迟
  10. function delayP(ms, val=true){
  11. const promise = new Promise(resolve => {
  12. setTimeout(resolve, ms, val);
  13. })
  14. return promise;
  15. }
  16. export const delay = call.bind(null, delayP);
  17. // delay(fn, 1000) => {type: 'CALL', fn:delayP, args} args=[1000]

redux-saga/runSaga.js

  1. import * as effectTypes from './effectTypes';
  2. /** runSaga
  3. *
  4. * @param {*} env {channel, dispatch, getState}
  5. * @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
  6. * @param {*} args 额外参数
  7. */
  8. function runSaga(env, saga, ...args){
  9. let {channel, dispatch, getState} = env;
  10. let it = typeof saga === 'function' ? saga() : saga;
  11. function next(val){
  12. let {value: effect, done} = it.next(val);
  13. if (!done){
  14. // 如果 effect产出 是 Generator 迭代器
  15. if (typeof effect[Symbol.iterator] === 'function'){
  16. // 开始一个子进程执行 worker saga
  17. // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
  18. runSaga(env, effect);
  19. next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
  20. } else if (typeof effect.then === 'function'){
  21. effect.then(next);
  22. } else{
  23. // 根据不同的effect进行不同的处理
  24. switch (effect.type){
  25. case effectTypes.TAKE:
  26. // effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
  27. // 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
  28. // 如果没有,就暂停,阻塞在这里
  29. channel.once(effect.actionType, next);
  30. break;
  31. case effectTypes.PUT:
  32. // effect = {type: 'PUT', action: {type: 'ADD'}}
  33. // put 不阻塞,派发action之后就继续向下执行
  34. dispatch(effect.action);
  35. next(effect.action);
  36. break;
  37. case effectTypes.FORK:
  38. runSaga(env, effect.saga, ...effect.args);
  39. next();
  40. break;
  41. + case effectTypes.CALL:
  42. + effect.fn(...effect.args).then(next);
  43. + break;
  44. default:
  45. break;
  46. }
  47. }
  48. }
  49. }
  50. next();
  51. }
  52. export default runSaga;

实现 cps

redux-saga/effectTypes.js

  1. export const TAKE = 'TAKE'; //监听
  2. export const PUT = 'PUT'; //派发
  3. export const FORK = 'FORK'; //fork出一个子进程
  4. export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn
  5. + export const CPS = 'CPS'; //处理一个 回调形式的函数

redux-saga/effects.js

  1. /** 处理一个 回调形式的函数
  2. *
  3. * @param {*} fn 函数
  4. * @param {...args} args 函数参数
  5. */
  6. export function cps(fn, ...args){
  7. return {type: effectTypes.CPS, fn, args};
  8. }

redux-saga/runSaga.js

  1. import * as effectTypes from './effectTypes';
  2. /** runSaga
  3. *
  4. * @param {*} env {channel, dispatch, getState}
  5. * @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
  6. * @param {*} args 额外参数
  7. */
  8. function runSaga(env, saga, ...args){
  9. let {channel, dispatch, getState} = env;
  10. let it = typeof saga === 'function' ? saga() : saga;
  11. + function next(val, isError){
  12. + let result;
  13. + if (isError){
  14. + result = it.throw(val); //立刻停止saga,表示出错了
  15. + } else {
  16. + it.next(val);
  17. + }
  18. + let {value: effect, done} = result;
  19. if (!done){
  20. // 如果 effect产出 是 Generator 迭代器
  21. if (typeof effect[Symbol.iterator] === 'function'){
  22. // 开始一个子进程执行 worker saga
  23. // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
  24. runSaga(env, effect);
  25. next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
  26. } else if (typeof effect.then === 'function'){
  27. effect.then(next);
  28. } else{
  29. // 根据不同的effect进行不同的处理
  30. switch (effect.type){
  31. case effectTypes.TAKE:
  32. // effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
  33. // 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
  34. // 如果没有,就暂停,阻塞在这里
  35. channel.once(effect.actionType, next);
  36. break;
  37. case effectTypes.PUT:
  38. // effect = {type: 'PUT', action: {type: 'ADD'}}
  39. // put 不阻塞,派发action之后就继续向下执行
  40. dispatch(effect.action);
  41. next(effect.action);
  42. break;
  43. case effectTypes.FORK:
  44. runSaga(env, effect.saga, ...effect.args);
  45. next();
  46. break;
  47. case effectTypes.CALL:
  48. effect.fn(...effect.args).then(next);
  49. break;
  50. + case effectTypes.CPS:
  51. + effect.fn(...effect.args, (err, data) => {
  52. + err ? next(err, true) : next(data);
  53. + });
  54. + break;
  55. default:
  56. break;
  57. }
  58. }
  59. }
  60. }
  61. next();
  62. }
  63. export default runSaga;

实现 all

redux-saga/effectTypes.js

  1. export const TAKE = 'TAKE'; //监听
  2. export const PUT = 'PUT'; //派发
  3. export const FORK = 'FORK'; //fork出一个子进程
  4. export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn
  5. export const CPS = 'CPS'; //处理一个 回调形式的函数
  6. + export const ALL = 'ALL';

redux-saga/effects.js

  1. /**
  2. *
  3. * @param {*} effects 数组,里面放置一个个的 监听saga迭代器
  4. */
  5. export function all(effects){
  6. return {type: effectTypes.ALL, effects};
  7. }

redux-saga/runSaga.js

  1. import * as effectTypes from './effectTypes';
  2. /** runSaga
  3. *
  4. * @param {*} env {channel, dispatch, getState}
  5. * @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
  6. * @param {*} args 额外参数
  7. */
  8. function runSaga(env, saga, ...args){
  9. let {channel, dispatch, getState} = env;
  10. let it = typeof saga === 'function' ? saga() : saga;
  11. function next(val, isError){
  12. let result = isError ? it.throw(val) : it.next(val);
  13. let {value: effect, done} = result;
  14. if (!done){
  15. // 如果 effect产出 是 Generator 迭代器
  16. if (typeof effect[Symbol.iterator] === 'function'){
  17. // 开始一个子进程执行 worker saga
  18. // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
  19. runSaga(env, effect);
  20. next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
  21. } else if (typeof effect.then === 'function'){
  22. effect.then(next);
  23. } else{
  24. // 根据不同的effect进行不同的处理
  25. switch (effect.type){
  26. case effectTypes.TAKE:
  27. // effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
  28. // 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
  29. // 如果没有,就暂停,阻塞在这里
  30. channel.once(effect.actionType, next);
  31. break;
  32. case effectTypes.PUT:
  33. // effect = {type: 'PUT', action: {type: 'ADD'}}
  34. // put 不阻塞,派发action之后就继续向下执行
  35. dispatch(effect.action);
  36. next(effect.action);
  37. break;
  38. case effectTypes.FORK:
  39. runSaga(env, effect.saga, ...effect.args);
  40. next();
  41. break;
  42. case effectTypes.CALL:
  43. effect.fn(...effect.args).then(next);
  44. break;
  45. case effectTypes.CPS:
  46. effect.fn(...effect.args, (err, data) => {
  47. err ? next(err, true) : next(data);
  48. });
  49. break;
  50. + case effectTypes.ALL:
  51. + let effects = effect.effects;
  52. + let result = []; //存放结果的数组
  53. + let completeCount = 0; //完成任务的数量
  54. + effects.forEach((it, index) => {
  55. + runSaga(env, it, (itResult) => {
  56. + result[index] = itResult;
  57. + if (++completeCount === effects.length){
  58. + next(result); //都完了之后,可以让当前的saga继续执行
  59. + }
  60. + });
  61. + })
  62. + break;
  63. default:
  64. break;
  65. }
  66. }
  67. + } else {
  68. + if (args && typeof args[0] === 'function'){
  69. + args[0](effect);
  70. + }
  71. + }
  72. }
  73. next();
  74. }
  75. export default runSaga;

实现 cancel

如果父saga取消,子saga什么状态?现在这个版本里是保持原状 但其实应该是你把父saga取消了,所有的子saga也应该取消会比较好。

redux-saga/symbols.js

  1. export const TASK_CANCEL = Symbol('TASK_CANCEL');

redux-saga/effectTypes.js

  1. export const TAKE = 'TAKE'; //监听
  2. export const PUT = 'PUT'; //派发
  3. export const FORK = 'FORK'; //fork出一个子进程
  4. export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn
  5. export const CPS = 'CPS'; //处理一个 回调形式的函数
  6. export const ALL = 'ALL';
  7. export const CANCEL = 'CANCEL'; //取消一个saga任务

redux-saga/effects.js

  1. export function cancel(task){
  2. return {type: effectTypes.CANCEL, task};
  3. }

redux-saga/runSaga.js

  1. import * as effectTypes from './effectTypes';
  2. + import {TASK_CANCEL} from './symbols';
  3. /** runSaga
  4. *
  5. * @param {*} env {channel, dispatch, getState}
  6. * @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
  7. * @param {*} args 额外参数
  8. */
  9. function runSaga(env, saga, ...args){
  10. + let subTasks = []; //子saga的task任务对象存放数组
  11. + // 每当执行runSaga的时候,会创建一个task任务对象
  12. + let task = {cancel: () => next(TASK_CANCEL)};
  13. let {channel, dispatch, getState} = env;
  14. let it = typeof saga === 'function' ? saga() : saga;
  15. function next(val, isError){
  16. let result;
  17. if (isError){
  18. result = it.throw(val); //立刻停止saga,表示出错了
  19. + } else if (val === TASK_CANCEL){
  20. + subTasks.forEach(subTask => subTask.cancel()); //如果父saga取消了,它的所有子saga也得取消
  21. + result = it.return(val); //立刻停止saga,并返回 {value: val, done: true}
  22. } else {
  23. result = it.next(val);
  24. }
  25. let {value: effect, done} = result;
  26. if (!done){
  27. // 如果 effect产出 是 Generator 迭代器
  28. if (typeof effect[Symbol.iterator] === 'function'){
  29. // 开始一个子进程执行 worker saga
  30. // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
  31. + let iteratorTask = runSaga(env, effect);
  32. + subTasks.push(iteratorTask);
  33. next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
  34. } else if (typeof effect.then === 'function'){
  35. effect.then(next);
  36. } else{
  37. // 根据不同的effect进行不同的处理
  38. switch (effect.type){
  39. case effectTypes.TAKE:
  40. // effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
  41. // 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
  42. // 如果没有,就暂停,阻塞在这里
  43. channel.once(effect.actionType, next);
  44. break;
  45. case effectTypes.PUT:
  46. // effect = {type: 'PUT', action: {type: 'ADD'}}
  47. // put 不阻塞,派发action之后就继续向下执行
  48. dispatch(effect.action);
  49. next(effect.action);
  50. break;
  51. case effectTypes.FORK:
  52. + let forkTask = runSaga(env, effect.saga, ...effect.args);
  53. + subTasks.push(forkTask);
  54. + next(forkTask);
  55. break;
  56. case effectTypes.CALL:
  57. effect.fn(...effect.args).then(next);
  58. break;
  59. case effectTypes.CPS:
  60. effect.fn(...effect.args, (err, data) => {
  61. err ? next(err, true) : next(data);
  62. });
  63. break;
  64. case effectTypes.ALL:
  65. let effects = effect.effects;
  66. let result = []; //存放结果的数组
  67. let completeCount = 0; //完成任务的数量
  68. effects.forEach((it, index) => {
  69. + let subTask = runSaga(env, it, (itResult) => {
  70. result[index] = itResult;
  71. if (++completeCount === effects.length){
  72. next(result); //都完了之后,可以让当前的saga继续执行
  73. }
  74. });
  75. + subTasks.push(subTask);
  76. })
  77. break;
  78. + case effectTypes.CANCEL:
  79. + effect.task.cancel();
  80. + next();
  81. + break;
  82. default:
  83. break;
  84. }
  85. }
  86. } else {
  87. if (args && typeof args[0] === 'function'){
  88. args[0](effect);
  89. }
  90. }
  91. }
  92. next();
  93. + return task;
  94. }
  95. export default runSaga;