Promise

Promise/A+ 与 thenable

Promise/A+ 的规范规定了 then() 方法的行为,让不同的程序能够以此为共识,来开发各自的 Promise 对象,使得这些 Promise 之间能够互相兼容。许多 Promise 实现方案都把这样的对象称为 thenable,即能在它上面调用 then()

把回调方案改写成 Promise 方案

在 Node.js 平台中,基于回调的函数通常具备以下特征:

  • 回调通常是该函数的最后一个参数
  • 如果有错误的话,那么错误信息总是当作第一个参数,传给回调
  • 如果有返回值,那么返回值会跟在错误信息后面,传给回调 ```typescript import { randomBytes } from ‘crypto’

function promisify (callbackBasedApi) { return function promisified (…args) { return new Promise((resolve, reject) => { const newArgs = [ …args, function (err, result) { if (err) { return reject(err) }

  1. resolve(result)
  2. }
  3. ]
  4. callbackBasedApi(...newArgs)
  5. })

} }

const randomBytesP = promisify(randomBytes) randomBytesP(32) .then(buffer => { console.log(Random bytes: ${buffer.toString()}) })

  1. <a name="j1tgP"></a>
  2. ## 顺序执行与迭代
  3. 基于 Promise 机制的顺序迭代模式
  4. > 通过循环结构,动态地构建 Promise 链,以便用其中的每个 Promise 对象,来表示相应的异步操作
  5. ```typescript
  6. function spiderLinks (currentUrl, content, nesting) {
  7. let promise = Promise.resolve()
  8. if (nesting === 0) {
  9. return promise
  10. }
  11. const links = getPageLinks(currentUrl, content)
  12. for (const link of links) {
  13. promise = promise.then(() => spider(link, nesting - 1))
  14. }
  15. return promise
  16. }
  1. 构建一个空白的 Promise 作为链条的起始点
  2. 在 for 循环中反复调用 promise 变量的 then 方法,并把该方法所返回的 Promise 对象,当作 Promise 变量的新值,这样就会将这些 promise 对象串成链条。

还可以使用 reduce 函数来实现 promise 的顺序迭代模式

  1. const promise = tasks.reduce((prev, task) => {
  2. return prev.then(() => {
  3. return task();
  4. })
  5. }, Promise.resolve())

限制任务数量的平行执行

  1. import { EventEmitter } from 'events'
  2. export class TaskQueue extends EventEmitter {
  3. constructor (concurrency) {
  4. super()
  5. this.concurrency = concurrency
  6. this.running = 0
  7. this.queue = []
  8. }
  9. pushTask (task) {
  10. this.queue.push(task)
  11. process.nextTick(this.next.bind(this))
  12. return this
  13. }
  14. next () {
  15. while (this.running < this.concurrency && this.queue.length) {
  16. const task = this.queue.shift()
  17. task().finally(() => {
  18. this.running--;
  19. this.next();
  20. })
  21. this.running++;
  22. }
  23. }
  24. runTask (task) {
  25. return new Promise((resolve, reject) => {
  26. this.queue.push(() => {
  27. return task().then(resolve, reject);
  28. })
  29. process.nextTick(this.next.bind(this));
  30. })
  31. }
  32. }

async/await

任何一种类型的值都可以写在 await 关键字的右侧,未必非得是 Promise 才行。如果你写的是其他类型的值,那就相当于先把这个值传给 Promise.resolve() 方法,以构建出一个 Promise 对象,然后让 await 针对这个对象去等候处理结果。

用 async/await 处理错误

async/await 机制的一项主要优势,正体现在它统一了同步错误与异步错误的处理方式,让 try…catch 结构既能处理同步操作通过 throw 所抛出的错误,又能顾及 promise 对象在执行异步操作时因为出错而遭到拒绝的情况。

用同一套 try…catch 结构处理同步与异步错误

  1. function delayError (milliseconds) {
  2. return new Promise((resolve, reject) => {
  3. setTimeout(() => {
  4. reject(new Error(`Error after ${milliseconds}ms`))
  5. }, milliseconds)
  6. })
  7. }
  8. async function playingWithErrors (throwSyncError) {
  9. try {
  10. if (throwSyncError) {
  11. throw new Error('This is a synchronous error')
  12. }
  13. await delayError(1000)
  14. } catch (err) {
  15. console.error(`We have an error: ${err.message}`)
  16. } finally {
  17. console.log('Done')
  18. }
  19. }
  20. // throws a synchronous error
  21. playingWithErrors(true)
  22. // awaited Promise will reject
  23. playingWithErrors(false)

是该用 return 还是 return await

  1. function delayError (milliseconds) {
  2. return new Promise((resolve, reject) => {
  3. setTimeout(() => {
  4. reject(new Error(`Error after ${milliseconds}ms`))
  5. }, milliseconds)
  6. })
  7. }
  8. async function errorNotCaught () {
  9. try {
  10. return delayError(1000)
  11. } catch (err) {
  12. console.error('Error caught by the async function: ' +
  13. err.message)
  14. }
  15. }
  16. errorNotCaught()
  17. .catch(err => console.error('Error caught by the caller: ' +
  18. err.message))

上面的写法会让 errorNotCaught 函数内部的 catch 块不起作用

  1. async function errorCaught () {
  2. try {
  3. return await delayError(1000)
  4. } catch (err) {
  5. console.error('Error caught by the async function: ' +
  6. err.message)
  7. }
  8. }

如果想实现的效果是在 Promise 对象得以兑现的情况下,把执行结果返回给调用函数的人,而在该对象遭到拒绝的情况下,将错误在函数内部捕获下来,那么应该针对这个对象的 await 表达式

