要点
控制并发请求时,可以对请求进行拆分,分为正在执行的和尚未执行的,当 Promise 被 fulfilled 的时候,就将其移除掉,然后将 pending 状态的加进来。我们可以使用 Promise.race 来完成。
实现
在开发过程中,有时候会遇到需要进行并发控制的场景。控制当前执行的异步请求个数。针对这个场景,可以使用闭包和 promise 来实现。
首先创建一个模拟请求:
function performAsyncOperation<T>(id: T): Promise<T> {return new Promise((resolve) => {setTimeout(() => {console.log(`Performed operation for resource ${id}.`)resolve(id)}, 500 + 500 * Math.random())})}
然后我们再来创建一个模拟发起多个请求的函数:
async function run() {console.time('Async limit')const ids = Array.from(Array(10), (_, i) => i + '')const limitedFun = asyncLimit(performAsyncOperation, 3)await Promise.all(ids.map((id) => limitedFun(id)))console.timeEnd('Async limit')}
接下来我们可以创建一个异步控制函数:
function asyncLimit(fn: Function, limit: number) {let pendingPromises = []return async function <T>(...args: T[]): Promise<T> {while (pendingPromises.length >= limit) {await Promise.race(pendingPromises).catch(() => {})}const executedPromise = fn.apply(this, args)pendingPromises.push(executedPromise)await executedPromise.catch(() => {})pendingPromises = pendingPromises.filter((promise) => promise !== executedPromise)return executedPromise}}
在这个函数中,我们使用一个闭包来存储现有正在执行的异步函数,并且控制其数量。假设我们设置的 limit 值为 3,则当当前执行的数量超过 3 时,会等待 Promise.race() 执行完毕,即停留在第5行处的位置,任意一个 promise 状态从 pending 发生转变之后才去执行7行以后的操作。
pendingPromises 是共有的闭包变量用来存储当前 pending 状态的promise。我们使用 limit 这个参数来控制当前暂存的请求数量。
以上述的 limit 为 3 为例,前三个请求停留在 9 行的位置。当其中任意一个请求发生状态变更后,会继续执行,删去 pendingPromises 中 resolved 请求。同时,停留在 5 处的其它请求也会执行完毕,再次检查 pendingPromises 的长度,然后继续向下执行,最终会停留在 9 行的位置。如此往复,直到最后的所有的请求被执行完毕。
这样我们就完成了一个简易的并发限流函数。测试结果如下。每三个请求为一组,依次输出。
async function run() {console.time('Async limit');const ids = Array.from(Array(10), (_, i) => i + '');const limitedFun = asyncLimit(performAsyncOperation, 3);await Promise.all(ids.map((id) => limitedFun(id)));console.timeEnd('Async limit');}run();// Performed operation for resource 2.// Performed operation for resource 0.// Performed operation for resource 1.// Performed operation for resource 3.// Performed operation for resource 4.// Performed operation for resource 5.// Performed operation for resource 7.// Performed operation for resource 6.// Performed operation for resource 8.// Performed operation for resource 9.// Async limit: 3.025s
