流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)

  • Node.js诞生之初就是为了提高IO性能(解决IO密集型问题)
  • Node.js内置Stream,实现了流操作对象
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP服务器request和response对象都是流,文件操作系统网络模块实现了流接口

    应用程序中为什么使用流来处理数据?

  • 场景:观看高清电影

看电影流0.png

常见问题

  • 同步读取资源文件,用户需要等待数据读取完成
  • 资源文件最终一次性加载至内存,开销较大(V8提供的内存1.4G)64位约1.4G,32位约0.7G
  • 实现数据缓冲,和数据操作

看电影流1.png
看电影流2.png

流处理数据的优势

  • 时间效率:流的分段处理可以同时操作多个数据chunk
  • 空间效率:同一时间流无须占据大内存空间
  • 使用方便:流配合管理,扩展程序变的简单

    Node.js中流的分类

  • Readable:可读流,能够实现数据的读取 (例如 fs.createReadStream())

  • Writeable:可读流,能够实现数据的写操作 (例如 fs.createWriteStream())
  • Duplex:双工流, 既可读又可写 (例如 net.Socket)
  • Tranform:转换流,可读可写,还能实现数据转换 (例如 zlib.createDeflate())

    Node.js流特点

  • Stream模块实现了四个具体的抽象

  • 所有流都继承自EventEmitter(可以去基于发布订阅的模式,让他们可以发布数据的读写事件,事件循环监控事件时机) ```javascript // 拷贝文件 const fs = require(‘fs’) let rs = fs.createReadStream(‘./test.txt’) let ws = fs.createWriteStream(‘./test1.txt’)

// 复制 rs.pipe(ws)

  1. <a name="k5znn"></a>
  2. ### 可读流(生产数据)
  3. - 是生产供程序消费数据的流(数据源)
  4. ```javascript
  5. const fs = require('fs')
  6. // 创建了一个可读流
  7. const rs = fs.createReadStream('00-note.txt')
  8. // 管道操作,传递给process.stdout
  9. rs.pipe(process.stdout)

可读流事件

  1. 常用操作:监听以下两个事件
  • readable事件:当流中存在可读取数据时触发
  • data事件:当流中数据块传给消费者后触发
  1. 两种读取模式:
  • 流动模式:数据会从底层系统读取,并通过 EventEmitter 尽快的将数据传递给所注册的事件处理程序中
  • 暂停模式:在这种模式下将不会读取数据,必须显示的调用 Stream.read () 方法来从流中读取数据
  1. 三种状态:
  • readableFlowing === null:不会产生数据,调用 Stream.pipe ()、Stream.resume 会使其状态变为 true,开始产生数据并主动触发事件
  • readableFlowing === false:此时会暂停数据的流动,但不会暂停数据的生成,因此会产生数据积压
  • readableFlowing === true:正常产生和消耗数据
  1. _read 方法是把数据存在缓存区中,因为是异步的,流是通过readable事件来通知消耗方的。
  • 说明一下,流中维护了一个缓存,当缓存中的数据足够多时,调用read()不会引起_read()的调用,即不需要向底层请求数据。state.highWaterMark是给缓存大小设置的一个上限阈值。如果取走n个数据后,缓存中保有的数据不足这个量,便会从底层取一次数据

    自定义可读流

  • 继承stream里的Readable

  • 重写_read方法调用push产出数据 ```javascript const { Readable } = require(“stream”);

// 模拟底层数据 let source = [“lg”, “zce”, “syy”];

// 自定义类继承 Readable

class MyReadable extends Readable { constructor(source) { super(); this.source = source; }

_read() { // 对数据进行添加的操作,操作完之后往source里面传一个null,这样底层才知道读完了 // push(null)结束推送 let data = this.source.shift() || null; this.push(data); // push方法是继承Readable readable.push } }

// 实例化 let myReadable = new MyReadable(source);

// 暂停模式 所有的流操作继承了EventEmitter myReadable.on(“readable”, () => { // console.log(1); // 打印了两次1, // 默认的是暂停模式, let data = null; // read(2)可以传入指定的数据长度 while ((data = myReadable.read(2)) !== null) { console.log(data.toString()); } });

// 流动模式 myReadable.on(‘data’, (chunk) => { // 不需要往缓存中放,直接读取了 console.log(chunk.toString()); })

<a name="IRG1e"></a>
#### 自定义可读流问题

- 底层数据读取完成之后如何处理?  读取完之后可以给source传递一个null值,让底层知道读取完了
- 消费者如何获取可读流中的数据?
- 消费数据为什么存在二种方式?流动模式,暂停模式(满足不同的场景)

![自定义可读流2.png](https://cdn.nlark.com/yuque/0/2022/png/22628793/1651661379023-00ebdaea-6215-4a1b-82e9-cf2af19074f0.png#clientId=ue213fed4-7dd7-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u558280b8&margin=%5Bobject%20Object%5D&name=%E8%87%AA%E5%AE%9A%E4%B9%89%E5%8F%AF%E8%AF%BB%E6%B5%812.png&originHeight=423&originWidth=984&originalType=binary&ratio=1&rotation=0&showTitle=false&size=105853&status=done&style=none&taskId=u6ae83c2e-e200-447d-9ac5-30b2abd9782&title=)
<a name="VVwN3"></a>
#### 可读流总结

- 明确数据生产与消费流程
- 利用API实现自定义的可读流
- 明确数据消费的事件使用
<a name="lVeQt"></a>
### 可写流(消费数据)

- 用于消费数据的流
```javascript
const fs = require('fs')

