要点

控制并发请求时,可以对请求进行拆分,分为正在执行的和尚未执行的,当 Promisefulfilled 的时候,就将其移除掉,然后将 pending 状态的加进来。我们可以使用 Promise.race 来完成。

实现

在开发过程中,有时候会遇到需要进行并发控制的场景。控制当前执行的异步请求个数。针对这个场景,可以使用闭包和 promise 来实现。
首先创建一个模拟请求:

  1. function performAsyncOperation<T>(id: T): Promise<T> {
  2. return new Promise((resolve) => {
  3. setTimeout(() => {
  4. console.log(`Performed operation for resource ${id}.`)
  5. resolve(id)
  6. }, 500 + 500 * Math.random())
  7. })
  8. }

然后我们再来创建一个模拟发起多个请求的函数:

  1. async function run() {
  2. console.time('Async limit')
  3. const ids = Array.from(Array(10), (_, i) => i + '')
  4. const limitedFun = asyncLimit(performAsyncOperation, 3)
  5. await Promise.all(ids.map((id) => limitedFun(id)))
  6. console.timeEnd('Async limit')
  7. }

接下来我们可以创建一个异步控制函数:

  1. function asyncLimit(fn: Function, limit: number) {
  2. let pendingPromises = []
  3. return async function <T>(...args: T[]): Promise<T> {
  4. while (pendingPromises.length >= limit) {
  5. await Promise.race(pendingPromises).catch(() => {})
  6. }
  7. const executedPromise = fn.apply(this, args)
  8. pendingPromises.push(executedPromise)
  9. await executedPromise.catch(() => {})
  10. pendingPromises = pendingPromises.filter(
  11. (promise) => promise !== executedPromise
  12. )
  13. return executedPromise
  14. }
  15. }

在这个函数中,我们使用一个闭包来存储现有正在执行的异步函数,并且控制其数量。假设我们设置的 limit 值为 3,则当当前执行的数量超过 3 时,会等待 Promise.race() 执行完毕,即停留在第5行处的位置,任意一个 promise 状态从 pending 发生转变之后才去执行7行以后的操作。
image.png
pendingPromises 是共有的闭包变量用来存储当前 pending 状态的promise。我们使用 limit 这个参数来控制当前暂存的请求数量。
image.png
以上述的 limit 为 3 为例,前三个请求停留在 9 行的位置。当其中任意一个请求发生状态变更后,会继续执行,删去 pendingPromises 中 resolved 请求。同时,停留在 5 处的其它请求也会执行完毕,再次检查 pendingPromises 的长度,然后继续向下执行,最终会停留在 9 行的位置。如此往复,直到最后的所有的请求被执行完毕。

这样我们就完成了一个简易的并发限流函数。测试结果如下。每三个请求为一组,依次输出。

  1. async function run() {
  2. console.time('Async limit');
  3. const ids = Array.from(Array(10), (_, i) => i + '');
  4. const limitedFun = asyncLimit(performAsyncOperation, 3);
  5. await Promise.all(ids.map((id) => limitedFun(id)));
  6. console.timeEnd('Async limit');
  7. }
  8. run();
  9. // Performed operation for resource 2.
  10. // Performed operation for resource 0.
  11. // Performed operation for resource 1.
  12. // Performed operation for resource 3.
  13. // Performed operation for resource 4.
  14. // Performed operation for resource 5.
  15. // Performed operation for resource 7.
  16. // Performed operation for resource 6.
  17. // Performed operation for resource 8.
  18. // Performed operation for resource 9.
  19. // Async limit: 3.025s