Promise
Promise/A+ 与 thenable
Promise/A+ 的规范规定了 then() 方法的行为,让不同的程序能够以此为共识,来开发各自的 Promise 对象,使得这些 Promise 之间能够互相兼容。许多 Promise 实现方案都把这样的对象称为 thenable,即能在它上面调用 then()
把回调方案改写成 Promise 方案
在 Node.js 平台中,基于回调的函数通常具备以下特征:
- 回调通常是该函数的最后一个参数
- 如果有错误的话,那么错误信息总是当作第一个参数,传给回调
- 如果有返回值,那么返回值会跟在错误信息后面,传给回调 ```typescript import { randomBytes } from ‘crypto’
function promisify (callbackBasedApi) { return function promisified (…args) { return new Promise((resolve, reject) => { const newArgs = [ …args, function (err, result) { if (err) { return reject(err) }
resolve(result)
}
]
callbackBasedApi(...newArgs)
})
} }
const randomBytesP = promisify(randomBytes)
randomBytesP(32)
.then(buffer => {
console.log(Random bytes: ${buffer.toString()}
)
})
<a name="j1tgP"></a>
## 顺序执行与迭代
基于 Promise 机制的顺序迭代模式
> 通过循环结构,动态地构建 Promise 链,以便用其中的每个 Promise 对象,来表示相应的异步操作
```typescript
function spiderLinks (currentUrl, content, nesting) {
let promise = Promise.resolve()
if (nesting === 0) {
return promise
}
const links = getPageLinks(currentUrl, content)
for (const link of links) {
promise = promise.then(() => spider(link, nesting - 1))
}
return promise
}
- 构建一个空白的 Promise 作为链条的起始点
- 在 for 循环中反复调用 promise 变量的 then 方法,并把该方法所返回的 Promise 对象,当作 Promise 变量的新值,这样就会将这些 promise 对象串成链条。
还可以使用 reduce 函数来实现 promise 的顺序迭代模式
const promise = tasks.reduce((prev, task) => {
return prev.then(() => {
return task();
})
}, Promise.resolve())
限制任务数量的平行执行
import { EventEmitter } from 'events'
export class TaskQueue extends EventEmitter {
constructor (concurrency) {
super()
this.concurrency = concurrency
this.running = 0
this.queue = []
}
pushTask (task) {
this.queue.push(task)
process.nextTick(this.next.bind(this))
return this
}
next () {
while (this.running < this.concurrency && this.queue.length) {
const task = this.queue.shift()
task().finally(() => {
this.running--;
this.next();
})
this.running++;
}
}
runTask (task) {
return new Promise((resolve, reject) => {
this.queue.push(() => {
return task().then(resolve, reject);
})
process.nextTick(this.next.bind(this));
})
}
}
async/await
任何一种类型的值都可以写在 await 关键字的右侧,未必非得是 Promise 才行。如果你写的是其他类型的值,那就相当于先把这个值传给 Promise.resolve() 方法,以构建出一个 Promise 对象,然后让 await 针对这个对象去等候处理结果。
用 async/await 处理错误
async/await 机制的一项主要优势,正体现在它统一了同步错误与异步错误的处理方式,让 try…catch 结构既能处理同步操作通过 throw 所抛出的错误,又能顾及 promise 对象在执行异步操作时因为出错而遭到拒绝的情况。
用同一套 try…catch 结构处理同步与异步错误
function delayError (milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error(`Error after ${milliseconds}ms`))
}, milliseconds)
})
}
async function playingWithErrors (throwSyncError) {
try {
if (throwSyncError) {
throw new Error('This is a synchronous error')
}
await delayError(1000)
} catch (err) {
console.error(`We have an error: ${err.message}`)
} finally {
console.log('Done')
}
}
// throws a synchronous error
playingWithErrors(true)
// awaited Promise will reject
playingWithErrors(false)
是该用 return 还是 return await
function delayError (milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error(`Error after ${milliseconds}ms`))
}, milliseconds)
})
}
async function errorNotCaught () {
try {
return delayError(1000)
} catch (err) {
console.error('Error caught by the async function: ' +
err.message)
}
}
errorNotCaught()
.catch(err => console.error('Error caught by the caller: ' +
err.message))
上面的写法会让 errorNotCaught 函数内部的 catch 块不起作用
async function errorCaught () {
try {
return await delayError(1000)
} catch (err) {
console.error('Error caught by the async function: ' +
err.message)
}
}
如果想实现的效果是在 Promise 对象得以兑现的情况下,把执行结果返回给调用函数的人,而在该对象遭到拒绝的情况下,将错误在函数内部捕获下来,那么应该针对这个对象的 await 表达式
平行执行
- 采用 await 表达式实现
依赖 Promise.all 方法
async function spiderLinks (currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
for (const promise of promises) {
await promise
}
}
使用 for 循环逐个等待 promise,如果数组中某个 promise 会遭到 rejected,也必须等待前面的那些 promise 全都有了结果才行
async function spiderLinks (currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
return Promise.all(promises)
}
使用 promise.all 函数,只要传给它的那些 promise 对象里有任何一个遭到 rejected,该函数所返回的这个总的 promise 就会进入 rejected 状态
限制任务数量的平行执行
把 async/await 方案与 producer-consumer(生产者-消费者)模式结合起来
- 在队列的一端,有一系列数量未知的 producer(生产者)给队列中添加任务
- 在队列的另一端,有一系列数量已知的 consumer 负责每次从队列中提取一项任务,并加以执行
消费者的数量决定了最多有几项任务能够并发地执行。
队列中没有任务可做时,把控制权还给事件循环让消费者暂时睡眠,在队列中有新任务时触发相应的回调,让消费者继续处理任务。
export class TaskQueuePC {
constructor (concurrency) {
this.taskQueue = []; // 存放任务
this.consumerQueue = []; // 存放暂时还没有拿到任务的消费者
// 安排消费者开始消耗队列中的任务, concurrency 控制并发上限
for (let i = 0; i < concurrency; i++) {
this.consumer();
}
}
async consumer () {
// consumer 是 async 函数,
// 所以该 while 系统在底层是以一种类似于异步递归(asynchronous recursion)的方式执行
while (true) {
try {
const task = await this.getNextTask();
await task();
} catch (err) {
console.log(err);
}
}
}
async getNextTask () {
return new Promise((resolve) => {
if (this.taskQueue.length !== 0) {
return resolve(this.taskQueue.shift());
}
this.consumerQueue.push(resolve);
})
}
// 胶水层,将消费者逻辑与生产者逻辑对接
runTask (task) {
return new Promise((resolve, reject) => {
const taskWrapper = () => {
const taskPromise = task();
taskPromise.then(resolve, reject);
return taskPromise;
}
if (this.consumerQueue.length !== 0) {
const consumer = this.consumerQueue.shift();
consumer(taskWrapper);
} else {
this.taskQueue.push(taskWrapper);
}
})
}
}
无限递归的 Promise 解析链所引发的问题
无限递归的 Promise 解析链所引发的内存泄漏
function delay (milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(new Date())
}, milliseconds)
})
}
// eslint-disable-next-line no-unused-vars
function leakingLoop () {
return delay(1)
.then(() => {
console.log(`Tick ${Date.now()}`)
return leakingLoop()
})
}
最开始的 leakingLoop 必须等待后面的 leakingLoop 任务有结果才能得到解析,而后面的那一 个 leakingLoop 又必须依赖下一个,依此类推形成一条永远无法解析的 Promise 链。
function nonLeakingLoop () {
delay(1)
.then(() => {
console.log(`Tick ${Date.now()}`)
nonLeakingLoop()
})
}
上面的 nonLeakingLoop 不会依赖下一次调用的 nonLeakingLoop 所返回的 Promise,系统会多次安排垃圾收集器去回收用不到的内存,从而令内存用量降低,所以不会存在内存泄漏问题。
但这个函数不会把深层递归过程中所发生的错误播报出来,因为这些 Promise 的状态之间是没有联系的,前一个 Promise 不会因为后面的 Promise 发生错误而进入 rejected 状态
function nonLeakingLoopWithErrors () {
return new Promise((resolve, reject) => {
(function internalLoop () {
delay(1)
.then(() => {
console.log(`Tick ${Date.now()}`)
internalLoop()
})
.catch(err => {
reject(err)
})
})()
})
}
保证无论哪一层的异步操作发生错误,nonLeakingLoopWithErrors 函数所返回的 Promise 对象会遭到拒绝
async function nonLeakingLoopAsync () {
while (true) {
await delay(1)
console.log(`Tick ${Date.now()}`)
}
}
拥有递归效果,同时保证异步任务 delay 所抛出的错误,总能播报给最初调用函数的人