https://nodejs.org/zh-cn/docs/guides/backpressuring-in-streams/

stream的原型方法pipe()

背景:

以流的方式响应数据最核心的实现就是使用 pipe 方法来实现的输入、输出

参数 readable.pipe(destination[, options])

https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options

  1. readerStream.pipe(writerStream);

例如压缩(边读边写)

  1. var fs = require("fs");
  2. var zlib = require('zlib');
  3. // 压缩 input.txt 文件为 input.txt.gz
  4. fs.createReadStream('input.txt')
  5. .pipe(zlib.createGzip())
  6. .pipe(fs.createWriteStream('input.txt.gz'));

支持链式调用

v8.x之后独立函数 stream.pipeline(streams, callback)

https://nodejs.org/api/stream.html#stream_stream_pipeline_streams_callback

  1. const { pipeline } = require('stream');
  2. const fs = require('fs');
  3. const zlib = require('zlib');
  4. pipeline(
  5. fs.createReadStream('archive.tar'),
  6. zlib.createGzip(),
  7. fs.createWriteStream('archive.tar.gz'),
  8. (err) => { }
  9. );

原理

https://cloud.tencent.com/developer/article/1630068

  1. 可读流监听data事件
    1. 首先判断dest.Writable,当写完时会赋值为false
    2. 此时如果消费者消费速度慢,这时产生了一个现象,叫做背压。背压问题即是外部的生产者和消费者速度差造成的,此时我们需要暂停写入.pause()
  2. 可写流监听drain事件
    1. 消费者完成消费,可以触发drain事件,此时我们可以继续向流中写入数据。执行.resume()
  3. 触发pipe事件,通知有流写入
  4. 返回dest流,可以进行链式调用

简单实现(手写)
https://github.com/nodejs/node/blob/v14.x/lib/internal/streams/readable.js#L645

  1. stream.prototype.pipe = function(dest, options) {
  2. this.on('data', (chunk) => {
  3. if (dest.writable) {
  4. if (false === dest.write(chunk) && this.pause) {
  5. this.pause();
  6. }
  7. }
  8. });
  9. dest.on('drain', () => {
  10. this.resume();
  11. });
  12. dest.emit('pipe', this);
  13. return dest;
  14. };