TCP数据粘包

  • 通信包含数据发送端和接收端
  • 发送端累积数据统一发送
  • 接收端缓冲数据之后再消费

好处:减少IO操作带来的性能消耗
坏处:数据使用问题会产生粘包问题

问题: 数据是放在缓冲中的,那么在什么样的条件下,他才会去开始执行发送呢?

  • TCP拥塞机制决定发送时机

客户端写入多条数据,打印数据
粘包.png

  1. const net = require("net");
  2. // 创建服务端实例
  3. const server = net.createServer();
  4. const PORT = 1234;
  5. const HOST = "localhost";
  6. server.listen(PORT, HOST);
  7. server.on("listening", () => {
  8. console.log(`服务端已经开启${HOST}:${PORT}`);
  9. });
  10. // 接收消息,回写消息 双工流
  11. server.on("connection", (socket) => {
  12. socket.on("data", (chunk) => {
  13. const msg = chunk.toString();
  14. console.log(msg);
  15. // 给客户端回数据 可写流
  16. socket.write(Buffer.from("您好" + msg));
  17. });
  18. });
  19. server.on("close", () => {
  20. console.log("服务端关闭了");
  21. });
  22. server.on("error", (err) => {
  23. if (err.code == "EADDRINUSE") {
  24. console.log("地址正在被使用");
  25. } else {
  26. console.log(err);
  27. }
  28. });
  1. const net = require("net");
  2. const client = net.createConnection({
  3. port: 1234,
  4. host: "127.0.0.1",
  5. });
  6. let dataArr = [
  7. '上海1',
  8. '上海2',
  9. '上海3',
  10. '上海4',
  11. '上海5',
  12. ]
  13. // 可读流
  14. client.on('connect',() => {
  15. // client.write('上海1')
  16. // client.write('上海2')
  17. // client.write('上海3')
  18. // client.write('上海4')
  19. // client.write('上海5')
  20. for(let i = 0; i< dataArr.length;i++){
  21. (function(val, index){
  22. setTimeout(() => {
  23. client.write(val)
  24. },1000*(index+1))
  25. })(dataArr[i],i)
  26. }
  27. })
  28. client.on('data', chunk => {
  29. console.log(chunk.toString());
  30. })
  31. client.on('error',(err) => {
  32. console.log(err);
  33. })
  34. client.on('close',() => {
  35. console.log('客户端断开连接');
  36. })

数据的封包与拆包

  • 对数据进行打包,避免客户端连续多次发送数据时,产生粘包的现象
  • 自定义包的约束规则,在body前面拼接上header,在里面定义包的编号和内容长度
  • 自定义规则实现包的编码和解码

消息分为,定长的信息头和不定长的消息体
封包拆包.png

数据传输过程

  • 进行数据编码,获取二进制数据包
  • 按规则拆解数据,获取指定长度的数据

    Buffer数据读写

  • writeInt16BE: 将value从指定位置写入

  • readInt16BE:从指定位置开始读取数据

    代码实现

    ```javascript class MyTransformCode { constructor() { this.packageHeaderLen = 4; this.serialNum = 0; // 数据包的编号 this.serialLen = 2; // 包的长度 }

    // 编码 encode(data, serialNum) { // data 是非二进制数据,使用buffer转换成二进制数据 const body = Buffer.from(data);

    // 01. 先按照指定的长度申请内存空间,作为header使用 const headerBuf = Buffer.alloc(this.packageHeaderLen);

    // 02. 如果传了,就使用他自己的,没有传使用自己定义的 headerBuf.writeInt16BE(serialNum || this.serialNum);

    // 跳过序列号 headerBuf.writeInt16BE(body.length, this.serialLen);

    // 如果没有传包的编号 if (serialNum == undefined) {

    1. this.serialNum++;

    } return Buffer.concat([headerBuf, body]); }

    // 解码 decode(buffer) { const headerBuf = buffer.slice(0, this.packageHeaderLen); // 从开始截取到头部长度位置 const bodyBuf = buffer.slice(this.packageHeaderLen); // 跳过头部截取 return {

    serialNum: headerBuf.readInt16BE(), // 把头部编号读取出来
    bodyLength: headerBuf.readInt16BE(this.serialLen),
    body: bodyBuf.toString(),
    

    }; }

    // 获取当前数据包长度 getPackageLen(buffer) { // 如果包长度不满4,不应该从里面拿数据 if (buffer.length < this.packageHeaderLen) {

    return 0;
    

    } else{

      // 返回实际的长度, 头部的长度+buffer的长度
      return this.packageHeaderLen + buffer.readInt16BE(this.serialLen)
    

    } } }

module.exports = MyTransformCode

```javascript
const MyTransformCode = require('./myTransform.js')

