双工流就是同时实现了 Readable 和 Writable 的流,即可以作为上游生产数据,又可以作为下游消费数据,这样可以处于数据流动管道的中间部分,即

  1. rs.pipe(rws1).pipe(rws2).pipe(rws3).pipe(ws);

在 NodeJS 中双工流常用的有两种

  1. Duplex
  2. Transform

    Duplex

    双工流 - 图1

    实现 Duplex

    和 Readable、Writable 实现方法类似,实现 Duplex 流非常简单,但 Duplex 同时实现了 Readable 和 Writable, NodeJS 不支持多继承,所以我们需要继承 Duplex 类

  3. 继承 Duplex 类

  4. 实现 _read() 方法
  5. 实现 _write() 方法

    相信看过前面章节后对 _read()、_write() 方法的实现不会陌生,和 Readable、Writable 完全一样

自定义 Duplex 有三种写法

  1. const { Duplex } = require('stream');
  2. class MyDuplex extends Duplex {
  3. constructor(options) {
  4. super(options);
  5. // ...
  6. }
  7. }
  1. const { Duplex } = require('stream');
  2. const util = require('util');
  3. function MyDuplex(options) {
  4. if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  5. Duplex.call(this, options);
  6. }
  7. util.inherits(MyDuplex, Duplex);
  1. const { Duplex } = require('stream')
  2. const myDuplex = new Duplex({
  3. read(size) {
  4. // ...
  5. },
  6. write(chunk, encoding, callback) {
  7. // ...
  8. }
  9. })

构造函数参数

Duplex 实例内同时包含可读流和可写流,在实例化 Duplex 类的时候可以传递几个参数

  • readableObjectMode <Boolean>: 可读流是否设置为 ObjectMode,默认 false
  • writableObjectMode <Boolean>: 可写流是否设置为 ObjectMode,默认 false
  • allowHalfOpen <Boolean>: 默认 true, 设置成 false 的话,当写入端结束的时,流会自动的结束读取端

小例子

了解了 Readable 和 Writable 之后看 Duplex 非常简单,直接用一个官网的例子

当然这是不能执行的伪代码,但是 Duplex 的作用可见一斑,进可以生产数据,又可以消费数据,所以才可以处于数据流动管道的中间环节,Node.js 中常见的 Duplex 流有

  • Tcp Scoket
  • Zlib
  • Crypto ```javascript const Duplex = require(‘stream’).Duplex; const kSource = Symbol(‘source’);

class MyDuplex extends Duplex { constructor(source, options) { super(options); this[kSource] = source; }

_write(chunk, encoding, callback) { // The underlying source only deals with strings if (Buffer.isBuffer(chunk)) chunk = chunk.toString(); this[kSource].writeSomeData(chunk); callback(); }

_read(size) { this[kSource].fetchSomeData(size, (data, encoding) => { this.push(Buffer.from(data, encoding)); }); } }

  1. <a name="zwqZG"></a>
  2. # Transform
  3. Transform 同样是双工流,看起来和 Duplex 重复了,但两者有一个重要的区别:
  4. 1. Duplex 虽然同时具备可读流和可写流,但两者是相对独立的;
  5. 2. Transform 的可读流的数据会经过一定的处理过程自动进入可写流
  6. 虽然会从可读流进入可写流,但并不意味这两者的数据量相同,上面说的一定的处理逻辑会决定如果 tranform 可读流,然后放入可写流,transform 原义即为转变,很贴切的描述了 Transform 流作用
  7. 最常见的压缩、解压缩用的 zlib 即为 Transform 流,压缩、解压前后的数据量明显不同,而流的作用就是输入一个 zip 包,输出一个解压文件或反过来。我们平时用的大部分双工流都是 Transform。<br />![](https://cdn.nlark.com/yuque/0/2022/webp/1174243/1649303465291-848ba3e2-bd93-4e49-aa34-e229176bd47c.webp#clientId=uc8662fb2-967c-4&crop=0&crop=0&crop=1&crop=1&from=paste&id=ua853aa89&margin=%5Bobject%20Object%5D&originHeight=664&originWidth=1280&originalType=url&ratio=1&rotation=0&showTitle=false&status=done&style=none&taskId=ued18ec03-eefa-4b40-bec7-7a56ad4b3fc&title=)
  8. <a name="TdgGc"></a>
  9. ## 实现 Transform
  10. Tranform 类内部继承了 Duplex 并实现了 `writable.write()` 和 `readable._read()` 方法,自定义一个 Transform 流,只需要三个步骤
  11. 1. 继承 Transform 类
  12. 2. 实现 _transform() 方法
  13. 3. 实现 _flush() 方法(可以不实现)
  14. :::warning
  15. `_transform(chunk, encoding, callback)` 方法用来接收数据,并产生输出,参数我们已经很熟悉了,和 Writable 一样, chunk 默认是 Buffer,除非 decodeStrings 被设置为 false
  16. 在 `_transform()` 方法内部可以调用 `this.push(data)` 生产数据,交给可写流,也可以不调用,意味着输入不会产生输出
  17. 当数据处理完了必须调用 `callback(err, data)`,第一个参数用于传递错误信息,第二个参数可以省略,如果被传入了,效果和 `this.push(data)` 一样
  18. :::
  19. ```haskell
  20. transform.prototype._transform = function (data, encoding, callback) {
  21. this.push(data);
  22. callback();
  23. };
  24. transform.prototype._transform = function (data, encoding, callback) {
  25. callback(null, data);
  26. };