// 创建一个可读流,生产数据
let rs = fs.createReadStream('test.txt')

// 修改字符编码,便于后续使用
rs.setEncoding('utf-8')

// 创建一个可写流,消费数据
let ws = fs.createWriteStream('02test.txt')

// 监听事件调用方法完成具体的消费
rs.on('data',(chunk) => {
  // 写入数据
  ws.write(chunk)
})

自定义可写流

  • 继承stream模块的Writeable
  • 重写_write方法,调用write执行写入 ```javascript const { Writable } = require(“stream”); class MyWriteable extends Writable{ constructor(){ super() } // 参数:执行写入的数据,当前写入的编码集,回调 _write(chunk, en ,done){ process.stdout.write(chunk.toString() + ‘<——‘) process.nextTick(done) // 放入异步,让在同步代码之后再执行回调 } }

let myWriteable = new MyWriteable()

myWriteable.write(‘早上好啊’,’utf-8’,() => { console.log(‘end’); })

<a name="da4Ru"></a>
#### 可写流事件

- pipe事件:可读流调用pipe()方法时触发(数据的到来)
- unpipe事件:可读流调用unpipe()方法时触发(数据的切断)
<a name="NnoSX"></a>
### 双工流和转换流(Duplex && Transform)

- 可读、可写、双工、转换是单一抽象具体实现
- 流操作的核心功能就是处理数据
- Nodejs诞生的初衷就是解决密集型IO事务
- Nodejs中处理数据模块继承了流和EventEmitter
- stream、四种类型流、实现流操作的模块
<a name="lhPUK"></a>
#### Duplex双工流(实现了Readable和Writable)

- 既能生产又能消费
<a name="zx7JE"></a>
#### 自定义双工流

1. 继承Duplex类
1. 重写_read方法,调用push生产数据
1. 重写_write方法,调用write消费数据
```javascript
const { Duplex } = require('stream')

// 继承Duplex,重写_read和_wirte方法
class MyDuplex extends Duplex{
  constructor(source){
    super() // 父类的构造方法执行起来
    this.source = source
  }

  _read(){
    // 模拟从底层读取的数据,读取完,传null
    let data = this.source.shift() || null
    this.push(data) // push(null)结束推送
  }

  // 参数:操作的数据,字符编码,回调
  _write(chunk, en, next){
    // 消耗数据,放在可写流写出
    process.stdout.write(chunk)
    // 在上面执行完之后执行回调,这里采用异步操作
    process.nextTick(next)
  }
}

let source = ['a','b','c']
let myDuplex = new MyDuplex(source)
// myDuplex.on('data', (chunk) => {
//   console.log(chunk.toString());
// })

myDuplex.write('早上好',() => {
  console.log(111);
})

Transform转换流,也是双工流

  • 继承Transform类
  • 重写_transform方法,调用push和callback
  • 重写_flush 方法,处理剩余数据

    自定义转换流

    ```javascript const { Transform } = require(‘stream’)

class MyTransform extends Transform{ constructor(){ super() }

_transform(chunk, en, cb){ // 放在可读流里面 push方法是继承Readable readable.push this.push(chunk.toString().toUpperCase()) cb(null) // 如果要把原始的数据给进去,加chunk } }

let t = new MyTransform() // 可读写 t.write(‘a’)

t.on(‘data’, (chunk) => { console.log(chunk.toString()); }) ```

Duplex和Transform的区别

  1. Duplex的读和写是相互独立的,读操作的数据不能被写操作直接当作数据源使用的
  2. Transform是可以的,转换流的底层将读写操作进行了连通,同时转换流还可以对相应的数据进行转换操作(相应:自己定义实现)

    Nodejs中四种流

  3. Readable 可读流(生成数据的,常用:监听readable和data事件,Readable是需要我们主动调用read方法来消耗数据的,data就是一个流动模式,他可以一直去读,当然流动和暂停是可以切换的)

  4. Writeable 可写流(专门消费数据的流,主要是调用write方法,然后再把数据源里的数据写入指定的位置)
  5. Duplex 双工流(可读可写,但是两者是独立的)
  6. Transform转换流(可读可写,中间可以添加数据的转换操作,可写流和可读流是打通的,非常利用在中间管道传输中做状态处理)

    推荐大佬文章

    本文章只是作为初学者了解Node流模块,如果深入,请看下面大佬文章和官方文档,总结极全
    Node.js 流源码解读之可读流