let ts = new MyTransformCode()
let str1 = '上海1'

// 参数二是指定的编号
console.log(Buffer.from(str1));


// const a = ts.encode(str1, 1)
// console.log(a);

let encodeBuf = ts.encode(str1, 1)
let a = ts.decode(encodeBuf)
console.log(a); // { serialNum: 1, bodyLength: 7, body: '上海1' }

let len = ts.getPackageLen(encodeBuf)
console.log(len); // 11个字节

使用封包拆包解决TCP粘包完整代码

const net = require("net");
const MyTransformCode = require("./myTransform.js");

// 创建服务端实例
const server = net.createServer();

let overageBuffer = null;
let ts = new MyTransformCode();

const PORT = 1234;
const HOST = "localhost";
server.listen(PORT, HOST);

server.on("listening", () => {
  console.log(`服务端已经开启${HOST}:${PORT}`);
});

// 接收消息,回写消息  双工流
server.on("connection", (socket) => {
  socket.on("data", (chunk) => {
    const msg = chunk.toString();

    // 如果有数据,表示上次有数据未处理完
    if (overageBuffer) {
      chunk = Buffer.concat([overageBuffer, chunk]);
    }

    let packageLen = 0; // 存放当前数据包的长度
    // 如果等于0,代表不是一个完整的数据包,或者说不足以读取
    while ((packageLen = ts.getPackageLen(chunk))) {
      const packageCon = chunk.slice(0, packageLen); // 拿到包所有内容
      // 把上面截取过的数据从chunk中拿掉
      chunk = chunk.slice(packageLen);

      // 把客户度传过来的数据包解码
      const ret = ts.decode(packageCon);
      console.log(ret);
      // 给客户端回数据   可写流
      socket.write(ts.encode(ret.body, ret.serialNum));
    }
    // 并不能保证数据处理干净,如果没有被拿完,下一次再拿
    overageBuffer = chunk
  });
});

server.on("close", () => {
  console.log("服务端关闭了");
});

server.on("error", (err) => {
  if (err.code == "EADDRINUSE") {
    console.log("地址正在被使用");
  } else {
    console.log(err);
  }
});
const net = require("net");
const MyTransformCode = require('./myTransform.js')
let overageBuffer = null;
let ts = new MyTransformCode();


const client = net.createConnection({
  port: 1234,
  host: "127.0.0.1",
});


// 可读流
client.on("connect", () => {
  client.write(ts.encode('上海1'));
  client.write(ts.encode('上海1'));
  client.write(ts.encode('上海1'));
  client.write(ts.encode('上海1'));
  client.write(ts.encode('上海1'));
  client.write(ts.encode('上海1'));
  client.write(ts.encode('上海1'));
});

client.on("data", (chunk) => {
  if (overageBuffer) {
    chunk = Buffer.concat([overageBuffer, chunk]);
  }

  let packageLen = 0; // 存放当前数据包的长度
  // 如果等于0,代表不是一个完整的数据包,或者说不足以读取
  while ((packageLen = ts.getPackageLen(chunk))) {
    const packageCon = chunk.slice(0, packageLen); // 拿到包所有内容
    // 把上面截取过的数据从chunk中拿掉
    chunk = chunk.slice(packageLen);
    const ret = ts.decode(packageCon);
    console.log(ret); 
  }
  // 并不能保证数据处理干净,如果没有被拿完,下一次再拿
  overageBuffer = chunk
  console.log(chunk.toString());
});

client.on("error", (err) => {
  console.log(err);
});

client.on("close", () => {
  console.log("客户端断开连接");
});

粘包处理2.png