文件可读流

文件可读流的创建和消费

  • 流动模式切换至暂停模式
    • rs.pause() 暂停
    • rs.resume() 开始 ```javascript const fs = require(“fs”);

let rs = fs.createReadStream(“test.txt”, { flags: “r”, // 当前以什么方式对文件操作 encoding: null, // 设为null,返回值为buffer fd: null, // 0 1 2已经被标准的输入,输出,错误占用了 mode: 438, // 十进制 438 autoClose: true, // 是否是自动关闭文件 start: 0, // end: 3, highWaterMark: 4, // 水位线 表示每次要读多少字节的数据,当前缓冲区里面的字节,读完之后会调用_read重新拿取 });

/ 流动模式切换至暂停模式 rs.pause() 暂停 rs.resume() 开始 / // 流动模式 // rs.on(“data”, (chunk) => { // console.log(chunk.toString()); // rs.pause() // 暂停 // setTimeout(() => { // rs.resume() // 开始 // }, 1000); // });

// 暂停模式 rs.on(“readable”, () => { // 数据准备完成之后就会告诉下面,缓存中已经有数据了 // let data = rs.read() // console.log(data); // 最后会输出null

let data; // 不然就变成了全局变量了 // 使用while循环 read()可以设置读取字节数 while ((data = rs.read(2)) !== null) { console.log(data.toString()); console.log(rs._readableState.length); } });

  1. <a name="e3C87"></a>
  2. #### 文件可读流常见事件与应用
  3. - open 监听事件是否被打开
  4. - data 监听数据
  5. - close 监听事件是否被关闭,需要消费才执行
  6. - end 清空数据之后,在close关闭之前执行,通常在end被触发时,读取数据
  7. - error 错误信息
  8. ```javascript
  9. const fs = require("fs");
  10. let rs = fs.createReadStream("test.txt", {
  11. flags: "r", // 当前以什么方式对文件操作
  12. encoding: null, // 设为null,返回值为buffer
  13. fd: null, // 0 1 2已经被标准的输入,输出,错误占用了
  14. mode: 438, // 十进制 438
  15. autoClose: true, // 是否是自动关闭文件
  16. start: 0,
  17. // end: 3,
  18. highWaterMark: 4
  19. });
  20. // open 监听事件是否被打开
  21. rs.on('open', fd => {
  22. console.log(fd, '文件打开了'); // fd: 3
  23. })
  24. // close 监听事件是否被关闭,需要消费才执行
  25. rs.on('close', () => {
  26. console.log('文件关闭了');
  27. })
  28. let bufferArr = []
  29. rs.on('data', chunk => {
  30. console.log(chunk);
  31. bufferArr.push(chunk)
  32. })
  33. // end在close之前执行
  34. // 在end被触发时,读取数据
  35. rs.on('end', () => {
  36. let buffer = Buffer.concat(bufferArr).toString()
  37. console.log(buffer);
  38. console.log('当数据被清空之后');
  39. })
  40. // error 错误信息
  41. rs.on('error', (err) => {
  42. console.log(err, '出错了');
  43. })

