TCP数据粘包
- 通信包含数据发送端和接收端
- 发送端累积数据统一发送
- 接收端缓冲数据之后再消费
好处:减少IO操作带来的性能消耗
坏处:数据使用问题会产生粘包问题
问题: 数据是放在缓冲中的,那么在什么样的条件下,他才会去开始执行发送呢?
- TCP拥塞机制决定发送时机
客户端写入多条数据,打印数据
const net = require("net");
// 创建服务端实例
const server = net.createServer();
const PORT = 1234;
const HOST = "localhost";
server.listen(PORT, HOST);
server.on("listening", () => {
console.log(`服务端已经开启${HOST}:${PORT}`);
});
// 接收消息,回写消息 双工流
server.on("connection", (socket) => {
socket.on("data", (chunk) => {
const msg = chunk.toString();
console.log(msg);
// 给客户端回数据 可写流
socket.write(Buffer.from("您好" + msg));
});
});
server.on("close", () => {
console.log("服务端关闭了");
});
server.on("error", (err) => {
if (err.code == "EADDRINUSE") {
console.log("地址正在被使用");
} else {
console.log(err);
}
});
const net = require("net");
const client = net.createConnection({
port: 1234,
host: "127.0.0.1",
});
let dataArr = [
'上海1',
'上海2',
'上海3',
'上海4',
'上海5',
]
// 可读流
client.on('connect',() => {
// client.write('上海1')
// client.write('上海2')
// client.write('上海3')
// client.write('上海4')
// client.write('上海5')
for(let i = 0; i< dataArr.length;i++){
(function(val, index){
setTimeout(() => {
client.write(val)
},1000*(index+1))
})(dataArr[i],i)
}
})
client.on('data', chunk => {
console.log(chunk.toString());
})
client.on('error',(err) => {
console.log(err);
})
client.on('close',() => {
console.log('客户端断开连接');
})
数据的封包与拆包
- 对数据进行打包,避免客户端连续多次发送数据时,产生粘包的现象
- 自定义包的约束规则,在body前面拼接上header,在里面定义包的编号和内容长度
- 自定义规则实现包的编码和解码
数据传输过程
- 进行数据编码,获取二进制数据包
-
Buffer数据读写
writeInt16BE: 将value从指定位置写入
-
代码实现
```javascript class MyTransformCode { constructor() { this.packageHeaderLen = 4; this.serialNum = 0; // 数据包的编号 this.serialLen = 2; // 包的长度 }
// 编码 encode(data, serialNum) { // data 是非二进制数据,使用buffer转换成二进制数据 const body = Buffer.from(data);
// 01. 先按照指定的长度申请内存空间,作为header使用 const headerBuf = Buffer.alloc(this.packageHeaderLen);
// 02. 如果传了,就使用他自己的,没有传使用自己定义的 headerBuf.writeInt16BE(serialNum || this.serialNum);
// 跳过序列号 headerBuf.writeInt16BE(body.length, this.serialLen);
// 如果没有传包的编号 if (serialNum == undefined) {
this.serialNum++;
} return Buffer.concat([headerBuf, body]); }
// 解码 decode(buffer) { const headerBuf = buffer.slice(0, this.packageHeaderLen); // 从开始截取到头部长度位置 const bodyBuf = buffer.slice(this.packageHeaderLen); // 跳过头部截取 return {
serialNum: headerBuf.readInt16BE(), // 把头部编号读取出来
bodyLength: headerBuf.readInt16BE(this.serialLen),
body: bodyBuf.toString(),
}; }
// 获取当前数据包长度 getPackageLen(buffer) { // 如果包长度不满4,不应该从里面拿数据 if (buffer.length < this.packageHeaderLen) {
return 0;
} else{
// 返回实际的长度, 头部的长度+buffer的长度
return this.packageHeaderLen + buffer.readInt16BE(this.serialLen)
} } }
module.exports = MyTransformCode
```javascript
const MyTransformCode = require('./myTransform.js')
let ts = new MyTransformCode()
let str1 = '上海1'
// 参数二是指定的编号
console.log(Buffer.from(str1));
// const a = ts.encode(str1, 1)
// console.log(a);
let encodeBuf = ts.encode(str1, 1)
let a = ts.decode(encodeBuf)
console.log(a); // { serialNum: 1, bodyLength: 7, body: '上海1' }
let len = ts.getPackageLen(encodeBuf)
console.log(len); // 11个字节
使用封包拆包解决TCP粘包完整代码
const net = require("net");
const MyTransformCode = require("./myTransform.js");
// 创建服务端实例
const server = net.createServer();
let overageBuffer = null;
let ts = new MyTransformCode();
const PORT = 1234;
const HOST = "localhost";
server.listen(PORT, HOST);
server.on("listening", () => {
console.log(`服务端已经开启${HOST}:${PORT}`);
});
// 接收消息,回写消息 双工流
server.on("connection", (socket) => {
socket.on("data", (chunk) => {
const msg = chunk.toString();
// 如果有数据,表示上次有数据未处理完
if (overageBuffer) {
chunk = Buffer.concat([overageBuffer, chunk]);
}
let packageLen = 0; // 存放当前数据包的长度
// 如果等于0,代表不是一个完整的数据包,或者说不足以读取
while ((packageLen = ts.getPackageLen(chunk))) {
const packageCon = chunk.slice(0, packageLen); // 拿到包所有内容
// 把上面截取过的数据从chunk中拿掉
chunk = chunk.slice(packageLen);
// 把客户度传过来的数据包解码
const ret = ts.decode(packageCon);
console.log(ret);
// 给客户端回数据 可写流
socket.write(ts.encode(ret.body, ret.serialNum));
}
// 并不能保证数据处理干净,如果没有被拿完,下一次再拿
overageBuffer = chunk
});
});
server.on("close", () => {
console.log("服务端关闭了");
});
server.on("error", (err) => {
if (err.code == "EADDRINUSE") {
console.log("地址正在被使用");
} else {
console.log(err);
}
});
const net = require("net");
const MyTransformCode = require('./myTransform.js')
let overageBuffer = null;
let ts = new MyTransformCode();
const client = net.createConnection({
port: 1234,
host: "127.0.0.1",
});
// 可读流
client.on("connect", () => {
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
client.write(ts.encode('上海1'));
});
client.on("data", (chunk) => {
if (overageBuffer) {
chunk = Buffer.concat([overageBuffer, chunk]);
}
let packageLen = 0; // 存放当前数据包的长度
// 如果等于0,代表不是一个完整的数据包,或者说不足以读取
while ((packageLen = ts.getPackageLen(chunk))) {
const packageCon = chunk.slice(0, packageLen); // 拿到包所有内容
// 把上面截取过的数据从chunk中拿掉
chunk = chunk.slice(packageLen);
const ret = ts.decode(packageCon);
console.log(ret);
}
// 并不能保证数据处理干净,如果没有被拿完,下一次再拿
overageBuffer = chunk
console.log(chunk.toString());
});
client.on("error", (err) => {
console.log(err);
});
client.on("close", () => {
console.log("客户端断开连接");
});