实现 take、put
https://github.com/redux-saga/redux-saga/blob/master/packages/core/src/internal/runSaga.js
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听
export const PUT = 'PUT'; //派发
redux-saga/effects.js
import * as effectTypes from './effectTypes';
/** 监听某个动作,生成指令对象的工厂函数
*
* @param {*} actionType
*/
export function take(actionType){
// 这是一个用来发送给saga中间件的指令对象
return {type: effectTypes.TAKE, actionType};
}
/** 派发一个真正的动作
*
* @param {*} action
*/
export function put(action){
return {type: effectTypes.PUT, action};
}
redux-saga/index.js
import runSaga from './runSaga'; //驱动根saga自动执行,类似co
import stdChannel from './channel'; // 管道 channel 用于实现我们的发布订阅
function createSagaMiddleware(){
const channel = stdChannel();
let boundRunSaga;
// 创建 saga 中间件
function sagaMiddleware({getState, dispatch}){
boundRunSaga = runSaga.bind(null, {channel, dispatch, getState});
return function(next){
return function(action){
const result = next(action);
//把action也给channel一份,如果有channel订阅的动作派发,就会触发相应的监听函数
channel.emit(action);
return result;
}
}
}
sagaMiddleware.run = (saga) => boundRunSaga(saga);
return sagaMiddleware;
}
export default createSagaMiddleware;
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';
/** runSaga
*
* @param {*} env {channel, dispatch, getState}
* @param {*} saga 可能是一个生成器,也可能是一个迭代器
*/
function runSaga(env, saga){
let {channel, dispatch, getState} = env;
let it = saga();
function next(val){
let {value: effect, done} = it.next(val);
if (!done){
// 根据不同的effect进行不同的处理
switch (effect.type){
case effectTypes.TAKE:
// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next继续走
// 如果没有,就暂停,阻塞在这里
channel.once(effect.actionType, next);
break;
case effectTypes.PUT:
// effect = {type: 'PUT', action: {type: 'ADD'}}
// put 不阻塞,派发action之后就继续向下执行
dispatch(effect.action);
next(effect.action);
default:
break;
}
}
}
next();
}
export default runSaga;
redux-saga/channel.js
function stdChannel(){
let listeners = []; //缓存 监听函数
/** once(在源码中名字叫take) 订阅,保存监听函数
*
* @param {*} actionType 等待的动作类型
* @param {*} listener 等到之后,执行的监听函数
*/
function once(actionType, listener){
listener.actionType = actionType;
// 设置取消监听函数,用于单次监听(即只触发一次,触发后自动移除)
listener.cancel = () => {
listeners = listeners.filter(item => item !== listener);
}
listeners.push(listener);
}
/** emit(在源码中名字叫put) 发布,执行监听函数
*
* @param {*} action 派发的动作
*/
function emit(action){
listeners.forEach(listener => {
if (listener.actionType === action.type){
// 先取消,再执行,这样就实现了多次发布也只会触发一次监听函数。
// 类似于 eventEmitter.once (单次监听器,触发过一次后,就被自动移除了)
listener.cancel();
listener(action);
}
})
}
return {once, emit};
}
export default stdChannel;
/** 使用示例
let channel = stdChannel(); //先实例化
// 订阅
channel.on('ASYNC_ADD', action => {
console.log('action=>', action);
})
// 发布。不管派发几次,都只会执行一次
channel.emit({type: 'ASYNC_ADD', payload: {a: 1} });
channel.emit({type: 'ASYNC_ADD', payload: {a: 2} });
// action=> { type: 'ASYNC_ADD', payload: { a: 1 } }
*/
支持产出iterator
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';
/** runSaga
*
* @param {*} env {channel, dispatch, getState}
* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
*/
function runSaga(env, saga){
let {channel, dispatch, getState} = env;
+ let it = typeof saga === 'function' ? saga() : saga;
function next(val){
let {value: effect, done} = it.next(val);
if (!done){
+ // 如果 effect产出 是 Generator 迭代器
+ if (typeof effect[Symbol.iterator] === 'function'){
+ // 开始一个子进程执行 worker saga
+ // 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
+ runSaga(env, effect);
+ next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
+ break;
+ } else {
// 根据不同的effect进行不同的处理
switch (effect.type){
case effectTypes.TAKE:
// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
// 如果没有,就暂停,阻塞在这里
channel.once(effect.actionType, next);
break;
case effectTypes.PUT:
// effect = {type: 'PUT', action: {type: 'ADD'}}
// put 派发action之后就继续向下执行
dispatch(effect.action);
next(effect.action);
default:
break;
}
}
}
}
next();
}
export default runSaga;
实现 takeEvery
PS: while 遇到yield 不会阻塞主进程
function* gen(){
while (true){
yield 1;
yield 2;
}
}
setInterval(() => {
console.log(new Date());
}, 1000);
let it = gen();
it.next();
it.next();
it.next();
// 时间依然输出,说明 这里的while不会阻塞主进程。因为它遇到yield,会放弃执行权
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听
export const PUT = 'PUT'; //派发
+ export const FORK = 'FORK'; //fork出一个子进程
redux-saga/effects.js
/** 开启一个子进程运行saga,为了不阻塞当前的进程
*
* @param {*} saga Generator生成器
* @param {*} args 额外参数
*/
export function fork(saga, ...args){
return {type: effectTypes.FORK, saga, args};
}
/** 当监听到某个动作类型时,开启新的子进程执行saga
*
* @param {*} actionType 动作类型
* @param {*} saga
*/
export function takeEvery(actionType, saga){
function* takeEveryHelper(){
// while 不会阻塞主进程,因为它遇到 yield后,就会放弃执行权。
while (true){
const action = yield take(actionType); //监听动作派发
// yield fork(saga, action); //开启一个新的子进程执行saga
yield fork(function* (){
yield saga(action);
})
}
}
return fork(takeEveryHelper);
}
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';
/** runSaga
*
* @param {*} env {channel, dispatch, getState}
* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
+* @param {*} args 额外参数
*/
+ function runSaga(env, saga, ...args){
let {channel, dispatch, getState} = env;
+ let it = typeof saga === 'function' ? saga() : saga;
function next(val){
let {value: effect, done} = it.next(val);
if (!done){
// 如果 effect产出 是 Generator 迭代器
if (typeof effect[Symbol.iterator] === 'function'){
// 开始一个子进程执行 worker saga
// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
runSaga(env, effect);
next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
} else {
// 根据不同的effect进行不同的处理
switch (effect.type){
case effectTypes.TAKE:
// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
// 如果没有,就暂停,阻塞在这里
channel.once(effect.actionType, next);
break;
case effectTypes.PUT:
// effect = {type: 'PUT', action: {type: 'ADD'}}
// put 派发action之后就继续向下执行
dispatch(effect.action);
next(effect.action);
break;
+ case effectTypes.FORK:
+ runSaga(env, effect.saga, ...effect.args);
+ next();
+ break;
default:
break;
}
}
}
}
next();
}
export default runSaga;
支持Promise
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';
/** runSaga
*
* @param {*} env {channel, dispatch, getState}
* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
* @param {*} args 额外参数
*/
function runSaga(env, saga, ...args){
let {channel, dispatch, getState} = env;
let it = typeof saga === 'function' ? saga() : saga;
function next(val){
let {value: effect, done} = it.next(val);
if (!done){
// 如果 effect产出 是 Generator 迭代器
if (typeof effect[Symbol.iterator] === 'function'){
// 开始一个子进程执行 worker saga
// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
runSaga(env, effect);
next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
+ } else if (typeof effect.then === 'function'){
+ effect.then(next);
+ } else{
// 根据不同的effect进行不同的处理
switch (effect.type){
case effectTypes.TAKE:
// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
// 如果没有,就暂停,阻塞在这里
channel.once(effect.actionType, next);
break;
case effectTypes.PUT:
// effect = {type: 'PUT', action: {type: 'ADD'}}
// put 不阻塞,派发action之后就继续向下执行
dispatch(effect.action);
next(effect.action);
break;
case effectTypes.FORK:
runSaga(env, effect.saga, ...effect.args);
next();
break;
default:
break;
}
}
}
}
next();
}
export default runSaga;
实现 call、delay
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听
export const PUT = 'PUT'; //派发
export const FORK = 'FORK'; //fork出一个子进程
+ export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn
redux-saga/effects.js
/** 处理一个 返回值为Promise的函数fn
*
* @param {*} fn 函数
* @param {...args} args 函数参数
*/
export function call(fn, ...args){
return {type: effectTypes.CALL, fn, args};
}
// 延迟
function delayP(ms, val=true){
const promise = new Promise(resolve => {
setTimeout(resolve, ms, val);
})
return promise;
}
export const delay = call.bind(null, delayP);
// delay(fn, 1000) => {type: 'CALL', fn:delayP, args} args=[1000]
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';
/** runSaga
*
* @param {*} env {channel, dispatch, getState}
* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
* @param {*} args 额外参数
*/
function runSaga(env, saga, ...args){
let {channel, dispatch, getState} = env;
let it = typeof saga === 'function' ? saga() : saga;
function next(val){
let {value: effect, done} = it.next(val);
if (!done){
// 如果 effect产出 是 Generator 迭代器
if (typeof effect[Symbol.iterator] === 'function'){
// 开始一个子进程执行 worker saga
// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
runSaga(env, effect);
next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
} else if (typeof effect.then === 'function'){
effect.then(next);
} else{
// 根据不同的effect进行不同的处理
switch (effect.type){
case effectTypes.TAKE:
// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
// 如果没有,就暂停,阻塞在这里
channel.once(effect.actionType, next);
break;
case effectTypes.PUT:
// effect = {type: 'PUT', action: {type: 'ADD'}}
// put 不阻塞,派发action之后就继续向下执行
dispatch(effect.action);
next(effect.action);
break;
case effectTypes.FORK:
runSaga(env, effect.saga, ...effect.args);
next();
break;
+ case effectTypes.CALL:
+ effect.fn(...effect.args).then(next);
+ break;
default:
break;
}
}
}
}
next();
}
export default runSaga;
实现 cps
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听
export const PUT = 'PUT'; //派发
export const FORK = 'FORK'; //fork出一个子进程
export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn
+ export const CPS = 'CPS'; //处理一个 回调形式的函数
redux-saga/effects.js
/** 处理一个 回调形式的函数
*
* @param {*} fn 函数
* @param {...args} args 函数参数
*/
export function cps(fn, ...args){
return {type: effectTypes.CPS, fn, args};
}
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';
/** runSaga
*
* @param {*} env {channel, dispatch, getState}
* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
* @param {*} args 额外参数
*/
function runSaga(env, saga, ...args){
let {channel, dispatch, getState} = env;
let it = typeof saga === 'function' ? saga() : saga;
+ function next(val, isError){
+ let result;
+ if (isError){
+ result = it.throw(val); //立刻停止saga,表示出错了
+ } else {
+ it.next(val);
+ }
+ let {value: effect, done} = result;
if (!done){
// 如果 effect产出 是 Generator 迭代器
if (typeof effect[Symbol.iterator] === 'function'){
// 开始一个子进程执行 worker saga
// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
runSaga(env, effect);
next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
} else if (typeof effect.then === 'function'){
effect.then(next);
} else{
// 根据不同的effect进行不同的处理
switch (effect.type){
case effectTypes.TAKE:
// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
// 如果没有,就暂停,阻塞在这里
channel.once(effect.actionType, next);
break;
case effectTypes.PUT:
// effect = {type: 'PUT', action: {type: 'ADD'}}
// put 不阻塞,派发action之后就继续向下执行
dispatch(effect.action);
next(effect.action);
break;
case effectTypes.FORK:
runSaga(env, effect.saga, ...effect.args);
next();
break;
case effectTypes.CALL:
effect.fn(...effect.args).then(next);
break;
+ case effectTypes.CPS:
+ effect.fn(...effect.args, (err, data) => {
+ err ? next(err, true) : next(data);
+ });
+ break;
default:
break;
}
}
}
}
next();
}
export default runSaga;
实现 all
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听
export const PUT = 'PUT'; //派发
export const FORK = 'FORK'; //fork出一个子进程
export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn
export const CPS = 'CPS'; //处理一个 回调形式的函数
+ export const ALL = 'ALL';
redux-saga/effects.js
/**
*
* @param {*} effects 数组,里面放置一个个的 监听saga迭代器
*/
export function all(effects){
return {type: effectTypes.ALL, effects};
}
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';
/** runSaga
*
* @param {*} env {channel, dispatch, getState}
* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
* @param {*} args 额外参数
*/
function runSaga(env, saga, ...args){
let {channel, dispatch, getState} = env;
let it = typeof saga === 'function' ? saga() : saga;
function next(val, isError){
let result = isError ? it.throw(val) : it.next(val);
let {value: effect, done} = result;
if (!done){
// 如果 effect产出 是 Generator 迭代器
if (typeof effect[Symbol.iterator] === 'function'){
// 开始一个子进程执行 worker saga
// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
runSaga(env, effect);
next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
} else if (typeof effect.then === 'function'){
effect.then(next);
} else{
// 根据不同的effect进行不同的处理
switch (effect.type){
case effectTypes.TAKE:
// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
// 如果没有,就暂停,阻塞在这里
channel.once(effect.actionType, next);
break;
case effectTypes.PUT:
// effect = {type: 'PUT', action: {type: 'ADD'}}
// put 不阻塞,派发action之后就继续向下执行
dispatch(effect.action);
next(effect.action);
break;
case effectTypes.FORK:
runSaga(env, effect.saga, ...effect.args);
next();
break;
case effectTypes.CALL:
effect.fn(...effect.args).then(next);
break;
case effectTypes.CPS:
effect.fn(...effect.args, (err, data) => {
err ? next(err, true) : next(data);
});
break;
+ case effectTypes.ALL:
+ let effects = effect.effects;
+ let result = []; //存放结果的数组
+ let completeCount = 0; //完成任务的数量
+ effects.forEach((it, index) => {
+ runSaga(env, it, (itResult) => {
+ result[index] = itResult;
+ if (++completeCount === effects.length){
+ next(result); //都完了之后,可以让当前的saga继续执行
+ }
+ });
+ })
+ break;
default:
break;
}
}
+ } else {
+ if (args && typeof args[0] === 'function'){
+ args[0](effect);
+ }
+ }
}
next();
}
export default runSaga;
实现 cancel
如果父saga取消,子saga什么状态?现在这个版本里是保持原状 但其实应该是你把父saga取消了,所有的子saga也应该取消会比较好。
redux-saga/symbols.js
export const TASK_CANCEL = Symbol('TASK_CANCEL');
redux-saga/effectTypes.js
export const TAKE = 'TAKE'; //监听
export const PUT = 'PUT'; //派发
export const FORK = 'FORK'; //fork出一个子进程
export const CALL = 'CALL'; //处理一个 返回值为Promise的函数fn
export const CPS = 'CPS'; //处理一个 回调形式的函数
export const ALL = 'ALL';
export const CANCEL = 'CANCEL'; //取消一个saga任务
redux-saga/effects.js
export function cancel(task){
return {type: effectTypes.CANCEL, task};
}
redux-saga/runSaga.js
import * as effectTypes from './effectTypes';
+ import {TASK_CANCEL} from './symbols';
/** runSaga
*
* @param {*} env {channel, dispatch, getState}
* @param {*} saga 可能是一个Generator生成器,也可能是一个Generator迭代器
* @param {*} args 额外参数
*/
function runSaga(env, saga, ...args){
+ let subTasks = []; //子saga的task任务对象存放数组
+ // 每当执行runSaga的时候,会创建一个task任务对象
+ let task = {cancel: () => next(TASK_CANCEL)};
let {channel, dispatch, getState} = env;
let it = typeof saga === 'function' ? saga() : saga;
function next(val, isError){
let result;
if (isError){
result = it.throw(val); //立刻停止saga,表示出错了
+ } else if (val === TASK_CANCEL){
+ subTasks.forEach(subTask => subTask.cancel()); //如果父saga取消了,它的所有子saga也得取消
+ result = it.return(val); //立刻停止saga,并返回 {value: val, done: true}
} else {
result = it.next(val);
}
let {value: effect, done} = result;
if (!done){
// 如果 effect产出 是 Generator 迭代器
if (typeof effect[Symbol.iterator] === 'function'){
// 开始一个子进程执行 worker saga
// 此处的“开启子进程”只是一个比喻,并不是真的开了一个子进程,只是它的表现很像。
+ let iteratorTask = runSaga(env, effect);
+ subTasks.push(iteratorTask);
next(); //调用next,让当前saga继续往下走(子saga内部是否阻塞对其无影响)
} else if (typeof effect.then === 'function'){
effect.then(next);
} else{
// 根据不同的effect进行不同的处理
switch (effect.type){
case effectTypes.TAKE:
// effect = {type: 'TAKE', actionType: 'ASYNC_ADD'}
// 添加订阅,如果有人派发了 effect.actionType 这个动作类型,就执行next,继续走;
// 如果没有,就暂停,阻塞在这里
channel.once(effect.actionType, next);
break;
case effectTypes.PUT:
// effect = {type: 'PUT', action: {type: 'ADD'}}
// put 不阻塞,派发action之后就继续向下执行
dispatch(effect.action);
next(effect.action);
break;
case effectTypes.FORK:
+ let forkTask = runSaga(env, effect.saga, ...effect.args);
+ subTasks.push(forkTask);
+ next(forkTask);
break;
case effectTypes.CALL:
effect.fn(...effect.args).then(next);
break;
case effectTypes.CPS:
effect.fn(...effect.args, (err, data) => {
err ? next(err, true) : next(data);
});
break;
case effectTypes.ALL:
let effects = effect.effects;
let result = []; //存放结果的数组
let completeCount = 0; //完成任务的数量
effects.forEach((it, index) => {
+ let subTask = runSaga(env, it, (itResult) => {
result[index] = itResult;
if (++completeCount === effects.length){
next(result); //都完了之后,可以让当前的saga继续执行
}
});
+ subTasks.push(subTask);
})
break;
+ case effectTypes.CANCEL:
+ effect.task.cancel();
+ next();
+ break;
default:
break;
}
}
} else {
if (args && typeof args[0] === 'function'){
args[0](effect);
}
}
}
next();
+ return task;
}
export default runSaga;