背景是在项目中需要选定一个处理redux异步数据流控制库,redux-saga相对来说更受欢迎,使用库的过程中,如果能知道它的实现方式,对于更优雅,更有效率的使用库本身有比较大的好处,所以抽空看了它的源码实现,有所收获,记录下来分享给大家!
假设不使用redux-thunk或者redux-saga
使用async await等简单方案 获取用户id数据,发出action将数据保存到store
function fetchData(userId) {
// 返回一个promise, 内容是数据
}
function fetchUser() {
// 返回一个promise, 内容是用户信息
}
class Component {
componentDidMount() {
const { userInfo: { userId } } = await fetchUser();
store.dispatch({type: 'UPDATE_USER_ID', payload: userId});
const { data } = await fetchData(userId);
store.dispatch({type: 'UPDATE_DATA', payload: data});
}
}
需要扩展怎么办,相对来说好一点的方案如下
class DataHandler {
static fetchData() {
const { userInfo: { userId } } = await fetchUser();
store.dispatch({type: 'UPDATE_USER_ID', payload: userId});
const { data } = await fetchData(userId);
store.dispatch({type: 'UPDATE_DATA', payload: data});
}
}
class ComponentA {
componentDidMount() {
DataHandler.fetchData();
}
}
class ComponentB {
componentDidMount() {
DataHandler.fetchData();
}
}
可是如果需要定制数据处理逻辑,有的组件获取到数据含有其他额外处理就没法完成复用,需要额外开发逻辑处理,最好的方法是,把异步数据获取和操作独立出来,通过指令压缩或扩展逻辑
使用redux-saga之后
异步逻辑抽离到了saga文件,可以构建统一的 集中的异步处理中心,可以直接分发action触发对应逻辑,复用也简单,分发同一个action即可,如果有特殊定制,只需另加saga方法特殊处理,是store reducer专注处理state,接口与实现 分离
// store
import rootSaga from './sagas'
const sagaMiddleware = createSagaMiddleware()
const store = ...
sagaMiddleware.run(rootSaga)
// saga.js
import { delay } from 'redux-saga'
import { put, takeEvery, all } from 'redux-saga/effects'
function* incrementAsync() {
yield delay(1000)
//...可以加额外逻辑
yield put({ type: 'INCREMENT1' })
// ...handler
}
function* incrementAsync2() {
yield delay(1000)
//...可以加额外逻辑
yield put({ type: 'INCREMENT2' })
// ...handler
}
function* incrementAsync3() {
yield delay(1000)
//...可以加额外逻辑
yield put({ type: 'INCREMENT3' })
// ...handler
}
function* watchIncrementAsync() {
yield takeEvery('INCREMENT_ASYNC1', incrementAsync)
yield takeEvery('INCREMENT_ASYNC2', incrementAsync2)
yield takeEvery('INCREMENT_ASYNC', incrementAsync3)
}
function* watchIncrementAsync2() {
yield takeEvery('INCREMENT_ASYNC2', incrementAsync2)
yield takeEvery('INCREMENT_ASYNC3', incrementAsync3)
}
export default function* rootSaga() {
yield all([
helloSaga(),
watchIncrementAsync(),
watchIncrementAsync2()
])
}
// render
function render() {
ReactDOM.render(
<Counter
value={store.getState()}
onIncrement={() => action('INCREMENT')}
onDecrement={() => action('DECREMENT')}
// onIncrementAsync={() => action('INCREMENT_ASYNC1')}
// onIncrementAsync={() => action('INCREMENT_ASYNC2')}
onIncrementAsync={() => action('INCREMENT_ASYNC3')} />,
document.getElementById('root')
)
}
redux-saga的本质,集中处理副作用
只要是跟函数外部环境发生的交互就都属于副作用
包括但不限于
- 发送一个 http 请求
- 更改文件系统
- 往数据库插入记录
- 使用LocalStorage进行本地存储
- 打印/log
- 获取用户输入
- DOM 查询
- 访问系统状态
redux是一个同步的树形state系统,没有考虑异步的处理,把这些脏活交给了用户,redux-saga就是尽量保持redux的纯粹,自己接管异步脏活,让redux只维护state树
纯粹的state树
reducers: {
update: state => {
state.value -= 1
}
}
掺杂了异步脏活的state树
reducers: {
update: state => {
await api()
// ...
// ...
// ...
state.value -= 1
}
}
// 可以当成一个handler
function* incrementAsync() {
yield delay(1000)
yield put({ type: 'INCREMENT' })
}
function* watchIncrementAsync() {
yield takeEvery('INCREMENT_ASYNC', incrementAsync)
}
channel = [
{
effect: takeEvery
// 注册的type
type: 'INCREMENT_ASYNC',
cb: incrementAsync
}
]
saga的优点
- 更容易管理副作用
- 程序更高效执行
- 易于测试
- 易于处理错误
javascript的Generator生成器
saga就是一个中间件,实现异步流程控制管理,异步流程管理,ES6特性Generator,调用next()方法推进流程执行,对生成器函数概念陌生的可以移步https://www.infoq.cn/article/es6-in-depth-iterators-and-the-for-of-loop/
function* someSaga() {
// yield 一个 promise 应该返回 promise resolve 的值
const response = yield fetch('https://example.com/')
// yield 一个 take effect 应该返回一个 Action
const action = yield take('SOME_ACTION')
// yield 一个 all effect 应该返回一个数组,该数组记录了 effect1 或 effect2 的执行结果
const allResult = yield all([effect1, effect2])
}
someSaga()
在我们选定用生成器函数来控制异步流程后,怎么有序的,自动的触发next()很关键
for循环消费迭代器
for循环是一个迭代器, 本质上for循环是一个迭代器语法糖,底层调用iterator.next()实现迭代,但是功能有限,不能传参
function* range(start, end) {
for (let i = start; i < end; i++) {
yield i
}
}
for (let x of range(1, 10)) {
console.log(x)
}
// 输出 1, 2, 3 ... 8, 9
//
while (true)消费
同步操作,无法控制等待异步完成继续执行接下来的iterator.next()
const iterator = range(1, 10)
while (true) {
const { done, value } = iterator.next(/* 我们可以决定这里的参数 */)
if (done) {
break
}
if (value === 5) {
iterator.throw(new Error('5 is bad input'))
}
console.log(value)
}
// 输出 1, 2, 3, 4,然后抛出异常 '5 is bad input'
saga的解决方案-递归方法,next中嵌套next
function* range2(start, end) {
for (let i = start; i < end; i++) {
const response = yield i
console.log(`response of ${i} is ${response}`)
}
}
const iterator = range2(1, 10)
function next(arg, isErr) {
// 注意驱动函数多了参数 arg 和 isErr
let result
if (isErr) {
result = iterator.throw(arg)
} else {
// 这里我们将 arg 作为参数传递给 iterator.next,作为 effect-producer 中 yield 语句的返回值
result = iterator.next(arg)
}
const { done, value } = result
if (done) {
return
}
console.log('getting:', value)
if (value === 5) {
// 将 isErr 置为 true,就能用递归的方式调用 iterator.throw 方法
next(new Error('5 is bad input'), true)
} else {
// 延迟调用驱动函数;「响应」是「请求」的两倍
setTimeout(() => next(value * 2), value * 1000)
}
}
next()
// 输出
// getting: 1
// response of 1 is 2
// getting: 2
// response of 2 is 4
// getting: 3
// response of 3 is 6
// getting: 4
// response of 4 is 8
// getting: 5
// Uncaught Error: 5 is bad input
// 输出 getting: x 之后,输出会暂停一段时间
注册saga中间件
saga本质上是一个redux中间件,如果对redux不熟悉,请移步https://redux.js.org/
redux中间件的经典使用姿势,高阶函数
function sagaMiddleware({ getState, dispatch }) {
boundRunSaga = runSaga.bind(null, {
...options,
context,
channel,
dispatch,
getState,
sagaMonitor,
})
return next => action => {
if (sagaMonitor && sagaMonitor.actionDispatched) {
sagaMonitor.actionDispatched(action)
}
const result = next(action) // hit reducers
channel.put(action) // 通过saga处理逻辑
return result
}
}
从这边可以看到一个action是有会经过两个通道,稍微注意action commit type的唯一性,基本上也不会有冲突,示例如下
// saga
const initialState: WorkflowState = {
// 点击某一条记录时 推入该条记录
approve: null,
permission: null,
records: [],
userinfo: {},
};
export function makeFetchRecordsCommit(row: WorkflowRecord): RecordsUpdateCommit {
return {
type: WorkflowActionType.updateRecords,
payload: row,
};
}
export default {
namespace: 'workflow',
state: initialState,
effects: {
*fetchRecords(action: StringPayloadAction, { call, put }: EffectsCommandMap) {
const formNo = action.payload;
const res: Response = yield call(fetchRecordsByFormNo, formNo);
if (!res.empty) {
const records = res.content;
yield put(makeFetchRecordsCommit(records));
}
},
},
reducers: {
updateRecords(state: WorkflowState, { payload }: ApproveUpdateCommit) {
return {
...state,
records: payload,
};
},
},
};
启动saga注册
sagaMiddleware.run = (...args) => {
if (process.env.NODE_ENV !== 'production' && !boundRunSaga) {
throw new Error('Before running a Saga, you must mount the Saga middleware on the Store using applyMiddleware')
}
return boundRunSaga(...args)
}
// store上注册
const sagaMiddleware = createSagaMiddleware()
const store = createStore(reducer, applyMiddleware(sagaMiddleware))
sagaMiddleware.run(rootSaga)
runSaga
export function runSaga(
{ channel = stdChannel(), dispatch, getState, context = {}, sagaMonitor, effectMiddlewares, onError = logError },
saga,
...args
) {
const iterator = saga(...args) // rootSaga执行获取了迭代器
const env = {
channel,
dispatch: wrapSagaDispatch(dispatch),
getState,
sagaMonitor,
onError,
finalizeRunEffect,
}
// 开启task
return immediately(() => {
const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)
if (sagaMonitor) {
sagaMonitor.effectResolved(effectId, task)
}
return task
})
}
redux-saga之take
相对最常用的api,捕捉未来的action并作出对应操作
// rootSaga
export function* rootSaga () {
yield take(actions.FETCH_USERINFO)
// 当dispatch action.type命中actions.FETCH_USERINFO
// 执行下列逻辑
// ... handlerTake
handlerTake
}
- 第一步,执行rootSata,得到一个迭代器
import { take } from "./internal/io"
// rootSaga()
const iterator = saga(...args)
// 执行proc
const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)
// 构造一个next方法 = currcb
function next (arg, isErr) {
try {
let result = iterator.next(arg)
if (!result.done) {
digestEffect(result.value, parentEffectId, next)
}
} catch (error) {
...
}
}
// 构造完next方法后直接调用
next()
// 执行 构造一个effect 本质上是一个纯对象
take(actions.FETCH_USERINFO)
// patternOrChannel = actions.FETCH_USERINFO
makeEffect(effectTypes.TAKE, { pattern: patternOrChannel })
// result = { value: ..., done: false }
// payload = { pattern: patternOrChannel }
value = {
[IO]: true, // 标记 判断是否属于saga
combinator: false,
type, // effect类型 Take
payload,
}
// result.done === false
// 进入代码分支 digestEffect 且传递了next作为回调函数
// cb = next
function digestEffect(effect, parentEffectId, cb, label = '') {
let effectSettled // 标记当前effect处理进度
function currCb(res, isErr) {
if (effectSettled) {
return
}
effectSettled = true // 表示当前effect处理过了 适用于竞赛请求的情况
cb(res, isErr)
}
// currCb = () => { next() } 包裹一层next
finalRunEffect(effect, effectId, currCb)
}
// 开始执行runEffect
function runEffect(effect, effectId, currCb) {
if (is.promise(effect)) {
// 非promise
resolvePromise(effect, currCb)
} else if (is.iterator(effect)) {
// 这边当前实例不会进入
proc(env, effect, task.context, effectId, meta, /* isRoot */ false, currCb)
} else if (effect && effect[IO]) {
// 符合当前分支
const effectRunner = effectRunnerMap[effect.type]
effectRunner(env, effect.payload, currCb, executingContext)
} else {
// 如果不是saga effect 照常进行 此时将会调用顶层next,rootSaga方法体将继续执行
currCb(effect)
}
}
// 执行effectRunner
// 匹配到的是runTakeEffect take类型的runner
function runTakeEffect(env, { channel = env.channel, pattern, maybe }, cb) {
// 构造回调函数 作为后续响应当前actions.FETCH_USERINFO要执行的注册handler
const takeCb = input => {
if (input instanceof Error) {
cb(input, true)
return
}
if (isEnd(input) && !maybe) {
cb(TERMINATE)
return
}
cb(input)
}
try {
// 注册channel
// 注意tabkeCb是包裹了一层上层传递下来的next 也就是说 程序卡在
// 卡在准备执行handlerTake的地方
// 因为需要iterator.next()才能继续执行handlerTake,暂时没有触发因子
channel.take(takeCb, is.notUndef(pattern) ? matcher(pattern) : null)
// matcher处理通配符等情况
} catch (err) {
cb(err, true)
return
}
}
// channel.take
take(cb, matcher = matchers.wildcard) {
if (closed) {
cb(END)
return
}
cb[MATCH] = matcher // 正则生成 根据dispatch的action.type来匹配触发
nextTakers.push(cb) // buffer数组保存cb 包含match信息,也就是actions.FETCH_USERINFO这类信息
}
// 至此saga take注册完成
// 触发 在注册saga的地方提过,action.type会进入两个分支 一个是redux分支,一个是saga分支
return next => action => {
if (sagaMonitor && sagaMonitor.actionDispatched) {
sagaMonitor.actionDispatched(action)
}
const result = next(action)
channel.put(action) // action
return result
}
// 看看put做了啥
put(input) {
if (closed) {
return
}
if (isEnd(input)) {
close()
return
}
// 当前维护的所有actions相关takecb
const takers = (currentTakers = nextTakers)
for (let i = 0, len = takers.length; i < len; i++) {
const taker = takers[i]
// 找出正则匹配上的action type
if (taker[MATCH](input)) {
taker.cancel()
// takecb = 顶层next() 触发iterator.next() 执行handlerTake
taker(input)
}
}
},
redux-saga之takeEvery
发现take有个问题,就是只能触发一个就不再执行,不使用与多次点击
不推荐的解决办法
// rootSaga
export function* rootSaga () {
while (true) {
yield take(actions.FETCH_USERINFO)
// 当dispatch action.type命中actions.FETCH_USERINFO
// 执行下列逻辑
// ... handlerTake
handlerTake
}
}
redux-saga提供了一个工具函数takeEvery,也就是说每次分发的actions.FETCH_USERINFO都能收集且触发继而执行handlerTake
// rootSaga
export function* rootSaga () {
yield takeEvery(actions.FETCH_USERINFO)
// 当dispatch action.type命中actions.FETCH_USERINFO
// 执行下列逻辑
// ... handlerTake
handlerTake
}
解析它的实现,本质上和while…true类似,消费完上一个effect后,生成一个新的take effect
// rootSaga
export function* rootSaga () {
yield takeEvery(actions.FETCH_USERINFO)
// 当dispatch action.type命中actions.FETCH_USERINFO
// 执行下列逻辑
// ... handlerTake
handlerTake
}
构造takeEvery
// 工具方法 构造迭代器 核心是next方法
export function makeIterator(next, thro = kThrow, name = 'iterator') {
const iterator = { meta: { name }, next, throw: thro, return: kReturn, isSagaIterator: true }
return iterator
}
// next方法编写
export default function fsmIterator(fsm, startState, name) {
let stateUpdater,
errorState,
effect,
nextState = startState
// 除非报错的情况 否则nextState在q1, q2间切换
function next(arg, error) {
if (nextState === qEnd) {
return done(arg)
}
if (error && !errorState) {
nextState = qEnd
throw error
} else {
stateUpdater && stateUpdater(arg) // 状态流转方法
const currentState = error ? fsm[errorState](error) : fsm[nextState]()
;({ nextState, effect, stateUpdater, errorState } = currentState)
return nextState === qEnd ? done(arg) : effect
}
}
return makeIterator(next, error => next(null, error), name)
}
export default function takeEvery(patternOrChannel, worker, ...args) {
// 熟悉的地方 模拟take的处理结果 相当于使用take注册
const yTake = { done: false, value: take(patternOrChannel) }
// fork类型effect
const yFork = ac => ({ done: false, value: fork(worker, ...args, ac) })
// 切换q1, q2
let action,
setAction = ac => (action = ac)
// 原来代码
// return fsmIterator(
// {
// q1() {
// return { nextState: 'q2', effect: yTake, stateUpdater: setAction }
// },
// q2() {
// return { nextState: 'q1', effect: yFork(action) }
// },
// },
// 'q1', // 初始状态q1
// `takeEvery(${safeName(patternOrChannel)}, ${worker.name})`,
// )
// 搬运后代码 可以看到状态在q1,q2轮转
const fsm = {
q1() {
return { nextState: 'q2', effect: yTake, stateUpdater: setAction }
},
q2() {
return { nextState: 'q1', effect: yFork(action) }
},
},
function next(arg, error) {
if (nextState === qEnd) {
return done(arg)
}
if (error && !errorState) {
nextState = qEnd
throw error
} else {
stateUpdater && stateUpdater(arg)
const currentState = error ? fsm[errorState](error) : fsm[nextState]()
;({ nextState, effect, stateUpdater, errorState } = currentState)
return nextState === qEnd ? done(arg) : effect
}
}
}
用take类似的分析方法走一遍
// rootSaga
const iterator = saga(...args)
// 开启proc
const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)
// 构造进程next
function next(arg, isErr) {
try {
let result = iterator.next(arg)
if (!result.done) {
digestEffect(result.value, parentEffectId, next)
}
}
}
// 构造完next 立即执行
next()
// iterator.next()得到
// { nextState: 'q2', effect: yTake, stateUpdater: setAction }
// 也就是
function next(arg, error) {
if (nextState === qEnd) {
return done(arg)
}
if (error && !errorState) {
nextState = qEnd
throw error
} else {
stateUpdater && stateUpdater(arg)
const currentState = error ? fsm[errorState](error) : fsm[nextState]()
;({ nextState, effect, stateUpdater, errorState } = currentState)
// q1.effect
return q1.effect
}
}
// 得到一个take类型effect,参照上面take的处理,直到捕捉到一个actions.FETCH_USERINFO
// 假设捕捉成功,此时执行最外层定义的proc的next中包裹的
result = iterator.next(arg)
// 得到
result = { nextState: 'q1', effect: yFork(action) }
// 因为
result.done !== true
// 继续执行
digestEffect(result.value, .., proc-next)
// 继续执行
effectRunner(env, effect.payload, currCb, executingContext)
// 判断出effect.type === effectTypes.fork
function runForkEffect(env, { context, fn, args, detached }, cb, { task: parent }) {
// 类似rootSaga
const taskIterator = createTaskIterator({ context, fn, args })
const meta = getIteratorMetaInfo(taskIterator, fn)
immediately(() => {
// 开启一个proc,就类似根saga的处理方法 走一遍单个的take注册流程
// 这个开启之后是另一个管理流程了 但是之前我们proc的收尾呢
const child = proc(env, taskIterator, parent.context, currentEffectId, meta, detached, undefined)
if (detached) {
cb(child)
} else {
if (child.isRunning()) {
parent.queue.addTask(child)
cb(child)
} else if (child.isAborted()) {
parent.queue.abort(child.error())
} else {
// cb收尾
// 执行handleTake
cb(child)
}
}
})
}
总结就是,开启生产一个take effect,消费一个的同时再次生产一个take effect待命下一次的action
redux-saga之call
前面类似take,最后的effectRunner不同
function runCallEffect(env, { context, fn, args }, cb, { task }) {
// catch synchronous failures; see #152
try {
const result = fn.apply(context, args)
if (is.promise(result)) {
// 如果是异步请求 result = then
// 只有在then完成后,才会执行cb,也就是proc-next
// 所以call是阻塞的
resolvePromise(result, cb)
return
}
if (is.iterator(result)) {
// resolve iterator
proc(env, result, task.context, currentEffectId, getMetaInfo(fn), /* isRoot */ false, cb)
return
}
cb(result)
} catch (error) {
cb(error, true)
}
}
redux-saga之put
前面类似take,最后的effectRunner不同
function runPutEffect(env, { channel, action, resolve }, cb) {
// 重点是有调度器 调度器阻塞
asap(() => {
let result
try {
// 直接dispatch
result = (channel ? channel.put : env.dispatch)(action)
} catch (error) {
cb(error, true)
return
}
if (resolve && is.promise(result)) {
resolvePromise(result, cb)
} else {
cb(result)
}
})
}
为什么redux-saga需要调度器
源码阅读的时候对一个schedule文件充满好奇,没有了解到它的用处
假设我们有如下场景
function* rootSaga() {
// next0()
yield fork(genA) // LINE-1
// next1()
yield fork(genB) // LINE-2
}
function* genA() {
// nextA0()
yield put({ type: 'A' })
// nextA1()
yield take('B')
}
function* genB() {
// nextB0()
yield take('A')
// nextB1()
yield put({ type: 'B' })
}
// 按照刚刚的分析,如果没有调度器
// 执行next0(), fork,开启了一个新的proc
// 执行genA, put为同步方法,立即执行 发出一个action
// 此时还没有执行nextA1,没有执行genB,所以有可能遗失一个action 'A'
// 注册完 takeB后,执行genB,分发 action 'B', 成功捕捉
// 有遗失action是不能接受的
调度方案解决action遗失问题
// 任务队列
const queue = []
// 锁
let semaphore = 0
// 执行任务
function exec(task) {
try {
suspend()
task()
} finally {
release()
}
}
// 尽快执行任务
export function asap(task) {
// 压入任务栈
queue.push(task)
if (!semaphore) {
suspend()
flush()
}
}
// 立即执行任务
export function immediately(task) {
try {
suspend()
return task()
} finally {
flush()
}
}
// 上锁
function suspend() {
semaphore++
}
// 解锁
function release() {
semaphore--
}
function flush() {
release()
let task
while (!semaphore && (task = queue.shift()) !== undefined) {
exec(task)
}
}
重新分析上述例子
// runSaga中的immediately 立即执行 rootSaga立即执行
// task1
return immediately(() => {
const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)
if (sagaMonitor) {
sagaMonitor.effectResolved(effectId, task)
}
return task
})
// forkA
function runForkEffect(env, { context, fn, args, detached }, cb, { task: parent }) {
const taskIterator = createTaskIterator({ context, fn, args })
const meta = getIteratorMetaInfo(taskIterator, fn)
// 立即执行 task2
immediately(() => {
// procA
const child = proc(env, taskIterator, parent.context, currentEffectId, meta, detached, undefined)
if (detached) {
cb(child)
} else {
if (child.isRunning()) {
parent.queue.addTask(child)
cb(child)
} else if (child.isAborted()) {
parent.queue.abort(child.error())
} else {
cb(child)
}
}
})
}
// genA put task3, 注意 此时是在procA中,此任务本身处于task2中,而立即执行的任务会上锁
asap(() => {
let result
try {
result = (channel ? channel.put : env.dispatch)(action)
} catch (error) {
cb(error, true)
return
}
if (resolve && is.promise(result)) {
resolvePromise(result, cb)
} else {
cb(result)
}
})
// 此时put堵塞,不是put本身堵塞,是注册put,迭代器没有回调proc-next
// 调起forkB,注册take 'B',没有启用调度器 用不着 注册不影响后续,而且越早越好
// put B同样堵塞,调度器执行flush,put A执行,而后put B执行,
// 两个put都是在take注册完成之后,不再有action遗失问题
总结一下 ,调度器本质就是,注册操作优先,触发操作靠后,理论上最好不要写这种交叉逻辑