回调地狱

  1. import fs from 'fs'
  2. import path from 'path'
  3. import superagent from 'superagent'
  4. import mkdirp from 'mkdirp'
  5. import { urlToFilename } from './utils.js'
  6. export function spider (url, cb) {
  7. const filename = urlToFilename(url)
  8. fs.access(filename, err => { // [1]
  9. if (err && err.code === 'ENOENT') {
  10. console.log(`Downloading ${url} into ${filename}`)
  11. superagent.get(url).end((err, res) => { // [2]
  12. if (err) {
  13. cb(err)
  14. } else {
  15. mkdirp(path.dirname(filename), err => { // [3]
  16. if (err) {
  17. cb(err)
  18. } else {
  19. fs.writeFile(filename, res.text, err => { // [4]
  20. if (err) {
  21. cb(err)
  22. } else {
  23. cb(null, filename, true)
  24. }
  25. })
  26. }
  27. })
  28. }
  29. })
  30. } else {
  31. cb(null, filename, false)
  32. }
  33. })
  34. }

涉及回调的编程技巧与控制流模式

编写回调逻辑时所应遵循的原则

写异步函数的第一准则:不要滥用闭包
减少嵌套层级的原则:

  • 尽早退出。根据上下文,使用 return、continue 或 break 等关键字实现退出,而不要编写完整的 if…else 语句
  • 用带有名称的函数表示回调逻辑,并把这些函数写在闭包结构的外面,如果需要传递中间结果,通过参数传递
  • 提高代码的模块化程度。尽量把代码分成比较小且可复用的函数

    运用相关的原则编写回调

  1. 尽早返回原则(early return principle):移除 else 语句,在发现错误后立即返回

    1. if (err) {
    2. return cb(err);
    3. }
    4. // 没有出错的情况下应该执行的代码
  2. 将可以复用的代码抽离到单独的函数中 ```typescript function saveFile (filename, contents, cb) { mkdirp(path.dirname(filename), err => { if (err) { return cb(err) } fs.writeFile(filename, contents, cb) }) }

function download (url, filename, cb) { console.log(Downloading ${url}) superagent.get(url).end((err, res) => { if (err) { return cb(err) } saveFile(filename, res.text, err => { if (err) { return cb(err) } console.log(Downloaded and saved: ${url}) cb(null, res.text) }) }) }

export function spider (url, cb) { const filename = urlToFilename(url) fs.access(filename, err => { if (!err || err.code !== ‘ENOENT’) { // [1] return cb(null, filename, false) } download(url, filename, err => { if (err) { return cb(err) } cb(null, filename, true) }) }) }

  1. <a name="OuBXG"></a>
  2. ## sequential execution (顺序执行)
  3. <a name="cDV5W"></a>
  4. ### executing a known set of tasks in sequence (顺序执行一组固定的任务)
  5. 重点在于把每项任务都包装成一个单元
  6. ```typescript
  7. function asyncOperation (cb) {
  8. process.nextTick(cb)
  9. }
  10. function task1 (cb) {
  11. asyncOperation(() => {
  12. task2(cb)
  13. })
  14. }
  15. function task2 (cb) {
  16. asyncOperation(() => {
  17. task3(cb)
  18. })
  19. }
  20. function task3 (cb) {
  21. asyncOperation(() => {
  22. cb() // finally executes the callback
  23. })
  24. }
  25. task1(() => {
  26. // executed when task1, task2 and task3 are completed
  27. console.log('tasks 1, 2 and 3 executed')
  28. })

按先后顺序迭代集合中的元素(sequential iterator)

创建名为 iterator 的函数,以触发集合中的下一项任务,并确保此任务执行完毕时,会继续触发后面应该执行的那项任务

  1. const tasks = [
  2. (cb) => {
  3. console.log('Task 1')
  4. setTimeout(cb, 1000)
  5. },
  6. (cb) => {
  7. console.log('Task 2')
  8. setTimeout(cb, 1000)
  9. },
  10. (cb) => {
  11. console.log('Task 3')
  12. setTimeout(cb, 1000)
  13. }
  14. ]
  15. function iterate (index) {
  16. if (index === tasks.length) {
  17. return finish()
  18. }
  19. const task = tasks[index]
  20. task(() => iterate(index + 1))
  21. }
  22. function finish () {
  23. // iteration completed
  24. console.log('All tasks executed')
  25. }
  26. iterate(0)

