promise-concurrency-limiter

  1. /* TYPES */
  2. type Callback = () => void;
  3. type Task<T> = () => Promise<T>;
  4. type Options = {
  5. concurrency: number
  6. };
  7. /* PROMISE CONCURRENCY LIMITER */
  8. class Limiter {
  9. /* VARIABLES */
  10. concurrency: number;
  11. count: number;
  12. queue: Set<Callback>;
  13. /* CONSTRUCTOR */
  14. constructor ( options: Options ) {
  15. this.concurrency = options.concurrency;
  16. this.count = 0;
  17. this.queue = new Set ();
  18. }
  19. /* API */
  20. add <T> ( fn: Task<T> ): Promise<T> {
  21. if ( this.count < this.concurrency ) return this.run ( fn );
  22. return new Promise<T> ( resolve => {
  23. const callback = () => resolve ( this.run ( fn ) );
  24. this.queue.add ( callback );
  25. });
  26. }
  27. flush (): void {
  28. for ( const callback of this.queue ) {
  29. if ( this.count >= this.concurrency ) break;
  30. this.queue.delete ( callback );
  31. callback ();
  32. }
  33. }
  34. run <T> ( fn: Task<T> ): Promise<T> {
  35. this.count += 1;
  36. const promise = fn ();
  37. const cleanup = (): void => {
  38. this.count -= 1;
  39. this.flush ();
  40. };
  41. promise.then ( cleanup, cleanup );
  42. return promise;
  43. }
  44. }
  45. export default Limiter;

使用:

  1. import Limiter from 'promise-concurrency-limiter';
  2. const limiter = new Limiter ({
  3. concurrency: 2 // Limit the number of simultaneously active promises to 2
  4. });
  5. const somePromiseReturningFunction = async () => { /* ... */ };
  6. limiter.add ( somePromiseReturningFunction ); // First function added, executed immediately
  7. limiter.add ( somePromiseReturningFunction ); // Second function added, executed immediately
  8. limiter.add ( somePromiseReturningFunction ); // Third function added, executed immediately only if one of the 2 available slots got freed, deferred otherwise

参考

  1. promise-concurrency-limiter