理解 stream 在 Node.js 平台中的重要作用

缓冲模式与流模式

  • buffer mode(缓冲模式):系统会把某份资源传来的所有数据,都先收集到一个缓冲区里,直到操作完成后,系统会把这些数据当作一个整块,传回给调用方
  • 流模式:系统只要从资源方收到数据,就立即发给消费者,让它能够尽快处理这些数据

流模式的优点:

  1. 空间上占据的内存较少
  2. 时间上占用 CPU 时钟周期较少
  3. 便于组合

    流模式在空间占用方面的优势

    V8 引擎对缓冲区的尺寸是有限制的

    用缓冲模式的 API 把文件压缩成 GZIP 格式

    ```typescript import { promises as fs } from ‘fs’ import { gzip } from ‘zlib’ import { promisify } from ‘util’ const gzipPromise = promisify(gzip)

const filename = process.argv[2]

async function main () { const data = await fs.readFile(filename) const gzippedData = await gzipPromise(data) await fs.writeFile(${filename}.gz, gzippedData) console.log(‘File successfully compressed’) }

main()

  1. <a name="r9Dqa"></a>
  2. ### 用流模式的 API 把文件压缩成 GZIP 模式
  3. ```typescript
  4. import { createReadStream, createWriteStream } from 'fs'
  5. import { createGzip } from 'zlib'
  6. const filename = process.argv[2]
  7. createReadStream(filename)
  8. .pipe(createGzip())
  9. .pipe(createWriteStream(`${filename}.gz`))
  10. .on('finish', () => console.log('File successfully compressed'))

流模式在处理时间方面的优势

下面是一个利用流模式把文件压缩上传到远程的程序。
服务器端:

  1. import { createServer } from 'http'
  2. import { createWriteStream } from 'fs'
  3. import { createGunzip } from 'zlib'
  4. import { basename, join } from 'path'
  5. const server = createServer((req, res) => {
  6. const filename = basename(req.headers['x-filename'])
  7. const destFilename = join('received_files', filename)
  8. console.log(`File request received: ${filename}`)
  9. req
  10. .pipe(createGunzip())
  11. .pipe(createWriteStream(destFilename))
  12. .on('finish', () => {
  13. res.writeHead(201, { 'Content-Type': 'text/plain' })
  14. res.end('OK\n')
  15. console.log(`File saved: ${destFilename}`)
  16. })
  17. })
  18. server.listen(3000, () => console.log('Listening on http://localhost:3000'))

req 是个 stream 对象,每收到一小块数据,都可以立即压缩这块数据并将其写入磁盘。

服务器端的代码,会用 basename 来处理收到的这份文件名,以便将文件名里面的路径部分全部删去。从安全的角度看,这样做可以保证相关文件总能保存到我们自己的 received_files 目录下,而不会保存到别的什么地方。假如不用 basename 处理文件名,恶意用户就有可能专门构造一项请求,把服务器端的系统文件覆盖掉,从而给服务器注入恶意代码。比如,如果有人故意让 filename 所表示的文件名变成 /usr/bin/node,这种情况下,攻击者实际上可以让服务器的 node.js 解释器替换成任意文件。

客户端:

  1. import { request } from 'http'
  2. import { createGzip } from 'zlib'
  3. import { createReadStream } from 'fs'
  4. import { basename } from 'path'
  5. const filename = process.argv[2]
  6. const serverHost = process.argv[3]
  7. const httpRequestOptions = {
  8. hostname: serverHost,
  9. port: 3000,
  10. path: '/',
  11. method: 'PUT',
  12. headers: {
  13. 'Content-Type': 'application/octet-stream',
  14. 'Content-Encoding': 'gzip',
  15. 'X-Filename': basename(filename)
  16. }
  17. }
  18. const req = request(httpRequestOptions, (res) => {
  19. console.log(`Server response: ${res.statusCode}`)
  20. })
  21. createReadStream(filename)
  22. .pipe(createGzip())
  23. .pipe(req)
  24. .on('finish', () => {
  25. console.log('File successfully sent')
  26. })

处理一份文件经历的环节:

  1. 客户端从文件系统中读取数据
  2. 客户端压缩数据
  3. 客户端把数据发给服务器
  4. 服务器端从客户端接收数据
  5. 服务器端解压缩数据
  6. 服务器端把数据写入磁盘

无标题-2022-05-09-2210.png

  • 缓冲模式中,整个流程完全按先后顺序执行,下一个步骤的执行必须等待上一个步骤执行结束
  • 流模式下的生产线,只要收到第一块数据,就立刻开始运作。同时,只要下一块数据能够使用,它就立刻开启一条平行的生产线,并把那块数据放到那条生产线上处理,而不用等待前面的数据块处理完毕。因为对每块数据所做的处理都是异步任务,所以这些任务在 Node.js 平台里,完全能够平行地运行。

    stream 之间的组合

    组合方式可以将多个处理环节连接起来,使得每个环节只需要负责把一项功能实现好就行。 但是管道中的后一个 stream 对象,必须支持前一个 stream 对象所产生的那种数据。

stream 知识点

流对象的体系结构

流对象的四种基本抽象类:

  • Readable
  • Writable
  • Duplex
  • Transform

    每个 stream 类的对象,本身都是一个 EventEmitter 实例。所以,流对象实际上可以触发多种事件(end、finish、error 等)