如果 task () 是个同步操作,那么这种算法可能会陷入深层递归,因为此时程序不会像 task() 是异步操作时那样,立即从 iterate 中返回,而是会立即进入下一层递归,可能会让调用栈的深度超过系统的最大值。
如果是异步操作,系统会将 task 函数在异步逻辑里面所请求的下一轮迭代,安排到事件循环的下一个周期去执行,而不会让调用栈继续变深。

平行执行

【2022.04】第四章:利用回调实现异步控制流模式 - 图1
只有异步操作,才能在 node 中平行地运行,因为它们能够通过非阻塞式的 API 得到处理,从而形成并发的效果。Node 平台无法并发地执行同步操作(阻塞式),可以通过和异步穿插起来,或用 setTimeout() 与 setImmediate() 等手段安排此类操作异步地执行。

平行执行集合中的各项任务时所采用的模式

Unlimited parallel execution (数量不限的平行执行)模式 把所有任务全都启动起来,让这些任务平行地运行,并关注已经完成的任务数量,等所有任务都完成后触发回调

  1. function makeSampleTask (name) {
  2. return (cb) => {
  3. console.log(`${name} started`)
  4. setTimeout(() => {
  5. console.log(`${name} completed`)
  6. cb()
  7. }, Math.random() * 2000)
  8. }
  9. }
  10. const tasks = [
  11. makeSampleTask('Task 1'),
  12. makeSampleTask('Task 2'),
  13. makeSampleTask('Task 3'),
  14. makeSampleTask('Task 4')
  15. ]
  16. let completed = 0
  17. tasks.forEach(task => {
  18. task(() => {
  19. if (++completed === tasks.length) {
  20. finish()
  21. }
  22. })
  23. })
  24. function finish () {
  25. // all the tasks completed
  26. console.log('All tasks executed!')
  27. }

平行地运行多项任务时,可能会遇到数据竞争现象,即这些任务可能会争用外部资源(例如文件或数据库里的记录)
使用该模式:

  1. function spiderLinks (currentUrl, body, nesting, cb) {
  2. if (nesting === 0) {
  3. return process.nextTick(cb)
  4. }
  5. const links = getPageLinks(currentUrl, body)
  6. if (links.length === 0) {
  7. return process.nextTick(cb)
  8. }
  9. let completed = 0
  10. let hasErrors = false
  11. function done (err) {
  12. if (err) {
  13. hasErrors = true
  14. return cb(err)
  15. }
  16. if (++completed === links.length && !hasErrors) {
  17. return cb()
  18. }
  19. }
  20. links.forEach(link => spider(link, nesting - 1, done))
  21. }

设计 hasErrors 这个变量,用来记录这些平行执行的 spider 任务里面是否有哪个任务在处理过程中发生错误,因为需要在出现这种情况的时候,立即触发 cb 所表示的回调逻辑。另外,在所有 spider 任务都处理完毕的时候,还需要根据这个变量的值,来判断是否需要触发 cb,以免重复触发。

解决并发任务之间的数据争用问题

这种问题出现很频繁的根本原因,在于触发异步操作与获得操作结果之间,是有延迟的。

争用的例子:

  1. export function spider (url, nesting, cb) {
  2. const filename = urlToFilename(url)
  3. fs.readFile(filename, 'utf8', (err, fileContent) => {
  4. if (err) {
  5. if (err.code !== 'ENOENT') {
  6. return cb(err)
  7. }
  8. return download(url, filename, (err, requestContent) => {
  9. if (err) {
  10. return cb(err)
  11. }
  12. spiderLinks(url, requestContent, nesting, cb)
  13. })
  14. }
  15. spiderLinks(url, fileContent, nesting, cb)
  16. })
  17. }

在同一个 URL 上操作的两个爬虫任务可能会在两个任务中针对同一份文件调用 fs.readFile(),这两个任务在调用时都发现该文件不存在,所以会分别安排自己的下载任务。
【2022.04】第四章:利用回调实现异步控制流模式 - 图2
解决办法:

  1. const spidering = new Set();
  2. function spider(url, nesting, cb) {
  3. if (spidering.has(url)) {
  4. return process.nextTick(cb);
  5. }
  6. spidering.add(url);
  7. }

限制任务数量的平行执行