有些时候,transform 操作可能需要在流的最后多写入可写流一些数据。例如, Zlib流会存储一些内部状态,以便优化压缩输出。在这种情况下,可以使用 _flush() 方法,它会在所有写入数据被消费、触发 end之前被调用

自定义 Duplex 有三种写法

  1. const { Transform } = require('stream');
  2. class MyTransform extends Transform {
  3. constructor(options) {
  4. super(options);
  5. // ...
  6. }
  7. }
  1. const { Transform } = require('stream');
  2. const util = require('util');
  3. function MyTransform(options) {
  4. if (!(this instanceof MyTransform)) return new MyTransform(options)
  5. Transform.call(this, options);
  6. }
  7. util.inherits(MyTransform, Transform);
  1. const { Transform } = require('stream');
  2. const myTransform = new Transform({
  3. transform(chunk, encoding, callback) {
  4. // ...
  5. }
  6. });

Transform 的事件

Transform 流有两个常用的事件

  1. 来自 Writable 的 finish
  2. 来自 Readable 的 end

当调用 transform.end() 并且数据被 _transform() 处理完后会触发 finish,调用_flush后,所有的数据输出完毕,触发 end 事件

回顾初始 Stream 的例子

如果有个需求,把本地一个 package.json 文件中的所有字母都改为大写,并保存到同目录下的 package-upper.json 文件下

这时候就需要用到双向的流了,假定有一个专门处理字符转大写的流 toUppercase

  1. const fs = require('fs')
  2. const rs = fs.createReadStream('./package.json')
  3. const ws = fs.createWriteStream('./package-upper.json')
  4. const { Transform } = require('stream');
  5. // 所有转换流也是双工流。
  6. const toUpperCase = new Transform({
  7. writableObjectMode: true,
  8. transform(chunk, encoding, callback) {
  9. const str = cahunk.toString()
  10. const source = JSON.parse(str)
  11. const data = {}
  12. for (let key of Object.keys(source)) {
  13. const val = source[key]
  14. key = key[0].toUpperCase() + key.substring(1)
  15. data[key] = val
  16. }
  17. // 将数据推送到可读队列中。
  18. callback(null, JSON.stringify(data));
  19. }
  20. });
  21. rs.pipe(toUpperCase).pipe(ws)
  1. const fs = require('fs')
  2. const rs = fs.createReadStream('./package.json')
  3. const ws = fs.createWriteStream('./package-upper.json')
  4. const { Duplex } = require('stream');
  5. class ToUpperCase extends Duplex {
  6. constructor(options) {
  7. super(options)
  8. this.source = Buffer.alloc(0)
  9. }
  10. _read(size) {
  11. }
  12. _write(chunk, encoding, callback) {
  13. this.source = Buffer.concat([this.source, chunk])
  14. const str = this.source.toString()
  15. const source = JSON.parse(str)
  16. const data = {}
  17. for (let key of Object.keys(source)) {
  18. const val = source[key]
  19. key = key[0].toUpperCase() + key.substring(1)
  20. data[key] = val
  21. }
  22. this.push(JSON.stringify(data))
  23. callback()
  24. }
  25. }
  26. const toUpperCase = new ToUpperCase()
  27. rs.pipe(toUpperCase).pipe(ws)