实现 take、put
https://github.com/redux-saga/redux-saga/blob/master/packages/core/src/internal/runSaga.js
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听export const PUT = 'PUT'; //派发
redux-saga/effects.js
import * as effectTypes from './effectTypes';/** 监听某个动作,生成指令对象的工厂函数** @param {*} actionType*/export function take(actionType){// 这是一个用来发送给saga中间件的指令对象return {type: effectTypes.TAKE, actionType};}/** 派发一个真正的动作** @param {*} action*/export function put(action){return {type: effectTypes.PUT, action};}
redux-saga/index.js
import runSaga from './runSaga'; //驱动根saga自动执行,类似coimport stdChannel from './channel'; // 管道 channel 用于实现我们的发布订阅function createSagaMiddleware(){const channel = stdChannel();let boundRunSaga;// 创建 saga 中间件function sagaMiddleware({getState, dispatch}){boundRunSaga = runSaga.bind(null, {channel, dispatch, getState});return function(next){return function(action){const result = next(action);//把action也给channel一份,如果有channel订阅的动作派发,就会触发相应的监听函数channel.emit(action);return result;}}}sagaMiddleware.run = (saga) => boundRunSaga(saga);return sagaMiddleware;}export default createSagaMiddleware;
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';/** runSaga** @param {*} env {channel, dispatch, getState}* @param {*} saga 可能是一个生成器,也可能是一个迭代器*/function runSaga(env, saga){let {channel, dispatch, getState} = env;let it = saga();function next(val){let {value: effect, done} = it.next(val);if (!done){// 根据不同的effect进行不同的处理switch (effect.type){case effectTypes.TAKE:// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next继续走// 如果没有,就暂停,阻塞在这里channel.once(effect.actionType, next);break;case effectTypes.PUT:// effect = {type: 'PUT', action: {type: 'ADD'}}// put 不阻塞,派发action之后就继续向下执行dispatch(effect.action);next(effect.action);default:break;}}}next();}export default runSaga;
redux-saga/channel.js
function stdChannel(){let listeners = []; //缓存 监听函数/** once(在源码中名字叫take) 订阅,保存监听函数** @param {*} actionType 等待的动作类型* @param {*} listener 等到之后,执行的监听函数*/function once(actionType, listener){listener.actionType = actionType;// 设置取消监听函数,用于单次监听(即只触发一次,触发后自动移除)listener.cancel = () => {listeners = listeners.filter(item => item !== listener);}listeners.push(listener);}/** emit(在源码中名字叫put) 发布,执行监听函数** @param {*} action 派发的动作*/function emit(action){listeners.forEach(listener => {if (listener.actionType === action.type){// 先取消,再执行,这样就实现了多次发布也只会触发一次监听函数。// 类似于 eventEmitter.once (单次监听器,触发过一次后,就被自动移除了)listener.cancel();listener(action);}})}return {once, emit};}export default stdChannel;/** 使用示例let channel = stdChannel(); //先实例化// 订阅channel.on('ASYNC_ADD', action => {console.log('action=>', action);})// 发布。不管派发几次,都只会执行一次channel.emit({type: 'ASYNC_ADD', payload: {a: 1} });channel.emit({type: 'ASYNC_ADD', payload: {a: 2} });// action=> { type: 'ASYNC_ADD', payload: { a: 1 } }*/
支持产出iterator
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';/** runSaga** @param {*} env {channel, dispatch, getState}* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器*/function runSaga(env, saga){let {channel, dispatch, getState} = env;+ let it = typeof saga === 'function' ? saga() : saga;function next(val){let {value: effect, done} = it.next(val);if (!done){+ // 如果 effect产出 是 Generator 迭代器+ if (typeof effect[Symbol.iterator] === 'function'){+ // 开始一个子进程执行 worker saga+ // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。+ runSaga(env, effect);+ next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)+ break;+ } else {// 根据不同的effect进行不同的处理switch (effect.type){case effectTypes.TAKE:// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;// 如果没有,就暂停,阻塞在这里channel.once(effect.actionType, next);break;case effectTypes.PUT:// effect = {type: 'PUT', action: {type: 'ADD'}}// put 派发action之后就继续向下执行dispatch(effect.action);next(effect.action);default:break;}}}}next();}export default runSaga;
实现 takeEvery
PS: while 遇到yield 不会阻塞主进程
function* gen(){while (true){yield 1;yield 2;}}setInterval(() => {console.log(new Date());}, 1000);let it = gen();it.next();it.next();it.next();// 时间依然输出,说明 这里的while不会阻塞主进程。因为它遇到yield,会放弃执行权
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听export const PUT = 'PUT'; //派发+ export const FORK = 'FORK'; //fork出一个子进程
redux-saga/effects.js
/** 开启一个子进程运行saga,为了不阻塞当前的进程** @param {*} saga Generator生成器* @param {*} args 额外参数*/export function fork(saga, ...args){return {type: effectTypes.FORK, saga, args};}/** 当监听到某个动作类型时,开启新的子进程执行saga** @param {*} actionType 动作类型* @param {*} saga*/export function takeEvery(actionType, saga){function* takeEveryHelper(){// while 不会阻塞主进程,因为它遇到 yield后,就会放弃执行权。while (true){const action = yield take(actionType); //监听动作派发// yield fork(saga, action); //开启一个新的子进程执行sagayield fork(function* (){yield saga(action);})}}return fork(takeEveryHelper);}
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';/** runSaga** @param {*} env {channel, dispatch, getState}* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器+* @param {*} args 额外参数*/+ function runSaga(env, saga, ...args){let {channel, dispatch, getState} = env;+ let it = typeof saga === 'function' ? saga() : saga;function next(val){let {value: effect, done} = it.next(val);if (!done){// 如果 effect产出 是 Generator 迭代器if (typeof effect[Symbol.iterator] === 'function'){// 开始一个子进程执行 worker saga// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。runSaga(env, effect);next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)} else {// 根据不同的effect进行不同的处理switch (effect.type){case effectTypes.TAKE:// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;// 如果没有,就暂停,阻塞在这里channel.once(effect.actionType, next);break;case effectTypes.PUT:// effect = {type: 'PUT', action: {type: 'ADD'}}// put 派发action之后就继续向下执行dispatch(effect.action);next(effect.action);break;+ case effectTypes.FORK:+ runSaga(env, effect.saga, ...effect.args);+ next();+ break;default:break;}}}}next();}export default runSaga;
支持Promise
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';/** runSaga** @param {*} env {channel, dispatch, getState}* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器* @param {*} args 额外参数*/function runSaga(env, saga, ...args){let {channel, dispatch, getState} = env;let it = typeof saga === 'function' ? saga() : saga;function next(val){let {value: effect, done} = it.next(val);if (!done){// 如果 effect产出 是 Generator 迭代器if (typeof effect[Symbol.iterator] === 'function'){// 开始一个子进程执行 worker saga// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。runSaga(env, effect);next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)+ } else if (typeof effect.then === 'function'){+ effect.then(next);+ } else{// 根据不同的effect进行不同的处理switch (effect.type){case effectTypes.TAKE:// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;// 如果没有,就暂停,阻塞在这里channel.once(effect.actionType, next);break;case effectTypes.PUT:// effect = {type: 'PUT', action: {type: 'ADD'}}// put 不阻塞,派发action之后就继续向下执行dispatch(effect.action);next(effect.action);break;case effectTypes.FORK:runSaga(env, effect.saga, ...effect.args);next();break;default:break;}}}}next();}export default runSaga;
实现 call、delay
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听export const PUT = 'PUT'; //派发export const FORK = 'FORK'; //fork出一个子进程+ export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn
redux-saga/effects.js
/** 处理一个 返回值为Promise的函数fn** @param {*} fn 函数* @param {...args} args 函数参数*/export function call(fn, ...args){return {type: effectTypes.CALL, fn, args};}// 延迟function delayP(ms, val=true){const promise = new Promise(resolve => {setTimeout(resolve, ms, val);})return promise;}export const delay = call.bind(null, delayP);// delay(fn, 1000) => {type: 'CALL', fn:delayP, args} args=[1000]
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';/** runSaga** @param {*} env {channel, dispatch, getState}* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器* @param {*} args 额外参数*/function runSaga(env, saga, ...args){let {channel, dispatch, getState} = env;let it = typeof saga === 'function' ? saga() : saga;function next(val){let {value: effect, done} = it.next(val);if (!done){// 如果 effect产出 是 Generator 迭代器if (typeof effect[Symbol.iterator] === 'function'){// 开始一个子进程执行 worker saga// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。runSaga(env, effect);next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)} else if (typeof effect.then === 'function'){effect.then(next);} else{// 根据不同的effect进行不同的处理switch (effect.type){case effectTypes.TAKE:// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;// 如果没有,就暂停,阻塞在这里channel.once(effect.actionType, next);break;case effectTypes.PUT:// effect = {type: 'PUT', action: {type: 'ADD'}}// put 不阻塞,派发action之后就继续向下执行dispatch(effect.action);next(effect.action);break;case effectTypes.FORK:runSaga(env, effect.saga, ...effect.args);next();break;+ case effectTypes.CALL:+ effect.fn(...effect.args).then(next);+ break;default:break;}}}}next();}export default runSaga;
实现 cps
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听export const PUT = 'PUT'; //派发export const FORK = 'FORK'; //fork出一个子进程export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn+ export const CPS = 'CPS'; //处理一个 回调形式的函数
redux-saga/effects.js
/** 处理一个 回调形式的函数** @param {*} fn 函数* @param {...args} args 函数参数*/export function cps(fn, ...args){return {type: effectTypes.CPS, fn, args};}
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';/** runSaga** @param {*} env {channel, dispatch, getState}* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器* @param {*} args 额外参数*/function runSaga(env, saga, ...args){let {channel, dispatch, getState} = env;let it = typeof saga === 'function' ? saga() : saga;+ function next(val, isError){+ let result;+ if (isError){+ result = it.throw(val); //立刻停止saga,表示出错了+ } else {+ it.next(val);+ }+ let {value: effect, done} = result;if (!done){// 如果 effect产出 是 Generator 迭代器if (typeof effect[Symbol.iterator] === 'function'){// 开始一个子进程执行 worker saga// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。runSaga(env, effect);next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)} else if (typeof effect.then === 'function'){effect.then(next);} else{// 根据不同的effect进行不同的处理switch (effect.type){case effectTypes.TAKE:// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;// 如果没有,就暂停,阻塞在这里channel.once(effect.actionType, next);break;case effectTypes.PUT:// effect = {type: 'PUT', action: {type: 'ADD'}}// put 不阻塞,派发action之后就继续向下执行dispatch(effect.action);next(effect.action);break;case effectTypes.FORK:runSaga(env, effect.saga, ...effect.args);next();break;case effectTypes.CALL:effect.fn(...effect.args).then(next);break;+ case effectTypes.CPS:+ effect.fn(...effect.args, (err, data) => {+ err ? next(err, true) : next(data);+ });+ break;default:break;}}}}next();}export default runSaga;
实现 all
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听export const PUT = 'PUT'; //派发export const FORK = 'FORK'; //fork出一个子进程export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fnexport const CPS = 'CPS'; //处理一个 回调形式的函数+ export const ALL = 'ALL';
redux-saga/effects.js
/**** @param {*} effects 数组,里面放置一个个的 监听saga迭代器*/export function all(effects){return {type: effectTypes.ALL, effects};}
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';/** runSaga** @param {*} env {channel, dispatch, getState}* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器* @param {*} args 额外参数*/function runSaga(env, saga, ...args){let {channel, dispatch, getState} = env;let it = typeof saga === 'function' ? saga() : saga;function next(val, isError){let result = isError ? it.throw(val) : it.next(val);let {value: effect, done} = result;if (!done){// 如果 effect产出 是 Generator 迭代器if (typeof effect[Symbol.iterator] === 'function'){// 开始一个子进程执行 worker saga// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。runSaga(env, effect);next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)} else if (typeof effect.then === 'function'){effect.then(next);} else{// 根据不同的effect进行不同的处理switch (effect.type){case effectTypes.TAKE:// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;// 如果没有,就暂停,阻塞在这里channel.once(effect.actionType, next);break;case effectTypes.PUT:// effect = {type: 'PUT', action: {type: 'ADD'}}// put 不阻塞,派发action之后就继续向下执行dispatch(effect.action);next(effect.action);break;case effectTypes.FORK:runSaga(env, effect.saga, ...effect.args);next();break;case effectTypes.CALL:effect.fn(...effect.args).then(next);break;case effectTypes.CPS:effect.fn(...effect.args, (err, data) => {err ? next(err, true) : next(data);});break;+ case effectTypes.ALL:+ let effects = effect.effects;+ let result = []; //存放结果的数组+ let completeCount = 0; //完成任务的数量+ effects.forEach((it, index) => {+ runSaga(env, it, (itResult) => {+ result[index] = itResult;+ if (++completeCount === effects.length){+ next(result); //都完了之后,可以让当前的saga继续执行+ }+ });+ })+ break;default:break;}}+ } else {+ if (args && typeof args[0] === 'function'){+ args[0](effect);+ }+ }}next();}export default runSaga;
实现 cancel
如果父saga取消,子saga什么状态?现在这个版本里是保持原状 但其实应该是你把父saga取消了,所有的子saga也应该取消会比较好。
redux-saga/symbols.js
export const TASK_CANCEL = Symbol('TASK_CANCEL');
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听export const PUT = 'PUT'; //派发export const FORK = 'FORK'; //fork出一个子进程export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fnexport const CPS = 'CPS'; //处理一个 回调形式的函数export const ALL = 'ALL';export const CANCEL = 'CANCEL'; //取消一个saga任务
redux-saga/effects.js
export function cancel(task){return {type: effectTypes.CANCEL, task};}
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';+ import {TASK_CANCEL} from './symbols';/** runSaga** @param {*} env {channel, dispatch, getState}* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器* @param {*} args 额外参数*/function runSaga(env, saga, ...args){+ let subTasks = []; //子saga的task任务对象存放数组+ // 每当执行runSaga的时候,会创建一个task任务对象+ let task = {cancel: () => next(TASK_CANCEL)};let {channel, dispatch, getState} = env;let it = typeof saga === 'function' ? saga() : saga;function next(val, isError){let result;if (isError){result = it.throw(val); //立刻停止saga,表示出错了+ } else if (val === TASK_CANCEL){+ subTasks.forEach(subTask => subTask.cancel()); //如果父saga取消了,它的所有子saga也得取消+ result = it.return(val); //立刻停止saga,并返回 {value: val, done: true}} else {result = it.next(val);}let {value: effect, done} = result;if (!done){// 如果 effect产出 是 Generator 迭代器if (typeof effect[Symbol.iterator] === 'function'){// 开始一个子进程执行 worker saga// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。+ let iteratorTask = runSaga(env, effect);+ subTasks.push(iteratorTask);next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)} else if (typeof effect.then === 'function'){effect.then(next);} else{// 根据不同的effect进行不同的处理switch (effect.type){case effectTypes.TAKE:// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;// 如果没有,就暂停,阻塞在这里channel.once(effect.actionType, next);break;case effectTypes.PUT:// effect = {type: 'PUT', action: {type: 'ADD'}}// put 不阻塞,派发action之后就继续向下执行dispatch(effect.action);next(effect.action);break;case effectTypes.FORK:+ let forkTask = runSaga(env, effect.saga, ...effect.args);+ subTasks.push(forkTask);+ next(forkTask);break;case effectTypes.CALL:effect.fn(...effect.args).then(next);break;case effectTypes.CPS:effect.fn(...effect.args, (err, data) => {err ? next(err, true) : next(data);});break;case effectTypes.ALL:let effects = effect.effects;let result = []; //存放结果的数组let completeCount = 0; //完成任务的数量effects.forEach((it, index) => {+ let subTask = runSaga(env, it, (itResult) => {result[index] = itResult;if (++completeCount === effects.length){next(result); //都完了之后,可以让当前的saga继续执行}});+ subTasks.push(subTask);})break;+ case effectTypes.CANCEL:+ effect.task.cancel();+ next();+ break;default:break;}}} else {if (args && typeof args[0] === 'function'){args[0](effect);}}}next();+ return task;}export default runSaga;