文件可写流

  • 继承了Writeable和EventEmitter类,通过fs创建使用 ```javascript const fs = require(“fs”);

const ws = fs.createWriteStream(“test.txt”, { flags: “w”, mode: 438, fd: null, // 从3开始 encoding: “utf-8”, start: 0, highWaterMark: 3, // 一个汉字,三个字节 });

let buf = Buffer.from(“abc”);

// write消耗数据,在一个文件中,重复执行会叠加 // 常见的异步操作,同步顺序执行 // 文件可写流,是对readable的重新实现和继承 // 字符串 或者 buffer ===> fs rs ws.write(buf,() => { console.log(‘数据写完了’); })

ws.write(‘五一过去啦111’,() => { console.log(‘数据写完了1’); })

// open 事件 ws.on(“open”, (fd) => { console.log(“open” + fd); // 3 });

ws.write(“1”);

// close事件 是在数据写入操作全部完成之后再执行 ws.on(“close”, () => { console.log(“close文件关闭了”); });

// end 执行之后就意味着数据写入操作完成(可传参,默认不传,把缓存区内容清空) // ws.end();

// 如果最后想写入数据,将数据写入end参数传入就OK了 ws.end(‘最后添加的数据’) // 类型:字符串+buffer

// 写入完成之后再次写入,报错 // ws.write(“2”);

// error ws.on(“error”, (err) => { console.log(“出错了”); });

  1. <a name="hkwEc"></a>
  2. ### write 执行流程
  3. - flag输出有 true 有false
  4. - 生产速度过快
  5. ```javascript
  6. const fs = require("fs");
  7. let ws = fs.createWriteStream("test.txt", {
  8. highWaterMark: 3, // 一个中文三个字节
  9. });
  10. let flag;
  11. flag = ws.write("1");
  12. console.log(flag); // true
  13. flag = ws.write("2");
  14. console.log(flag); // true
  15. // 如果flag为false,并不是说明当前数据不能被执行写入
  16. // flag 控制上游数据产量问题
  17. flag = ws.write("3");
  18. console.log(flag); // false

write.png

drain与写入速度

  • pipe方法 ```javascript / 需求: “五一假期”写入指定的文件 01 一次性写入 02 分批写入 对比: 对内存友好 /

let fs = require(“fs”); let ws = fs.createWriteStream(“test.txt”,{ highWaterMark: 3 // 显示中文,1中文===3个字节 });

// 一次性写入 只有第一次写不完的,才往缓存里面放 // ws.write(‘五一假期’)

// 拆开 let source = “五一假期”.split(‘’)

// 定义信号量 let num = 0 let flag = true

function executeWrite(){ flag = true while(num !== 4 && flag) { flag = ws.write(source[num]) num++ } } executeWrite()

ws.on(‘drain’,() => { console.log(‘drain执行了’); executeWrite() // 重新开启 })

<a name="vIcJo"></a>
### 背压机制

-  Nodejs的stream已实现了背压机制

-  读取磁盘中的速度远远大于写入磁盘的速度
-  消费者的速度,跟不上生产者的速度
-  **来不及消费的内容会缓存在队列中,而队列是有内存大小的,这时候需要背压机制**
-  如果不实现背压机制,会造成内存溢出,GC频繁调用,其他进程变慢

数据读写时可能存在一些问题
```javascript
const fs = require('fs')

let rs = fs.createReadStream('text.txt')
let ws = fs.createWriteStream('text1.txt')

// 读取磁盘中的速度远远大于写入磁盘的速度
// 消费者的速度,跟不上生产者的速度
// 来不及消费的内容会缓存在队列中,而队列是有内存大小的,这时候需要背压机制
// 如果不实现背压机制,会造成内存溢出,GC频繁调用,其他进程变慢
rs.on('data',chunk => {
  ws.write(chunk)
})

可读流流动模式3.png
写操作.png

  • 如果生产者超过了消费者,write会返回false,给生产者
  • 等消费者将数据消费的差不多,会触发drain事件,告诉生产者,可以生产了

    背压机制代码

    ```javascript const fs = require(‘fs’)

let rs = fs.createReadStream(‘test.txt’,{ highWaterMark: 4 // 水位线,默认是64kb, 文件可写流是16kb , 4: 1 })

let ws = fs.createWriteStream(‘test1.txt’,{ highWaterMark: 1
})

let flag = true

// 1. 除非想拿到每一个数据,进行处理 rs.on(‘data’,chunk => { flag = ws.write(chunk,() => { console.log(‘写完了’); }) // console.log(flag);// false 数据已超限,希望停一停 if(!flag){ rs.pause() // 暂停 } })

// 有新的空间可以接纳数据了 ws.on(‘drain’,() => { rs.resume() })

// 实际中对数据进行整体处理,使用pipe方法 rs.pipe(ws)
```