要点
控制并发请求时,可以对请求进行拆分,分为正在执行的和尚未执行的,当 Promise
被 fulfilled
的时候,就将其移除掉,然后将 pending
状态的加进来。我们可以使用 Promise.race
来完成。
实现
在开发过程中,有时候会遇到需要进行并发控制的场景。控制当前执行的异步请求个数。针对这个场景,可以使用闭包和 promise
来实现。
首先创建一个模拟请求:
function performAsyncOperation<T>(id: T): Promise<T> {
return new Promise((resolve) => {
setTimeout(() => {
console.log(`Performed operation for resource ${id}.`)
resolve(id)
}, 500 + 500 * Math.random())
})
}
然后我们再来创建一个模拟发起多个请求的函数:
async function run() {
console.time('Async limit')
const ids = Array.from(Array(10), (_, i) => i + '')
const limitedFun = asyncLimit(performAsyncOperation, 3)
await Promise.all(ids.map((id) => limitedFun(id)))
console.timeEnd('Async limit')
}
接下来我们可以创建一个异步控制函数:
function asyncLimit(fn: Function, limit: number) {
let pendingPromises = []
return async function <T>(...args: T[]): Promise<T> {
while (pendingPromises.length >= limit) {
await Promise.race(pendingPromises).catch(() => {})
}
const executedPromise = fn.apply(this, args)
pendingPromises.push(executedPromise)
await executedPromise.catch(() => {})
pendingPromises = pendingPromises.filter(
(promise) => promise !== executedPromise
)
return executedPromise
}
}
在这个函数中,我们使用一个闭包来存储现有正在执行的异步函数,并且控制其数量。假设我们设置的 limit 值为 3,则当当前执行的数量超过 3 时,会等待 Promise.race()
执行完毕,即停留在第5行处的位置,任意一个 promise
状态从 pending
发生转变之后才去执行7行以后的操作。pendingPromises
是共有的闭包变量用来存储当前 pending
状态的promise。我们使用 limit 这个参数来控制当前暂存的请求数量。
以上述的 limit 为 3 为例,前三个请求停留在 9 行的位置。当其中任意一个请求发生状态变更后,会继续执行,删去 pendingPromises
中 resolved 请求。同时,停留在 5 处的其它请求也会执行完毕,再次检查 pendingPromises
的长度,然后继续向下执行,最终会停留在 9 行的位置。如此往复,直到最后的所有的请求被执行完毕。
这样我们就完成了一个简易的并发限流函数。测试结果如下。每三个请求为一组,依次输出。
async function run() {
console.time('Async limit');
const ids = Array.from(Array(10), (_, i) => i + '');
const limitedFun = asyncLimit(performAsyncOperation, 3);
await Promise.all(ids.map((id) => limitedFun(id)));
console.timeEnd('Async limit');
}
run();
// Performed operation for resource 2.
// Performed operation for resource 0.
// Performed operation for resource 1.
// Performed operation for resource 3.
// Performed operation for resource 4.
// Performed operation for resource 5.
// Performed operation for resource 7.
// Performed operation for resource 6.
// Performed operation for resource 8.
// Performed operation for resource 9.
// Async limit: 3.025s