流对象的操作模式:

  • 二进制模式(Binary mode):以 chunk(块)的形式串流数据,这种模式可以用来处理缓冲或字符串
  • 对象模式(Object mode):以对象序列的形式串流数据(这意味着基本能够处理任何一种 JavaScript 值)

    Readable 流(可读流)

    Readable 流表示的是数据源。在 Node.js 平台中,这样的流用 Readable 这个抽象类来实现,该类位于 stream 模块中。

    通过流对象读取数据

    non-flowing (非流动模式,也叫 paused — 暂停模式)

    从 Readable 流中读取数据的默认模式。
    该模式下,可以给流对象注册监听器,以监听 readable 事件,一旦发生这样的事件,说明有数据可以读取。此时,可以通过一个循环结构,反复读取数据,直到把内部缓冲区里的数据读完为止。这种读取操作,是通过 read() 方法(同步操作)实现的,该方法会从内部缓冲区里面同步地读取数据,并返回一个 Buffer 对象,以代表读到的这块数据。
    1. process.stdin
    2. .on('readable', () => {
    3. let chunk
    4. console.log('New data available')
    5. while ((chunk = process.stdin.read()) !== null) {
    6. console.log(
    7. `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
    8. )
    9. }
    10. })
    11. .on('end', () => console.log('End of stream'))
    我们还可以给 read () 方法传入 size 参数,以指定这次读取操作应该读取的数据量。这种做法在实现网络协议或解析特定格式的数据时很有用。

    如果 Readable 流在二进制模式运作,可以在流对象上调用 setEncoding(encoding)方法,并给 encoding 参数传入一种有效的编码格式,例如 utf8,这样就不用读取 Buffer 对象,可以直接读取字符串。 在面对 UTF-8 格式的文本数据时,使用这种方法读取,可以让流对象自动处理由多个字节所构成的字符,并适当地安排缓冲,以防止某个字符的那些字节,分别切割到两个数据块里面。即流对象在这种情况下所产生的每块数据,都是一条有效的 UTF-8 字节序列。 setEncoding 这个方法可以在同一个 Readable 流上面调用多次,即使已经开始从这个数据流中获取数据,也可以调用。流对象会自动切换编码,以处理接下来的数据块。 流里面的数据,本身没有所谓编码,只不过是一种二进制的数据,至于指定编码是指我们可以把流所产生的这些二进制数据,按照某一套标准解读成字符。

flowing (流动模式)

不通过 read () 方法提取数据,而是等着流对象把数据推送到 data 监听器里,只要流对象拿到数据,就会推送过来。

  1. process.stdin
  2. .on('data', (chunk) => {
  3. console.log('New data available')
  4. console.log(
  5. `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
  6. )
  7. })
  8. .on('end', () => console.log('End of stream'))

和流动模式相比,不能够灵活控制数据的流动情况。

异步迭代器

Readable 流本身是一种异步迭代器(async iterator)

  1. async function main () {
  2. for await (const chunk of process.stdin) {
  3. console.log('New data available')
  4. console.log(
  5. `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
  6. )
  7. }
  8. console.log('End of stream')
  9. }
  10. main()

实现自己的 Readable 流

  1. 必须从 stream 模块里继承 Readable 原型
  2. 必须在自己的这个具体类中,给 _read() 方法提供实现代码

readable._read(size)

  1. _read() 中必须通过 push () 操作,向内部缓冲区填入数据

readable.push(chunk)

read() 是给流对象的消费方使用的,而 _read() 方法则是我们在定制 stream 字类时必须自己实现的一个方法,这个方法不应该由消费方直接调用。 习惯上,如果某个方法的名称以下划线开头,说明该方法不对外开放。

  1. import { Readable } from 'stream'
  2. import Chance from 'chance'
  3. const chance = new Chance()
  4. export class RandomStream extends Readable {
  5. constructor (options) {
  6. super(options)
  7. this.emittedBytes = 0
  8. }
  9. _read (size) {
  10. // 利用 chance 库生成一个长度为 size 的随机字符串
  11. const chunk = chance.string({ length: size })
  12. // 把字符串推入内部缓冲区
  13. // 如果推入的是字符串,必须在第二个参数指定编码方案
  14. this.push(chunk, 'utf8')
  15. this.emittedBytes += chunk.length
  16. // 让这个流对象有百分之五的概率得以终止
  17. if (chance.bool({ likelihood: 5 })) {
  18. // 以 null 为参数,会给内部缓冲区推入 EOF (文件结束符),表示这条数据流至此结束
  19. this.push(null)
  20. }
  21. }
  22. }

options 参数本身是个对象,具有以下属性:

  • encoding 属性:表示流对象按照什么样的编码标准,把缓冲区中的数据转化成字符串。默认值为 null
  • objectMode 属性:标志对象模式是否启用,默认值为 false
  • highWaterMark 属性:表示内部缓冲区的数据上限,如果数据所占的字节数已经达到该上限,那么这个流对象就不应该再从数据源中读取数据了。默认值为 16KB

    调用 push () 的时候,应该检查返回值是不是 false,如果是,说明正在接收数据的这个流对象,其缓冲区中的数据量已经触碰了 highWaterMark 所表示的上限,这时候不应该继续往里面添加数据。

  1. import { RandomStream } from './random-stream.js'
  2. const randomStream = new RandomStream()
  3. randomStream
  4. .on('data', (chunk) => {
  5. console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
  6. })
  7. .on('end', () => {
  8. console.log(`Produced ${randomStream.emittedBytes} bytes of random data`)
  9. })

简化版的定制方案

把一个包含 read() 方法的对象传给 options 参数即可

  1. import { Readable } from 'stream'
  2. import Chance from 'chance'
  3. const chance = new Chance()
  4. let emittedBytes = 0
  5. const randomStream = new Readable({
  6. read (size) {
  7. const chunk = chance.string({ length: size })
  8. this.push(chunk, 'utf8')
  9. emittedBytes += chunk.length
  10. if (chance.bool({ likelihood: 5 })) {
  11. this.push(null)
  12. }
  13. }
  14. })
  15. randomStream
  16. .on('data', (chunk) => {
  17. console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
  18. })
  19. .on('end', () => {
  20. console.log(`Produced ${emittedBytes} bytes of random data`)
  21. })

用 iterable 做数据源以构建 Readable 流

  1. import { Readable } from 'stream'
  2. const mountains = [
  3. { name: 'Everest', height: 8848 },
  4. { name: 'K2', height: 8611 },
  5. { name: 'Kangchenjunga', height: 8586 },
  6. { name: 'Lhotse', height: 8516 },
  7. { name: 'Makalu', height: 8481 }
  8. ]
  9. const mountainsStream = Readable.from(mountains)
  10. mountainsStream.on('data', (mountain) => {
  11. console.log(`${mountain.name.padStart(14)}\t${mountain.height}m`)
  12. })

在这种情况下,就算我们通过 Readable 流来读取这个数组,也没办法发挥流对象在节省内存用量方面的优势,因为它所读取的数组,已经提前加载到内存里面了。 所以建议数据还是一块一块地加载比较好。在面对比较庞大的数据源时,应该使用原生的流方案,例如 fs.createReadStream,或是自己来定制流对象,就算要使用 Readable.from,也应该考虑拿那种惰性的(即等用到的时候再去获取的) iterable 当数据源,例如生成器,迭代器、异步迭代器等。

Writable 流(可写流)

Writable 流表示数据目标,即能够写入或容纳数据的地方。

向 stream 中写入数据

使用 write 方法
writable.write(chunk, [encoding], [callback])

  • encoding 可选,chunk 为 string 时,默认为 utf-8,chunk 为 Buffer 时,该参数会被系统忽略
  • callback 可选

让 Writable 流知道已经没有数据需要写入时,可以调用 end 方法
writable.end([chunk], [encoding], [callback])

  1. import { createServer } from 'http'
  2. import Chance from 'chance'
  3. const chance = new Chance()
  4. const server = createServer((req, res) => {
  5. res.writeHead(200, { 'Content-Type': 'text/plain' }) // ①
  6. while (chance.bool({ likelihood: 95 })) { // ②
  7. res.write(`${chance.string()}\n`) // ③
  8. }
  9. res.end('\n\n') // ④
  10. // 在系统把所有数据都写入底层 socket 时触发
  11. res.on('finish', () => console.log('All data sent'))
  12. })
  13. server.listen(8080, () => {
  14. console.log('listening on http://localhost:8080')
  15. })

backpressure (拥堵)

Node.js 平台的流,写入数据的速度可能比消耗数据的速度快,造成性能瓶颈或拥堵。
应对手段:流对象把写入的数据先放入缓冲区,让写入数据的用户如果不知道这种情况,还是持续写入数据,造成累积的数据持续增加。所以 writable.write 方法在内部缓冲区触碰 highWaterMark 上限时,返回 false,表明此时不应该继续写入数据。这套机制称为 backpressure (防拥堵机制)。
但这套机制不是强制性的,即用户仍可以继续写入数据。

  1. import { createServer } from 'http'
  2. import Chance from 'chance'
  3. const chance = new Chance()
  4. const server = createServer((req, res) => {
  5. res.writeHead(200, { 'Content-Type': 'text/plain' })
  6. function generateMore () { // ①
  7. while (chance.bool({ likelihood: 95 })) {
  8. const randomChunk = chance.string({ length: (16 * 1024) - 1 }) // ②
  9. const shouldContinue = res.write(`${randomChunk}\n`)
  10. // shouldContinue 为 false 时,说明内部缓冲区已满
  11. if (!shouldContinue) {
  12. console.log('back-pressure')
  13. return res.once('drain', generateMore)
  14. }
  15. }
  16. res.end('\n\n')
  17. }
  18. generateMore()
  19. res.on('finish', () => console.log('All data sent'))
  20. })
  21. server.listen(8080, () => {
  22. console.log('listening on http://localhost:8080')
  23. })

实现 Writable 流

继承 Writable 类,并实现 _write() 方法

  1. import { Writable } from 'stream'
  2. import { promises as fs } from 'fs'
  3. import { dirname } from 'path'
  4. import mkdirp from 'mkdirp-promise'
  5. export class ToFileStream extends Writable {
  6. constructor (options) {
  7. super({ ...options, objectMode: true })
  8. }
  9. _write (chunk, encoding, cb) {
  10. mkdirp(dirname(chunk.path))
  11. .then(() => fs.writeFile(chunk.path, chunk.content))
  12. .then(() => cb())
  13. .catch(cb)
  14. }
  15. }

调用超类的构造器,初始化相关的内部状态,将 objectMode 设置为 true 以便从默认的二进制模式切换到对象模式。Writable 还支持的选项:

  • highWaterMark:内部缓冲区的数据上限,默认为 16KB
  • decodeStrings:表示对象在把数据传给 _write 方法之前,如果发现数据是 String,需不需要先将数据转为二进制的 Buffer 再传。在对象模式下,系统会忽略这个选项。默认值为 true

    简化版的定制方案

    ```typescript import { Writable } from ‘stream’ import { promises as fs } from ‘fs’ import { dirname, join } from ‘path’ import mkdirp from ‘mkdirp-promise’

const tfs = new Writable({ objectMode: true, write (chunk, encoding, cb) { mkdirp(dirname(chunk.path)) .then(() => fs.writeFile(chunk.path, chunk.content)) .then(() => cb()) .catch(cb) } })

tfs.write({ path: join(‘files’, ‘file1.txt’), content: ‘Hello’ }) tfs.write({ path: join(‘files’, ‘file2.txt’), content: ‘Node.js’ }) tfs.write({ path: join(‘files’, ‘file3.txt’), content: ‘streams’ }) tfs.end(() => console.log(‘All files created’))

  1. <a name="AyDgZ"></a>
  2. ## Duplex 流(双工流/读写流)
  3. 既是 Readable 流,又是 Writable 流,可以描述那种既充当数据来源,又充当数据目标的实体,例如 network socket(网络套接字)。
  4. <a name="mS6hT"></a>
  5. ## Transform 流(传输流)
  6. > Transform 流(传输流)是一种特殊的 Duplex 流,专门用来转换数据。
  7. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/651859/1652453962484-70330a58-5b27-4708-a7a8-e8f681580404.png#clientId=u783c0bf1-27f1-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=309&id=ua799d08c&margin=%5Bobject%20Object%5D&name=image.png&originHeight=340&originWidth=654&originalType=binary&ratio=1&rotation=0&showTitle=false&size=25269&status=done&style=none&taskId=u37d2f473-6159-4839-a807-cbe73bbafce&title=&width=594.5454416590292)<br />对于简单的 Duplex 流,从中读取的数据与写入其中的数据之间没有直接联系,对于流对象本身,并不关心这两种数据之间有没有关系。<br />以 TCP socket 为例,它只需要知道自己可以给远端发送数据,并从远端接收数据即可,至于发出去的数据与收到的数据之间是什么关系,并不需要它来操心。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/651859/1652454028183-282ed77c-62fa-4fe2-9bc0-d853008b3334.png#clientId=u783c0bf1-27f1-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=308&id=u26856317&margin=%5Bobject%20Object%5D&name=image.png&originHeight=339&originWidth=731&originalType=binary&ratio=1&rotation=0&showTitle=false&size=24615&status=done&style=none&taskId=u42c3493e-8e73-4584-b729-ce0933c79d0&title=&width=664.54544014182)<br />Transform 流是特殊的 Duplex 流,它会对自己从 Writable 端收到的每一块数据都做出转换,并让用户可以从 Readable 端读取到转换后的数据。
  8. <a name="Vz3V4"></a>
  9. ### 实现 Transform 流
  10. 定制一个 Transform 流,可以把数据里面的某一个字符串,全部替换成另一个字符串
  11. ```typescript
  12. import { Transform } from 'stream'
  13. export class ReplaceStream extends Transform {
  14. constructor (searchStr, replaceStr, options) {
  15. super({ ...options })
  16. this.searchStr = searchStr
  17. this.replaceStr = replaceStr
  18. this.tail = ''
  19. }
  20. // 类似于 Writable 流的 _write() 方法
  21. _transform (chunk, encoding, callback) {
  22. const pieces = (this.tail + chunk).split(this.searchStr)
  23. const lastPiece = pieces[pieces.length - 1]
  24. const tailLen = this.searchStr.length - 1
  25. this.tail = lastPiece.slice(-tailLen)
  26. pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
  27. // 将数据推送到内部缓冲区以供读取
  28. this.push(pieces.join(this.replaceStr))
  29. callback()
  30. }
  31. // 系统会在整条数据流即将结束时触发
  32. _flush (callback) {
  33. this.push(this.tail)
  34. callback()
  35. }
  36. }

简化版的定制方案

  1. import { Transform } from 'stream'
  2. const searchStr = 'World'
  3. const replaceStr = 'Node.js'
  4. let tail = ''
  5. const replaceStream = new Transform({
  6. defaultEncoding: 'utf8',
  7. transform (chunk, encoding, cb) {
  8. const pieces = (tail + chunk).split(searchStr)
  9. const lastPiece = pieces[pieces.length - 1]
  10. const tailLen = searchStr.length - 1
  11. tail = lastPiece.slice(-tailLen)
  12. pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
  13. this.push(pieces.join(replaceStr))
  14. cb()
  15. },
  16. flush (cb) {
  17. this.push(tail)
  18. cb()
  19. }
  20. })
  21. replaceStream.on('data', chunk => console.log(chunk.toString()))
  22. replaceStream.write('Hello W')
  23. replaceStream.write('orld!')
  24. replaceStream.end()

通过 Transform 流实现过滤(data filtering)与聚合(data aggregation)

  • Transform filter:有条件地调用 this.push,只让符合条件的数据进入管道的下一个环节
  • Streaming aggregation:在 _transform 里面处理数据,并把目前已经处理的这一部分结果累积起来,等到所有数据处理完之后,在 _flush 里面调用一次 this.push ,以推送最终的结果 ```typescript import { createReadStream } from ‘fs’ import { createGunzip } from ‘zlib’ import parse from ‘csv-parse’ import { FilterByCountry } from ‘./filter-by-country.js’ import { SumProfit } from ‘./sum-profit.js’

const csvParser = parse({ columns: true })

// A small difference from the code presented in the book is that // here we have gzipped the data to keep the download size of the repository // as small as possible. For this reason we added an extra step that decompresses // the data on the fly. The final result doesn’t change createReadStream(‘data.csv.gz’) .pipe(createGunzip()) .pipe(csvParser) .pipe(new FilterByCountry(‘Italy’)) .pipe(new SumProfit()) .pipe(process.stdout)

  1. ```typescript
  2. // filter-by-country.js
  3. import { Transform } from 'stream'
  4. export class FilterByCountry extends Transform {
  5. constructor (country, options = {}) {
  6. // objectMode 设为 true,说明处理的不是二进制,而是一个个对象
  7. options.objectMode = true
  8. super(options)
  9. this.country = country
  10. }
  11. _transform (record, enc, cb) {
  12. if (record.country === this.country) {
  13. // 数据进入到管道的下一个环节
  14. this.push(record)
  15. }
  16. cb()
  17. }
  18. }
  1. // sum-profit.js
  2. import { Transform } from 'stream'
  3. export class SumProfit extends Transform {
  4. constructor (options = {}) {
  5. options.objectMode = true
  6. super(options)
  7. this.total = 0
  8. }
  9. _transform (record, enc, cb) {
  10. this.total += Number.parseFloat(record.profit)
  11. cb()
  12. }
  13. _flush (cb) {
  14. // 在所有数据处理完时,进行数据转换,并推送出去
  15. this.push(this.total.toString())
  16. cb()
  17. }
  18. }

PassThrough 流

特殊的 Transform 流,不会对输出的数据块做任何转换
作用:

  • 观察数据的流动情况
  • 实现 late piping 与惰性的 stream 模式

    观察数据的流动情况

    构造一个 PassThrough 实例,针对 data 事件注册监听器,把这个实例安排在管道的某一点上 ```typescript import { PassThrough } from ‘stream’

let bytesWritten = 0 const monitor = new PassThrough() monitor.on(‘data’, (chunk) => { bytesWritten += chunk.length }) monitor.on(‘finish’, () => { console.log(${bytesWritten} bytes written) })

monitor.write(‘Hello!’) monitor.end()

  1. 实际使用上,不会直接给 monitor 写入数据,而是像下安排在管道的某一点上
  2. ```typescript
  3. createReadStream(filename)
  4. .pipe(createGzip())
  5. .pipe(monitor)
  6. .pipe(createWriteStream(`${filename}.gz`))

使用定制的 Transform 流也能实现相同的效果,但是你必须把收到的数据块按照原样尽快推送出去,即不能修改也不能延迟。而使用 PassThrough 方案,这一点系统会自行保证。

Late piping (先把流交给 API,然后再将其安排到管道中)

如果你想先用某个流对象把 API 调用起来,然后再从其中读取数据或是给其中写入数据,那么可以把一个 PassThrough 流交给这个 API,稍后再将这个 PassThrough 安排在管道中的相应位置上面。

  1. import axios from 'axios'
  2. export function upload (filename, contentStream) {
  3. return axios.post(
  4. 'http://localhost:3000',
  5. contentStream,
  6. {
  7. headers: {
  8. 'Content-Type': 'application/octet-stream',
  9. 'X-Filename': filename
  10. }
  11. }
  12. )
  13. }

如果想在文件上传前,对文件流做一些处理,如压缩或加密数据。或者想做数据转换操作,以异步方法安排 upload 函数调用起来后再去执行,怎么做?

  1. import { createReadStream } from 'fs'
  2. import { createBrotliCompress } from 'zlib'
  3. import { PassThrough } from 'stream'
  4. import { basename } from 'path'
  5. import { upload } from './upload.js'
  6. const filepath = process.argv[2] // ①
  7. const filename = basename(filepath)
  8. const contentStream = new PassThrough() // ②
  9. upload(`${filename}.br`, contentStream) // ③
  10. .then((response) => {
  11. console.log(`Server response: ${response.data}`)
  12. })
  13. .catch((err) => {
  14. console.error(err)
  15. process.exit(1)
  16. })
  17. createReadStream(filepath) // ④
  18. .pipe(createBrotliCompress())
  19. .pipe(contentStream)

创建一个 PassThrough 实例做占位符,等到把 API 调用起来后,可以将该实例安排在管道的末端, 让真正提供数据的那些流对象,最终能够把数据交给它,进而给 API 所取用。
这段代码在执行的时候,会调用 upload 函数,从而尽快启动上传流程,但直到我们把管道搭建好之后,数据才会流动到 upload 里面。我们构建的这条管道,会在数据处理结束后关闭,从而让 upload 函数知道,它所要上传的内容就是这些,没有其他内容需要上传了。
更加优雅的代码:

  1. import axios from 'axios'
  2. import { PassThrough } from 'stream'
  3. export function createUploadStream (filename) {
  4. const connector = new PassThrough()
  5. axios
  6. .post(
  7. 'http://localhost:3000',
  8. connector,
  9. {
  10. headers: {
  11. 'Content-Type': 'application/octet-stream',
  12. 'X-Filename': filename
  13. }
  14. }
  15. )
  16. .catch((err) => {
  17. connector.emit(err)
  18. })
  19. return connector
  20. }
  1. import { createReadStream } from 'fs'
  2. import { pipeline } from 'stream'
  3. import { basename } from 'path'
  4. import { createUploadStream } from './upload.js'
  5. const filepath = process.argv[2]
  6. const filename = basename(filepath)
  7. pipeline(
  8. createReadStream(filepath),
  9. createUploadStream(filename),
  10. (err) => {
  11. if (err) {
  12. console.error(err)
  13. process.exit(1)
  14. }
  15. console.log('File uploaded')
  16. }
  17. )

这段代码用 PassThrough 流做 connector(连接器),它会返回一个流对象,使得调用方可以根据自己的需要,来决定什么时候应该向这个流对象推送数据,从而让这些数据能够通过该对象,流入需要读取数据的那个函数。

lazy stream(惰性流)

一般来说,只要创建 stream 实例,就有可能促使系统执行一些开销很大的初始化操作(例如打开一份文件、开启一个 socket,或者开始与数据库建立连接等等),即使我们还没开始使用这个 stream,系统也会先做这些操作。这意味着,如果我们只是想先把多个流对象创建出来,后面再使用,系统执行的这些初始化操作也显得太早了。
遇到开销比较大的初始化操作,可以使用 lazystream 这个库针对实际的 stream 实例创建代理(proxy),只有当我们真正开始通过代理来使用数据的时候,才会把目标 stream 给创建出来。

用管道连接流对象

  1. import { ReplaceStream } from './replace-stream.js'
  2. process.stdin
  3. .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
  4. .pipe(process.stdout)

流(尤其是文本流)是一种通用的接口,所有的流之间,几乎都能通过管道组合连接起来。
两个流通过 pipe 方法连接起来,会形成吸附(suction)效应,使得数据能够从 readable 流自动进入 writable 流,没必要再调用 read 或 write,不用再担心 backpressure(数据拥堵)问题,因为系统会自动处理。

处理管道中的错误

  1. import { createGzip, createGunzip } from 'zlib'
  2. import { Transform, pipeline } from 'stream'
  3. const uppercasify = new Transform({
  4. transform (chunk, enc, cb) {
  5. this.push(chunk.toString().toUpperCase())
  6. cb()
  7. }
  8. })
  9. pipeline(
  10. process.stdin,
  11. createGunzip(),
  12. uppercasify,
  13. createGzip(),
  14. process.stdout,
  15. (err) => {
  16. if (err) {
  17. console.error(err)
  18. process.exit(1)
  19. }
  20. }
  21. )

pipeline 工具函数会把参数列表中的这些流对象串联起来,并针对每个流对象都注册适当的 error 监听器与 close 监听器,这样子无论管道中哪个环节出现错误,系统都能够正确地销毁所有的流。最后一个 cb 参数会在整个管道结束时触发。

用 stream 实现异步控制流模式(asynchronous control flow)

顺序执行

默认情况下,stream 是按照先后顺序处理数据的。

利用一条或多条 stream 对象,很容易就能按先后顺序迭代一系列异步任务

  1. import { createWriteStream, createReadStream } from 'fs'
  2. import { Readable, Transform } from 'stream'
  3. export function concatFiles (dest, files) {
  4. return new Promise((resolve, reject) => {
  5. const destStream = createWriteStream(dest)
  6. Readable.from(files) // ①
  7. .pipe(new Transform({ // ②
  8. objectMode: true,
  9. transform (filename, enc, done) {
  10. const src = createReadStream(filename)
  11. src.pipe(destStream, { end: false })
  12. src.on('error', done)
  13. src.on('end', done) // ③
  14. }
  15. }))
  16. .on('error', reject)
  17. .on('finish', () => { // ④
  18. destStream.end()
  19. resolve()
  20. })
  21. })
  22. }

在通过 pipe() 方法串接的时候,要指定 { end: false } 选项,不让系统在这条 Readable 结束的时候,自动关掉串在它后面的 Writable 流

无序的平行执行

用 stream 实现无序的平行执行

  1. import { Transform } from 'stream'
  2. export class ParallelStream extends Transform {
  3. constructor (userTransform, opts) {
  4. super({ objectMode: true, ...opts })
  5. this.userTransform = userTransform
  6. this.running = 0
  7. this.terminateCb = null
  8. }
  9. _transform (chunk, enc, done) {
  10. this.running++
  11. this.userTransform(
  12. chunk,
  13. enc,
  14. this.push.bind(this),
  15. this._onComplete.bind(this) // 这个才是实际关闭本项任务的地方
  16. )
  17. // 通知 Transform 流,本次变换已经结束
  18. done()
  19. }
  20. // 系统在即将终止当前 stream 时触发该方法
  21. _flush (done) {
  22. if (this.running > 0) {
  23. this.terminateCb = done
  24. } else {
  25. done()
  26. }
  27. }
  28. // 每项异步任务完成时触发该方法
  29. _onComplete (err) {
  30. this.running--
  31. if (err) {
  32. return this.emit('error', err)
  33. }
  34. if (this.running === 0) {
  35. this.terminateCb && this.terminateCb()
  36. }
  37. }
  38. }

通过 ParallelStream 类可以创建出能够平行地执行任务的流。但没办法保证这些异步操作的完成顺序以及它们给这条 stream 推送执行结果的顺序,肯定与我们启动这些任务时所依照的顺序相同。

  • 二进制流不适合做平行处理,数据需要顺序性
  • 对象流适合某些场合

    无序且带有并发上限的平行执行模式

    大量连接会影响系统性能,为了控制负载量与资源使用量,必须给平行任务设定并发上限 ```typescript import { Transform } from ‘stream’

export class LimitedParallelStream extends Transform { constructor (concurrency, userTransform, opts) { super({ …opts, objectMode: true }) this.concurrency = concurrency // 表示并发上限 this.userTransform = userTransform this.running = 0 this.continueCb = null this.terminateCb = null }

_transform (chunk, enc, done) { this.running++ this.userTransform( chunk, enc, this.push.bind(this), this._onComplete.bind(this) ) if (this.running < this.concurrency) { done() // 没到并发数时,可以让系统继续安排下一个任务 } else { // 让正在运行的任务,在其运行完毕时尽快触发该 done this.continueCb = done } }

_flush (done) { if (this.running > 0) { this.terminateCb = done } else { done() } }

_onComplete (err) { this.running— if (err) { return this.emit(‘error’, err) }

  1. // 如果存在 continueCb,即说明到达上限,尽快触发
  2. const tmpCb = this.continueCb
  3. this.continueCb = null
  4. tmpCb && tmpCb()
  5. if (this.running === 0) {
  6. this.terminateCb && this.terminateCb()
  7. }

} }

  1. <a name="adFwx"></a>
  2. ## 有序的平行执行
  3. > 需要把每项任务所给到的数据先放到缓冲区里排列好
  4. <a name="Y8kGw"></a>
  5. # 管道模式
  6. <a name="MOqUT"></a>
  7. ## 组合 stream
  8. - 组合后的 stream,可以当成一个黑盒来使用,用户无需了解这个黑盒内部的管道,具体是怎么搭建起来的
  9. - 组合后的 stream,只需要挂接一个事件监听器,就可以处理整条管道中的错误,不用给管道中的每个流对象都注册这样的监听器
  10. ```typescript
  11. import { createGzip, createGunzip } from 'zlib'
  12. import { createCipheriv, createDecipheriv, scryptSync } from 'crypto'
  13. import pumpify from 'pumpify' // 能够把几个流组合成一个新的 stream
  14. function createKey (password) {
  15. return scryptSync(password, 'salt', 24)
  16. }
  17. export function createCompressAndEncrypt (password, iv) {
  18. const key = createKey(password)
  19. const combinedStream = pumpify(
  20. createGzip(),
  21. createCipheriv('aes192', key, iv)
  22. )
  23. combinedStream.iv = iv
  24. return combinedStream
  25. }
  26. export function createDecryptAndDecompress (password, iv) {
  27. const key = createKey(password)
  28. return pumpify(
  29. createDecipheriv('aes192', key, iv),
  30. createGunzip()
  31. )
  32. }

拆分 stream(fork 模式)

同一个 Readable 流,可以分别与多个 Writable 流相连,从而实现 fork(分支/拆分)操作。 让程序能够对同一份数据,分别做出不同的转换,或者根据某项标准,把源数据划分到不同的组里。

image.png

  1. // 让同一个程序生成多种校验和
  2. import { createReadStream, createWriteStream } from 'fs'
  3. import { createHash } from 'crypto'
  4. const filename = process.argv[2]
  5. const sha1Stream = createHash('sha1').setEncoding('hex')
  6. const md5Stream = createHash('md5').setEncoding('hex')
  7. const inputStream = createReadStream(filename)
  8. inputStream
  9. .pipe(sha1Stream)
  10. .pipe(createWriteStream(`${filename}.sha1`))
  11. inputStream
  12. .pipe(md5Stream)
  13. .pipe(createWriteStream(`${filename}.md5`))
  • inputStream 结束后,md5Stream 与 shalStream 也会自动结束
  • 两条流处理的是同一份数据,所以在其中一条中如果对数据执行了操作,产生了副作用,会影响到另外一条流
  • 数据从 inputStream 流出的速度,会跟分支里面最慢的那一支相同
  • 如果在各分支已经开始消耗源 stream 的数据之后,又给源 stream 挂接新的分支,称为异步构建管道(async piping),新挂接的分支,只能接收源 stream 在你挂接之后所产生的那些数据。如果你不想让新分支错过之前的数据,可以先挂一个 PassThrough 实例上去,把位置占住,这样就算其他分支想要开始从源 stream 中获取数据,也必须等这个 PassThrough 一起做才行,于是 PassThrough 背后那个真正的分支,就不会因为其他分支先开始处理而错过数据了。容易产生 backpressure 问题。

    合并 stream(merge 模式)

    merge (合并/归并)是与 fork (分支/拆分) 相反的操作,它把多个 Readable 流分别跟同一个 Writable 流拼接

image.png
需要注意的问题:end 事件的触发时机。系统默认采用 { end: true } 选项来拼接,只要任何一条源 stream 结束,系统会自动让目标 stream 也随之结束,这经常导致程序出错,因为如果还有其他的源 stream 尚未结束,那么这些 stream 可能会给已经终止的这个目标 stream 里面继续写入内容
解决办法:在把多个源 stream 分别与同一个目标 stream 相拼接时指定{ end: false } 选项,在把所有的源 stream 都读取完毕后,对目标 stream 明确调用 end() 方法

  1. import { createReadStream, createWriteStream } from 'fs'
  2. import split from 'split'
  3. const dest = process.argv[2]
  4. const sources = process.argv.slice(3)
  5. const destStream = createWriteStream(dest)
  6. let endCount = 0
  7. for (const source of sources) {
  8. const sourceStream = createReadStream(source, { highWaterMark: 16 })
  9. sourceStream.on('end', () => {
  10. if (++endCount === sources.length) {
  11. destStream.end()
  12. console.log(`${dest} created`)
  13. }
  14. })
  15. sourceStream
  16. .pipe(split((line) => line + '\n'))
  17. .pipe(destStream, { end: false })
  18. }

多路复用与解多路复用(mux/demux模式)

把多个源 stream 的数据纳入同一个共享通道(shared channel),该通道里面的数据,在逻辑上依然能够跟产生这块数据的源 stream 对应起来,等数据到达通道另一端的时候,将其重新分割,让每块数据都能够流到与源 stream 相对应的通道里面。
image.png

  • 多路复用(multiplexing):把多条 stream 组合到一起,让它们能够沿着一条 stream 传输数据
  • 解多路复用(demultiplexing):把同一条 stream 里面承载的数据,按照来源重新分流到不同的通道中

    构建一款远程的日志记录器

    该程序中的共享媒介是一条 TCP 连接,利用 packet switching(数据包交换)技术实现多路复用,这项技术会把有待传递的数据封装成 packer(数据包),同时让封装者能够在包里标注一些 meta information(元信息),以确定这个数据包的身份,从而正确地实现多路复用、路由、流向控制以及受损数据检测等功能。
    客户端 —— 多路复用 ```typescript import { fork } from ‘child_process’ import { connect } from ‘net’

function multiplexChannels (sources, destination) { let openChannels = sources.length

// 针对每个源 stream 为 readable 事件注册监听器 for (let i = 0; i < sources.length; i++) { sources[i] .on(‘readable’, function () { // ① let chunk while ((chunk = this.read()) !== null) {

  1. // 封装数据包,1个字节表示通道 ID,4个字节表示数据块大小
  2. const outBuff = Buffer.alloc(1 + 4 + chunk.length) // ②
  3. outBuff.writeUInt8(i, 0)
  4. outBuff.writeUInt32BE(chunk.length, 1)
  5. chunk.copy(outBuff, 5)
  6. console.log(`Sending packet to channel: ${i}`)
  7. destination.write(outBuff) // 写入目标通道
  8. }
  9. })
  10. .on('end', () => { // ④
  11. if (--openChannels === 0) {
  12. destination.end()
  13. }
  14. })

} }

const socket = connect(3000, () => { // ① const child = fork( // ② process.argv[2], process.argv.slice(3), { silent: true } // 让子进程不继承父进程的 stdout 与 stderr ) multiplexChannels([child.stdout, child.stderr], socket) // ③ })

  1. 服务端 —— 解多路复用
  2. ```typescript
  3. import { createWriteStream } from 'fs'
  4. import { createServer } from 'net'
  5. function demultiplexChannel (source, destinations) {
  6. let currentChannel = null
  7. let currentLength = null
  8. source
  9. .on('readable', () => { // ①
  10. let chunk
  11. // 没有通道 ID,读取 1 字节
  12. if (currentChannel === null) { // ②
  13. chunk = source.read(1)
  14. currentChannel = chunk && chunk.readUInt8(0)
  15. }
  16. if (currentLength === null) { // ③
  17. chunk = source.read(4)
  18. currentLength = chunk && chunk.readUInt32BE(0)
  19. if (currentLength === null) {
  20. return null
  21. }
  22. }
  23. chunk = source.read(currentLength) // ④
  24. if (chunk === null) {
  25. return null
  26. }
  27. console.log(`Received packet from: ${currentChannel}`)
  28. // 将读取到的数据写入与数据来源相对应的目标通道
  29. destinations[currentChannel].write(chunk) // ⑤
  30. currentChannel = null
  31. currentLength = null
  32. })
  33. .on('end', () => { // ⑥
  34. // 关闭源 stream 后,把所有的目标通道关闭
  35. destinations.forEach(destination => destination.end())
  36. console.log('Source channel closed')
  37. })
  38. }
  39. const server = createServer((socket) => {
  40. const stdoutStream = createWriteStream('stdout.log')
  41. const stderrStream = createWriteStream('stderr.log')
  42. demultiplexChannel(socket, [stdoutStream, stderrStream])
  43. })
  44. server.listen(3000, () => console.log('Server started'))

对象流的多路复用与解多路复用

处理对象流与其他流的最大区别在于,系统每次传输的是一个完整的对象,而不像文本流或二进制流那样,传输的有可能只是整套数据中的某个片段。
所以在处理对象流的多路复用时,只需要记录每个对象的 channel ID(通道 ID)即可,而不用记录该对象所对应的数据是多少字节。解多路复用时,只需根据这个 ID,把对象路由到正确的目标 stream 即可,不用优先考虑这个对象所对应的数据总量。

参考资料

  1. github 仓库