平行执行

  1. 采用 await 表达式实现
  2. 依赖 Promise.all 方法

    1. async function spiderLinks (currentUrl, content, nesting) {
    2. if (nesting === 0) {
    3. return
    4. }
    5. const links = getPageLinks(currentUrl, content)
    6. const promises = links.map(link => spider(link, nesting - 1))
    7. for (const promise of promises) {
    8. await promise
    9. }
    10. }

    使用 for 循环逐个等待 promise,如果数组中某个 promise 会遭到 rejected,也必须等待前面的那些 promise 全都有了结果才行

    1. async function spiderLinks (currentUrl, content, nesting) {
    2. if (nesting === 0) {
    3. return
    4. }
    5. const links = getPageLinks(currentUrl, content)
    6. const promises = links.map(link => spider(link, nesting - 1))
    7. return Promise.all(promises)
    8. }

    使用 promise.all 函数,只要传给它的那些 promise 对象里有任何一个遭到 rejected,该函数所返回的这个总的 promise 就会进入 rejected 状态

    限制任务数量的平行执行

    把 async/await 方案与 producer-consumer(生产者-消费者)模式结合起来

  • 在队列的一端,有一系列数量未知的 producer(生产者)给队列中添加任务
  • 在队列的另一端,有一系列数量已知的 consumer 负责每次从队列中提取一项任务,并加以执行

image.png
消费者的数量决定了最多有几项任务能够并发地执行。
队列中没有任务可做时,把控制权还给事件循环让消费者暂时睡眠,在队列中有新任务时触发相应的回调,让消费者继续处理任务。

  1. export class TaskQueuePC {
  2. constructor (concurrency) {
  3. this.taskQueue = []; // 存放任务
  4. this.consumerQueue = []; // 存放暂时还没有拿到任务的消费者
  5. // 安排消费者开始消耗队列中的任务, concurrency 控制并发上限
  6. for (let i = 0; i < concurrency; i++) {
  7. this.consumer();
  8. }
  9. }
  10. async consumer () {
  11. // consumer 是 async 函数,
  12. // 所以该 while 系统在底层是以一种类似于异步递归(asynchronous recursion)的方式执行
  13. while (true) {
  14. try {
  15. const task = await this.getNextTask();
  16. await task();
  17. } catch (err) {
  18. console.log(err);
  19. }
  20. }
  21. }
  22. async getNextTask () {
  23. return new Promise((resolve) => {
  24. if (this.taskQueue.length !== 0) {
  25. return resolve(this.taskQueue.shift());
  26. }
  27. this.consumerQueue.push(resolve);
  28. })
  29. }
  30. // 胶水层,将消费者逻辑与生产者逻辑对接
  31. runTask (task) {
  32. return new Promise((resolve, reject) => {
  33. const taskWrapper = () => {
  34. const taskPromise = task();
  35. taskPromise.then(resolve, reject);
  36. return taskPromise;
  37. }
  38. if (this.consumerQueue.length !== 0) {
  39. const consumer = this.consumerQueue.shift();
  40. consumer(taskWrapper);
  41. } else {
  42. this.taskQueue.push(taskWrapper);
  43. }
  44. })
  45. }
  46. }

无限递归的 Promise 解析链所引发的问题

无限递归的 Promise 解析链所引发的内存泄漏

  1. function delay (milliseconds) {
  2. return new Promise((resolve, reject) => {
  3. setTimeout(() => {
  4. resolve(new Date())
  5. }, milliseconds)
  6. })
  7. }
  8. // eslint-disable-next-line no-unused-vars
  9. function leakingLoop () {
  10. return delay(1)
  11. .then(() => {
  12. console.log(`Tick ${Date.now()}`)
  13. return leakingLoop()
  14. })
  15. }

最开始的 leakingLoop 必须等待后面的 leakingLoop 任务有结果才能得到解析,而后面的那一 个 leakingLoop 又必须依赖下一个,依此类推形成一条永远无法解析的 Promise 链。

  1. function nonLeakingLoop () {
  2. delay(1)
  3. .then(() => {
  4. console.log(`Tick ${Date.now()}`)
  5. nonLeakingLoop()
  6. })
  7. }

上面的 nonLeakingLoop 不会依赖下一次调用的 nonLeakingLoop 所返回的 Promise,系统会多次安排垃圾收集器去回收用不到的内存,从而令内存用量降低,所以不会存在内存泄漏问题。
但这个函数不会把深层递归过程中所发生的错误播报出来,因为这些 Promise 的状态之间是没有联系的,前一个 Promise 不会因为后面的 Promise 发生错误而进入 rejected 状态

  1. function nonLeakingLoopWithErrors () {
  2. return new Promise((resolve, reject) => {
  3. (function internalLoop () {
  4. delay(1)
  5. .then(() => {
  6. console.log(`Tick ${Date.now()}`)
  7. internalLoop()
  8. })
  9. .catch(err => {
  10. reject(err)
  11. })
  12. })()
  13. })
  14. }

保证无论哪一层的异步操作发生错误,nonLeakingLoopWithErrors 函数所返回的 Promise 对象会遭到拒绝

  1. async function nonLeakingLoopAsync () {
  2. while (true) {
  3. await delay(1)
  4. console.log(`Tick ${Date.now()}`)
  5. }
  6. }

拥有递归效果,同时保证异步任务 delay 所抛出的错误,总能播报给最初调用函数的人

参考资料

  1. github 仓库