回调地狱
import fs from 'fs'
import path from 'path'
import superagent from 'superagent'
import mkdirp from 'mkdirp'
import { urlToFilename } from './utils.js'
export function spider (url, cb) {
const filename = urlToFilename(url)
fs.access(filename, err => { // [1]
if (err && err.code === 'ENOENT') {
console.log(`Downloading ${url} into ${filename}`)
superagent.get(url).end((err, res) => { // [2]
if (err) {
cb(err)
} else {
mkdirp(path.dirname(filename), err => { // [3]
if (err) {
cb(err)
} else {
fs.writeFile(filename, res.text, err => { // [4]
if (err) {
cb(err)
} else {
cb(null, filename, true)
}
})
}
})
}
})
} else {
cb(null, filename, false)
}
})
}
涉及回调的编程技巧与控制流模式
编写回调逻辑时所应遵循的原则
写异步函数的第一准则:不要滥用闭包
减少嵌套层级的原则:
- 尽早退出。根据上下文,使用 return、continue 或 break 等关键字实现退出,而不要编写完整的 if…else 语句
- 用带有名称的函数表示回调逻辑,并把这些函数写在闭包结构的外面,如果需要传递中间结果,通过参数传递
- 提高代码的模块化程度。尽量把代码分成比较小且可复用的函数
运用相关的原则编写回调
尽早返回原则(early return principle):移除 else 语句,在发现错误后立即返回
if (err) {
return cb(err);
}
// 没有出错的情况下应该执行的代码
将可以复用的代码抽离到单独的函数中 ```typescript function saveFile (filename, contents, cb) { mkdirp(path.dirname(filename), err => { if (err) { return cb(err) } fs.writeFile(filename, contents, cb) }) }
function download (url, filename, cb) {
console.log(Downloading ${url}
)
superagent.get(url).end((err, res) => {
if (err) {
return cb(err)
}
saveFile(filename, res.text, err => {
if (err) {
return cb(err)
}
console.log(Downloaded and saved: ${url}
)
cb(null, res.text)
})
})
}
export function spider (url, cb) { const filename = urlToFilename(url) fs.access(filename, err => { if (!err || err.code !== ‘ENOENT’) { // [1] return cb(null, filename, false) } download(url, filename, err => { if (err) { return cb(err) } cb(null, filename, true) }) }) }
<a name="OuBXG"></a>
## sequential execution (顺序执行)
<a name="cDV5W"></a>
### executing a known set of tasks in sequence (顺序执行一组固定的任务)
重点在于把每项任务都包装成一个单元
```typescript
function asyncOperation (cb) {
process.nextTick(cb)
}
function task1 (cb) {
asyncOperation(() => {
task2(cb)
})
}
function task2 (cb) {
asyncOperation(() => {
task3(cb)
})
}
function task3 (cb) {
asyncOperation(() => {
cb() // finally executes the callback
})
}
task1(() => {
// executed when task1, task2 and task3 are completed
console.log('tasks 1, 2 and 3 executed')
})
按先后顺序迭代集合中的元素(sequential iterator)
创建名为 iterator 的函数,以触发集合中的下一项任务,并确保此任务执行完毕时,会继续触发后面应该执行的那项任务
const tasks = [
(cb) => {
console.log('Task 1')
setTimeout(cb, 1000)
},
(cb) => {
console.log('Task 2')
setTimeout(cb, 1000)
},
(cb) => {
console.log('Task 3')
setTimeout(cb, 1000)
}
]
function iterate (index) {
if (index === tasks.length) {
return finish()
}
const task = tasks[index]
task(() => iterate(index + 1))
}
function finish () {
// iteration completed
console.log('All tasks executed')
}
iterate(0)
如果 task () 是个同步操作,那么这种算法可能会陷入深层递归,因为此时程序不会像 task() 是异步操作时那样,立即从 iterate 中返回,而是会立即进入下一层递归,可能会让调用栈的深度超过系统的最大值。
如果是异步操作,系统会将 task 函数在异步逻辑里面所请求的下一轮迭代,安排到事件循环的下一个周期去执行,而不会让调用栈继续变深。
平行执行
只有异步操作,才能在 node 中平行地运行,因为它们能够通过非阻塞式的 API 得到处理,从而形成并发的效果。Node 平台无法并发地执行同步操作(阻塞式),可以通过和异步穿插起来,或用 setTimeout() 与 setImmediate() 等手段安排此类操作异步地执行。
平行执行集合中的各项任务时所采用的模式
Unlimited parallel execution (数量不限的平行执行)模式 把所有任务全都启动起来,让这些任务平行地运行,并关注已经完成的任务数量,等所有任务都完成后触发回调
function makeSampleTask (name) {
return (cb) => {
console.log(`${name} started`)
setTimeout(() => {
console.log(`${name} completed`)
cb()
}, Math.random() * 2000)
}
}
const tasks = [
makeSampleTask('Task 1'),
makeSampleTask('Task 2'),
makeSampleTask('Task 3'),
makeSampleTask('Task 4')
]
let completed = 0
tasks.forEach(task => {
task(() => {
if (++completed === tasks.length) {
finish()
}
})
})
function finish () {
// all the tasks completed
console.log('All tasks executed!')
}
平行地运行多项任务时,可能会遇到数据竞争现象,即这些任务可能会争用外部资源(例如文件或数据库里的记录)
使用该模式:
function spiderLinks (currentUrl, body, nesting, cb) {
if (nesting === 0) {
return process.nextTick(cb)
}
const links = getPageLinks(currentUrl, body)
if (links.length === 0) {
return process.nextTick(cb)
}
let completed = 0
let hasErrors = false
function done (err) {
if (err) {
hasErrors = true
return cb(err)
}
if (++completed === links.length && !hasErrors) {
return cb()
}
}
links.forEach(link => spider(link, nesting - 1, done))
}
设计 hasErrors 这个变量,用来记录这些平行执行的 spider 任务里面是否有哪个任务在处理过程中发生错误,因为需要在出现这种情况的时候,立即触发 cb 所表示的回调逻辑。另外,在所有 spider 任务都处理完毕的时候,还需要根据这个变量的值,来判断是否需要触发 cb,以免重复触发。
解决并发任务之间的数据争用问题
这种问题出现很频繁的根本原因,在于触发异步操作与获得操作结果之间,是有延迟的。
争用的例子:
export function spider (url, nesting, cb) {
const filename = urlToFilename(url)
fs.readFile(filename, 'utf8', (err, fileContent) => {
if (err) {
if (err.code !== 'ENOENT') {
return cb(err)
}
return download(url, filename, (err, requestContent) => {
if (err) {
return cb(err)
}
spiderLinks(url, requestContent, nesting, cb)
})
}
spiderLinks(url, fileContent, nesting, cb)
})
}
在同一个 URL 上操作的两个爬虫任务可能会在两个任务中针对同一份文件调用 fs.readFile(),这两个任务在调用时都发现该文件不存在,所以会分别安排自己的下载任务。
解决办法:
const spidering = new Set();
function spider(url, nesting, cb) {
if (spidering.has(url)) {
return process.nextTick(cb);
}
spidering.add(url);
}
限制任务数量的平行执行
不进行限制可能会导致的问题:
- 将进程能够使用的所有文件描述符用完
- 遭受 Dos (denial-of-service,拒绝服务)攻击
限制并发数量
``typescript function makeSampleTask (name) { return (cb) => { console.log(
${name} started`) setTimeout(() => {
}, Math.random() * 2000) } }console.log(`${name} completed`)
cb()
const tasks = [ makeSampleTask(‘Task 1’), makeSampleTask(‘Task 2’), makeSampleTask(‘Task 3’), makeSampleTask(‘Task 4’), makeSampleTask(‘Task 5’), makeSampleTask(‘Task 6’), makeSampleTask(‘Task 7’) ]
const concurrency = 2 let running = 0 let completed = 0 let index = 0
function next () { // [1] while (running < concurrency && index < tasks.length) { const task = tasks[index++] task(() => { // [2] if (++completed === tasks.length) { return finish() } running— next() }) running++ } } next()
function finish () { // all the tasks completed console.log(‘All tasks executed!’) }
<a name="fIYZc"></a>
### 限制整个程序的并发数量
<a name="bfu5J"></a>
#### 利用队列确保任务总数不突破全局上限
```typescript
export class TaskQueue {
constructor (concurrency) {
this.concurrency = concurrency
this.running = 0
this.queue = []
}
pushTask (task) {
this.queue.push(task)
process.nextTick(this.next.bind(this))
return this
}
next () {
while (this.running < this.concurrency && this.queue.length) {
const task = this.queue.shift()
task(() => {
this.running--
process.nextTick(this.next.bind(this))
})
this.running++
}
}
}
在通过 process.nextTick 异步调用 next 时,必须拿 bind 函数给那次调用绑定执行情景(上下文),不然等真正触发时,next 无法获取自身到底应该在什么情景下执行。
完善 TaskQueue 类
将 TaskQueue 变成 EventEmitter 类,具备广播相应事件的能力,使得在任务失败或队列已经清空的时候通知用户
import { EventEmitter } from 'events'
export class TaskQueue extends EventEmitter {
constructor (concurrency) {
super()
this.concurrency = concurrency
this.running = 0
this.queue = []
}
pushTask (task) {
this.queue.push(task)
process.nextTick(this.next.bind(this))
return this
}
next () {
if (this.running === 0 && this.queue.length === 0) {
return this.emit('empty')
}
while (this.running < this.concurrency && this.queue.length) {
const task = this.queue.shift()
task((err) => {
if (err) {
this.emit('error', err)
}
this.running--
process.nextTick(this.next.bind(this))
})
this.running++
}
}
}
import { TaskQueue } from './TaskQueue.js'
function makeSampleTask (name) {
return (cb) => {
console.log(`${name} started`)
setTimeout(() => {
console.log(`${name} completed`)
cb()
}, Math.random() * 2000)
}
}
const queue = new TaskQueue(2)
queue.on('error', console.error)
queue.on('empty', () => console.log('Queue drained'))
// A task that spawns other 2 sub tasks
function task1 (cb) {
console.log('Task 1 started')
queue
.pushTask(makeSampleTask('task1 -> subtask 1'))
.pushTask(makeSampleTask('task1 -> subtask 2'))
setTimeout(() => {
console.log('Task 1 completed')
cb()
}, Math.random() * 2000)
}
// A task that spawns other 3 sub tasks
function task2 (cb) {
console.log('Task 2 started')
queue
.pushTask(makeSampleTask('task2 -> subtask 1'))
.pushTask(makeSampleTask('task2 -> subtask 2'))
.pushTask((done) => done(new Error('Simulated error')))
.pushTask(makeSampleTask('task2 -> subtask 3'))
setTimeout(() => {
console.log('Task 2 completed')
cb()
}, Math.random() * 2000)
}
queue
.pushTask(task1)
.pushTask(task2)