不进行限制可能会导致的问题:

  • 将进程能够使用的所有文件描述符用完
  • 遭受 Dos (denial-of-service,拒绝服务)攻击

    限制并发数量

    ``typescript function makeSampleTask (name) { return (cb) => { console.log(${name} started`) setTimeout(() => {
    1. console.log(`${name} completed`)
    2. cb()
    }, Math.random() * 2000) } }

const tasks = [ makeSampleTask(‘Task 1’), makeSampleTask(‘Task 2’), makeSampleTask(‘Task 3’), makeSampleTask(‘Task 4’), makeSampleTask(‘Task 5’), makeSampleTask(‘Task 6’), makeSampleTask(‘Task 7’) ]

const concurrency = 2 let running = 0 let completed = 0 let index = 0

function next () { // [1] while (running < concurrency && index < tasks.length) { const task = tasks[index++] task(() => { // [2] if (++completed === tasks.length) { return finish() } running— next() }) running++ } } next()

function finish () { // all the tasks completed console.log(‘All tasks executed!’) }

  1. <a name="fIYZc"></a>
  2. ### 限制整个程序的并发数量
  3. <a name="bfu5J"></a>
  4. #### 利用队列确保任务总数不突破全局上限
  5. ```typescript
  6. export class TaskQueue {
  7. constructor (concurrency) {
  8. this.concurrency = concurrency
  9. this.running = 0
  10. this.queue = []
  11. }
  12. pushTask (task) {
  13. this.queue.push(task)
  14. process.nextTick(this.next.bind(this))
  15. return this
  16. }
  17. next () {
  18. while (this.running < this.concurrency && this.queue.length) {
  19. const task = this.queue.shift()
  20. task(() => {
  21. this.running--
  22. process.nextTick(this.next.bind(this))
  23. })
  24. this.running++
  25. }
  26. }
  27. }

在通过 process.nextTick 异步调用 next 时,必须拿 bind 函数给那次调用绑定执行情景(上下文),不然等真正触发时,next 无法获取自身到底应该在什么情景下执行。

完善 TaskQueue 类

将 TaskQueue 变成 EventEmitter 类,具备广播相应事件的能力,使得在任务失败或队列已经清空的时候通知用户

  1. import { EventEmitter } from 'events'
  2. export class TaskQueue extends EventEmitter {
  3. constructor (concurrency) {
  4. super()
  5. this.concurrency = concurrency
  6. this.running = 0
  7. this.queue = []
  8. }
  9. pushTask (task) {
  10. this.queue.push(task)
  11. process.nextTick(this.next.bind(this))
  12. return this
  13. }
  14. next () {
  15. if (this.running === 0 && this.queue.length === 0) {
  16. return this.emit('empty')
  17. }
  18. while (this.running < this.concurrency && this.queue.length) {
  19. const task = this.queue.shift()
  20. task((err) => {
  21. if (err) {
  22. this.emit('error', err)
  23. }
  24. this.running--
  25. process.nextTick(this.next.bind(this))
  26. })
  27. this.running++
  28. }
  29. }
  30. }
  1. import { TaskQueue } from './TaskQueue.js'
  2. function makeSampleTask (name) {
  3. return (cb) => {
  4. console.log(`${name} started`)
  5. setTimeout(() => {
  6. console.log(`${name} completed`)
  7. cb()
  8. }, Math.random() * 2000)
  9. }
  10. }
  11. const queue = new TaskQueue(2)
  12. queue.on('error', console.error)
  13. queue.on('empty', () => console.log('Queue drained'))
  14. // A task that spawns other 2 sub tasks
  15. function task1 (cb) {
  16. console.log('Task 1 started')
  17. queue
  18. .pushTask(makeSampleTask('task1 -> subtask 1'))
  19. .pushTask(makeSampleTask('task1 -> subtask 2'))
  20. setTimeout(() => {
  21. console.log('Task 1 completed')
  22. cb()
  23. }, Math.random() * 2000)
  24. }
  25. // A task that spawns other 3 sub tasks
  26. function task2 (cb) {
  27. console.log('Task 2 started')
  28. queue
  29. .pushTask(makeSampleTask('task2 -> subtask 1'))
  30. .pushTask(makeSampleTask('task2 -> subtask 2'))
  31. .pushTask((done) => done(new Error('Simulated error')))
  32. .pushTask(makeSampleTask('task2 -> subtask 3'))
  33. setTimeout(() => {
  34. console.log('Task 2 completed')
  35. cb()
  36. }, Math.random() * 2000)
  37. }
  38. queue
  39. .pushTask(task1)
  40. .pushTask(task2)

参考资料

  1. github 仓库
  2. 笔记