1. 源码简介

官网简介:Run multiple promise-returning & async functions with limited concurrency
简单理解,控制有限的并发数量,运行多个promise-returning和async functions。就是常见的控制并发请求,刚好有一个实践的场景:大文件上传,看完这个库看看能不能派上用场。

2. 测试用例


  • concurrency: 1,这个测试用例测试并发数量为1时的场景。

    1. test('concurrency: 1', async t => {
    2. const input = [
    3. [10, 300],
    4. [20, 200],
    5. [30, 100],
    6. ];
    7. const end = timeSpan();
    8. const limit = pLimit(1);
    9. const mapper = ([value, ms]) => limit(async () => {
    10. await delay(ms);
    11. return value;
    12. });
    13. // 判断输出的值是否等于[10,20,30]
    14. t.deepEqual(await Promise.all(input.map(x => mapper(x))), [10, 20, 30]);
    15. // 判断执行时间是否在预期内
    16. t.true(inRange(end(), {start: 590, end: 650}));
    17. });
  • concurrency: 4,测试多个并发时,running数是否小于等于并发数

  • non-promise returning function,测试非promise返回值
  • continues after sync throw,测试抛出异常的情况是否正常执行
  • accepts additional arguments,没太看懂这个,字面意思是接受其他参数
  • does not ignore errors,测试抛出错误的情况,能否正常接收到错误的值
  • activeCount and pendingCount properties,测试activeCount和pendingCount属性
  • clearQueue,清空队列
  • throws on invalid concurrency argument,测试输入无效参数时的情况

    3. 源码

  1. 先校验
  2. 生成一个promise,入队操作是,queue.enqueue(run.bind(…)),意思是把请求放在队列里了
  3. 如果当前执行数 < 并发数,且队列中存在数据,出队运行请求;否则就在队列里等着吧。
  4. 当上一个运行完了(run函数的await result),会唤醒next()函数,出队运行请求。 ```javascript import Queue from ‘yocto-queue’; export default function pLimit(concurrency) { // 检验参数是Integer,且校验最大最小值 if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {

    1. throw new TypeError('Expected `concurrency` to be a number from 1 and up');

    } // 并发队列 const queue = new Queue(); // 当前正在执行的请求数 let activeCount = 0; // 执行下一个请求 const next = () => {

    1. activeCount--;
    2. if (queue.size > 0) {

    // 出队操作并执行

    1. queue.dequeue()();
    2. }

    }; // 执行请求任务 const run = async (fn, resolve, args) => { // 当前执行请求数+1

    1. activeCount++;
    2. const result = (async () => fn(...args))();

    // resolve出去

    1. resolve(result);
    2. try {

    // await 保证执行顺序

    1. await result;
    2. } catch {}
    3. next();

    }; // 入队操作 const enqueue = (fn, resolve, args) => {

    1. queue.enqueue(run.bind(undefined, fn, resolve, args));
    2. (async () => {
    3. await Promise.resolve();

    // 如果当前执行数 < 并发数,且队列还有请求

    1. if (activeCount < concurrency && queue.size > 0) {
    2. // 出队运行
    3. queue.dequeue()();
    4. }
    5. })();

    }; // 生成一个promise,入队 const generator = (fn, …args) => new Promise(resolve => {

    1. enqueue(fn, resolve, args);


    Object.defineProperties(generator, {

    1. activeCount: {
    2. get: () => activeCount,
    3. },
    4. pendingCount: {
    5. get: () => queue.size,
    6. },
    7. clearQueue: {
    8. value: () => {
    9. queue.clear();
    10. },
    11. },

    }); return generator; }


4. 总结
