TCP数据粘包
- 通信包含数据发送端和接收端
- 发送端累积数据统一发送
- 接收端缓冲数据之后再消费
好处:减少IO操作带来的性能消耗
坏处:数据使用问题会产生粘包问题
问题: 数据是放在缓冲中的,那么在什么样的条件下,他才会去开始执行发送呢?
- TCP拥塞机制决定发送时机
客户端写入多条数据,打印数据
const net = require("net");// 创建服务端实例const server = net.createServer();const PORT = 1234;const HOST = "localhost";server.listen(PORT, HOST);server.on("listening", () => {console.log(`服务端已经开启${HOST}:${PORT}`);});// 接收消息,回写消息 双工流server.on("connection", (socket) => {socket.on("data", (chunk) => {const msg = chunk.toString();console.log(msg);// 给客户端回数据 可写流socket.write(Buffer.from("您好" + msg));});});server.on("close", () => {console.log("服务端关闭了");});server.on("error", (err) => {if (err.code == "EADDRINUSE") {console.log("地址正在被使用");} else {console.log(err);}});
const net = require("net");const client = net.createConnection({port: 1234,host: "127.0.0.1",});let dataArr = ['上海1','上海2','上海3','上海4','上海5',]// 可读流client.on('connect',() => {// client.write('上海1')// client.write('上海2')// client.write('上海3')// client.write('上海4')// client.write('上海5')for(let i = 0; i< dataArr.length;i++){(function(val, index){setTimeout(() => {client.write(val)},1000*(index+1))})(dataArr[i],i)}})client.on('data', chunk => {console.log(chunk.toString());})client.on('error',(err) => {console.log(err);})client.on('close',() => {console.log('客户端断开连接');})
数据的封包与拆包
- 对数据进行打包,避免客户端连续多次发送数据时,产生粘包的现象
- 自定义包的约束规则,在body前面拼接上header,在里面定义包的编号和内容长度
- 自定义规则实现包的编码和解码
数据传输过程
- 进行数据编码,获取二进制数据包
-
Buffer数据读写
writeInt16BE: 将value从指定位置写入
-
代码实现
```javascript class MyTransformCode { constructor() { this.packageHeaderLen = 4; this.serialNum = 0; // 数据包的编号 this.serialLen = 2; // 包的长度 }
// 编码 encode(data, serialNum) { // data 是非二进制数据,使用buffer转换成二进制数据 const body = Buffer.from(data);
// 01. 先按照指定的长度申请内存空间,作为header使用 const headerBuf = Buffer.alloc(this.packageHeaderLen);
// 02. 如果传了,就使用他自己的,没有传使用自己定义的 headerBuf.writeInt16BE(serialNum || this.serialNum);
// 跳过序列号 headerBuf.writeInt16BE(body.length, this.serialLen);
// 如果没有传包的编号 if (serialNum == undefined) {
this.serialNum++;
} return Buffer.concat([headerBuf, body]); }
// 解码 decode(buffer) { const headerBuf = buffer.slice(0, this.packageHeaderLen); // 从开始截取到头部长度位置 const bodyBuf = buffer.slice(this.packageHeaderLen); // 跳过头部截取 return {
serialNum: headerBuf.readInt16BE(), // 把头部编号读取出来bodyLength: headerBuf.readInt16BE(this.serialLen),body: bodyBuf.toString(),
}; }
// 获取当前数据包长度 getPackageLen(buffer) { // 如果包长度不满4,不应该从里面拿数据 if (buffer.length < this.packageHeaderLen) {
return 0;
} else{
// 返回实际的长度, 头部的长度+buffer的长度return this.packageHeaderLen + buffer.readInt16BE(this.serialLen)
} } }
module.exports = MyTransformCode
```javascriptconst MyTransformCode = require('./myTransform.js')let ts = new MyTransformCode()let str1 = '上海1'// 参数二是指定的编号console.log(Buffer.from(str1));// const a = ts.encode(str1, 1)// console.log(a);let encodeBuf = ts.encode(str1, 1)let a = ts.decode(encodeBuf)console.log(a); // { serialNum: 1, bodyLength: 7, body: '上海1' }let len = ts.getPackageLen(encodeBuf)console.log(len); // 11个字节
使用封包拆包解决TCP粘包完整代码
const net = require("net");const MyTransformCode = require("./myTransform.js");// 创建服务端实例const server = net.createServer();let overageBuffer = null;let ts = new MyTransformCode();const PORT = 1234;const HOST = "localhost";server.listen(PORT, HOST);server.on("listening", () => {console.log(`服务端已经开启${HOST}:${PORT}`);});// 接收消息,回写消息 双工流server.on("connection", (socket) => {socket.on("data", (chunk) => {const msg = chunk.toString();// 如果有数据,表示上次有数据未处理完if (overageBuffer) {chunk = Buffer.concat([overageBuffer, chunk]);}let packageLen = 0; // 存放当前数据包的长度// 如果等于0,代表不是一个完整的数据包,或者说不足以读取while ((packageLen = ts.getPackageLen(chunk))) {const packageCon = chunk.slice(0, packageLen); // 拿到包所有内容// 把上面截取过的数据从chunk中拿掉chunk = chunk.slice(packageLen);// 把客户度传过来的数据包解码const ret = ts.decode(packageCon);console.log(ret);// 给客户端回数据 可写流socket.write(ts.encode(ret.body, ret.serialNum));}// 并不能保证数据处理干净,如果没有被拿完,下一次再拿overageBuffer = chunk});});server.on("close", () => {console.log("服务端关闭了");});server.on("error", (err) => {if (err.code == "EADDRINUSE") {console.log("地址正在被使用");} else {console.log(err);}});
const net = require("net");const MyTransformCode = require('./myTransform.js')let overageBuffer = null;let ts = new MyTransformCode();const client = net.createConnection({port: 1234,host: "127.0.0.1",});// 可读流client.on("connect", () => {client.write(ts.encode('上海1'));client.write(ts.encode('上海1'));client.write(ts.encode('上海1'));client.write(ts.encode('上海1'));client.write(ts.encode('上海1'));client.write(ts.encode('上海1'));client.write(ts.encode('上海1'));});client.on("data", (chunk) => {if (overageBuffer) {chunk = Buffer.concat([overageBuffer, chunk]);}let packageLen = 0; // 存放当前数据包的长度// 如果等于0,代表不是一个完整的数据包,或者说不足以读取while ((packageLen = ts.getPackageLen(chunk))) {const packageCon = chunk.slice(0, packageLen); // 拿到包所有内容// 把上面截取过的数据从chunk中拿掉chunk = chunk.slice(packageLen);const ret = ts.decode(packageCon);console.log(ret);}// 并不能保证数据处理干净,如果没有被拿完,下一次再拿overageBuffer = chunkconsole.log(chunk.toString());});client.on("error", (err) => {console.log(err);});client.on("close", () => {console.log("客户端断开连接");});

