可并发控制的 Promise.all

纯自己整的,不确定这样的实现是否正确,也不确定测试是否有说服力。

还是期待讨论和指教。

实现代码

  1. Promise.myAllConcurrent = function (promiseArr, limit = 1) {
  2. return new Promise((resolve, reject) => {
  3. if (!promiseArr[Symbol.iterator]) {
  4. throw new TypeError(`${promiseArr} is not iterable`);
  5. }
  6. if (!Array.isArray(promiseArr)) {
  7. promiseArr = Array.from(promiseArr);
  8. }
  9. const resArr = [];
  10. const len = promiseArr.length;
  11. let count = 0;
  12. let lastIndex = limit - 1;
  13. // 不能用 count 来替代 lastIndex
  14. // eg. 允许并发数为 3, 但总共有 10 个
  15. // 任务 ① 处理完后, 开始处理任务 ④
  16. // 假设任务 ④ 耗时较长,
  17. // 任务 ④ 结束之前, 任务 ② 也处理好了
  18. // 如果按照 count 去处理, 则此时任务 ③ 的回调会去处理任务 ②, 而非 任务 ⑤
  19. // 如果按照 lastIndex 去处理, 则此时任务 ③ 的回调会去处理任务 ⑤
  20. function takeOnePromise(resArr, index) {
  21. Promise.resolve(promiseArr[index]).then(
  22. value => {
  23. resArr[index] = value;
  24. count++;
  25. lastIndex++;
  26. if (count !== len) takeOnePromise(resArr, lastIndex);
  27. else resolve(resArr);
  28. },
  29. reason => {
  30. reject(reason);
  31. }
  32. );
  33. }
  34. const top = Math.min(limit, length);
  35. for (let i = 0; i < top; i++) {
  36. takeOnePromise(resArr, i);
  37. }
  38. });
  39. }

测试代码

  1. Promise.myAllConcurrent = function (promiseArr, limit = 1) {
  2. return new Promise((resolve, reject) => {
  3. if (!promiseArr[Symbol.iterator]) {
  4. throw new TypeError(`${promiseArr} is not iterable`);
  5. }
  6. if (!Array.isArray(promiseArr)) {
  7. promiseArr = Array.from(promiseArr);
  8. }
  9. const resArr = [];
  10. const len = promiseArr.length;
  11. let count = 0;
  12. let lastIndex = limit - 1;
  13. function takeOnePromise_timer(resArr, index) { // 为了测试, 包了一层计时器
  14. setTimeout(() => {
  15. Promise.resolve(promiseArr[index]).then(
  16. value => {
  17. resArr[index] = value;
  18. count++;
  19. lastIndex++;
  20. if (count !== len) takeOnePromise_timer(resArr, lastIndex);
  21. else resolve(resArr);
  22. },
  23. reason => {
  24. reject(reason);
  25. }
  26. );
  27. }, 2000);
  28. }
  29. for (let i = 0; i < limit; i++) {
  30. // takeOnePromise(resArr, i);
  31. takeOnePromise_timer(resArr, i);
  32. }
  33. // 计时器 定时查询 res的数量, 以此代替观测并发数
  34. setInterval(() => {
  35. console.log(resArr.length);
  36. }, 1000);
  37. });
  38. }
  39. const instance1 = new Promise(resolve => {
  40. resolve('Promise —— 1');
  41. });
  42. const instance2 = new Promise(resolve => {
  43. resolve('Promise —— 2');
  44. });
  45. const instance3 = new Promise(resolve => {
  46. resolve('Promise —— 3');
  47. });
  48. const instance4 = new Promise(resolve => {
  49. resolve('Promise —— 4');
  50. });
  51. const instance5 = new Promise(resolve => {
  52. resolve('Promise —— 5');
  53. });
  54. const promiseArr = [instance1, instance2, instance3, instance4, instance5];
  55. Promise.myAllConcurrent(promiseArr, 2).then((value) => {
  56. console.log('finish —— promise');
  57. console.log('value: ', value);
  58. });