TCP数据粘包
- 通信包含数据发送端和接收端
- 发送端累积数据统一发送
- 接收端缓冲数据之后再消费
好处:减少IO操作带来的性能消耗
坏处:数据使用问题会产生粘包问题
问题: 数据是放在缓冲中的,那么在什么样的条件下,他才会去开始执行发送呢?
- TCP拥塞机制决定发送时机
客户端写入多条数据,打印数据
const net = require("net");// 创建服务端实例const server = net.createServer();const PORT = 1234;const HOST = "localhost";server.listen(PORT, HOST);server.on("listening", () => {console.log(`服务端已经开启${HOST}:${PORT}`);});// 接收消息,回写消息 双工流server.on("connection", (socket) => {socket.on("data", (chunk) => {const msg = chunk.toString();console.log(msg);// 给客户端回数据 可写流socket.write(Buffer.from("您好" + msg));});});server.on("close", () => {console.log("服务端关闭了");});server.on("error", (err) => {if (err.code == "EADDRINUSE") {console.log("地址正在被使用");} else {console.log(err);}});
const net = require("net");const client = net.createConnection({port: 1234,host: "127.0.0.1",});let dataArr = ['上海1','上海2','上海3','上海4','上海5',]// 可读流client.on('connect',() => {// client.write('上海1')// client.write('上海2')// client.write('上海3')// client.write('上海4')// client.write('上海5')for(let i = 0; i< dataArr.length;i++){(function(val, index){setTimeout(() => {client.write(val)},1000*(index+1))})(dataArr[i],i)}})client.on('data', chunk => {console.log(chunk.toString());})client.on('error',(err) => {console.log(err);})client.on('close',() => {console.log('客户端断开连接');})
数据的封包与拆包
- 对数据进行打包,避免客户端连续多次发送数据时,产生粘包的现象
- 自定义包的约束规则,在body前面拼接上header,在里面定义包的编号和内容长度
- 自定义规则实现包的编码和解码
数据传输过程
- 进行数据编码,获取二进制数据包
-
Buffer数据读写
writeInt16BE: 将value从指定位置写入
-
代码实现
```javascript class MyTransformCode { constructor() { this.packageHeaderLen = 4; this.serialNum = 0; // 数据包的编号 this.serialLen = 2; // 包的长度 }
// 编码 encode(data, serialNum) { // data 是非二进制数据,使用buffer转换成二进制数据 const body = Buffer.from(data);
// 01. 先按照指定的长度申请内存空间,作为header使用 const headerBuf = Buffer.alloc(this.packageHeaderLen);
// 02. 如果传了,就使用他自己的,没有传使用自己定义的 headerBuf.writeInt16BE(serialNum || this.serialNum);
// 跳过序列号 headerBuf.writeInt16BE(body.length, this.serialLen);
// 如果没有传包的编号 if (serialNum == undefined) {
this.serialNum++;
} return Buffer.concat([headerBuf, body]); }
// 解码 decode(buffer) { const headerBuf = buffer.slice(0, this.packageHeaderLen); // 从开始截取到头部长度位置 const bodyBuf = buffer.slice(this.packageHeaderLen); // 跳过头部截取 return {
serialNum: headerBuf.readInt16BE(), // 把头部编号读取出来 bodyLength: headerBuf.readInt16BE(this.serialLen), body: bodyBuf.toString(),}; }
// 获取当前数据包长度 getPackageLen(buffer) { // 如果包长度不满4,不应该从里面拿数据 if (buffer.length < this.packageHeaderLen) {
return 0;} else{
// 返回实际的长度, 头部的长度+buffer的长度 return this.packageHeaderLen + buffer.readInt16BE(this.serialLen)} } }
module.exports = MyTransformCode
```javascript
const MyTransformCode = require('./myTransform.js')
let ts = new MyTransformCode()
let str1 = '上海1'
// 参数二是指定的编号
console.log(Buffer.from(str1));
// const a = ts.encode(str1, 1)
// console.log(a);
let encodeBuf = ts.encode(str1, 1)
let a = ts.decode(encodeBuf)
console.log(a); // { serialNum: 1, bodyLength: 7, body: '上海1' }
let len = ts.getPackageLen(encodeBuf)
console.log(len); // 11个字节
使用封包拆包解决TCP粘包完整代码
const net = require("net");
const MyTransformCode = require("./myTransform.js");
// 创建服务端实例
const server = net.createServer();
let overageBuffer = null;
let ts = new MyTransformCode();
const PORT = 1234;
const HOST = "localhost";
server.listen(PORT, HOST);
server.on("listening", () => {
console.log(`服务端已经开启${HOST}:${PORT}`);
});
// 接收消息,回写消息 双工流
server.on("connection", (socket) => {
socket.on("data", (chunk) => {
const msg = chunk.toString();
// 如果有数据,表示上次有数据未处理完
if (overageBuffer) {
chunk = Buffer.concat([overageBuffer, chunk]);
}
let packageLen = 0; // 存放当前数据包的长度
// 如果等于0,代表不是一个完整的数据包,或者说不足以读取
while ((packageLen = ts.getPackageLen(chunk))) {
const packageCon = chunk.slice(0, packageLen); // 拿到包所有内容
// 把上面截取过的数据从chunk中拿掉
chunk = chunk.slice(packageLen);
// 把客户度传过来的数据包解码
const ret = ts.decode(packageCon);
console.log(ret);
// 给客户端回数据 可写流
socket.write(ts.encode(ret.body, ret.serialNum));
}
// 并不能保证数据处理干净,如果没有被拿完,下一次再拿
overageBuffer = chunk
});
});
server.on("close", () => {
console.log("服务端关闭了");
});
server.on("error", (err) => {
if (err.code == "EADDRINUSE") {
console.log("地址正在被使用");
} else {
console.log(err);
}
});
const net = require("net");
const MyTransformCode = require('./myTransform.js')
let overageBuffer = null;
let ts = new MyTransformCode();
const client = net.createConnection({
port: 1234,
host: "127.0.0.1",
});
// 可读流
client.on("connect", () => {
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
});
client.on("data", (chunk) => {
if (overageBuffer) {
chunk = Buffer.concat([overageBuffer, chunk]);
}
let packageLen = 0; // 存放当前数据包的长度
// 如果等于0,代表不是一个完整的数据包,或者说不足以读取
while ((packageLen = ts.getPackageLen(chunk))) {
const packageCon = chunk.slice(0, packageLen); // 拿到包所有内容
// 把上面截取过的数据从chunk中拿掉
chunk = chunk.slice(packageLen);
const ret = ts.decode(packageCon);
console.log(ret);
}
// 并不能保证数据处理干净,如果没有被拿完,下一次再拿
overageBuffer = chunk
console.log(chunk.toString());
});
client.on("error", (err) => {
console.log(err);
});
client.on("close", () => {
console.log("客户端断开连接");
});

