1. 简介

拆包和粘包是在socket编程中经常出现的情况。

  • 粘包:在 socket 通讯过程中,如果通讯的一端一次性连续发送多条数据包,tcp 会将多个数据包打包成一个 tcp 报文发送出去,这就是所谓的粘包
  • 拆包:而如果通讯的一端发送的数据包超过一次 tcp 报文所能传输的最大值时,就会将一个数据包拆成多个最大 tcp 长度的 tcp 报文分开传输,这就叫做拆包。

    2. 一些基本概念

    MTU

    泛指通讯协议中的最大传输单元。一般用来说明 TCP/IP 四层协议中数据链路层的最大传输单元,不同类型的网络 MTU 也会不同,我们普遍使用的以太网的 MTU 是1500,即最大只能传输 1500 字节的数据帧。可以通过 ifconfig 命令查看电脑各个网卡的 MTU。
    image.png

    MSS

    指 TCP 建立连接后双方约定的可传输的最大 TCP 报文长度,是 TCP 用来限制应用层可发送的最大字节数。如果底层的 MTU 是 1500 byte,则 MSS = 1500- 20(IP Header) -20 (TCP Header) = 1460 byte

示意图

如图所示,客户端和服务端之间的通道代表 TCP 的传输通道,两个箭头之间的方块代表一个 TCP 数据包,正常情况下一个 TCP 传输一个应用数据。粘包时,两个或多个应用数据包被粘合在一起通过一个 TCP 传输。而拆包情况下,会一个应用数据包会被拆成两段分开传输,其他的一段可能会和其他应用数据包粘合。
image.png

3. 场景实例

下面通过简单实现两个 socket 端通讯,演示粘包和拆包的流程。客户端和服务端都在本机进行通讯,服务端使用 127.0.0.1 监听客户端,客户端也在 127.0.0.1 发起连接。

粘包

a. 实现服务端代码,服务监听55533端口,没有指定IP地址默认就是localhost,即本机IP环回地址 127.0.0.1,接着就等待客户端连接,代码如下:

  1. public class SocketServer {
  2. public static void main(String[] args) throws Exception {
  3. // 监听指定的端口
  4. int port = 55533;
  5. ServerSocket server = new ServerSocket(port);
  6. // server将一直等待连接的到来
  7. System.out.println("server将一直等待连接的到来");
  8. Socket socket = server.accept();
  9. // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
  10. InputStream inputStream = socket.getInputStream();
  11. byte[] bytes = new byte[1024 * 1024];
  12. int len;
  13. while ((len = inputStream.read(bytes)) != -1) {
  14. //注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
  15. String content = new String(bytes, 0, len,"UTF-8");
  16. System.out.println("len = " + len + ", content: " + content);
  17. }
  18. inputStream.close();
  19. socket.close();
  20. server.close();
  21. }
  22. }

b. 实现客户端代码,连接服务端,两端连接建立后,客户端就连续发送100个同样的字符串;

  1. public class SocketClient {
  2. public static void main(String[] args) throws Exception {
  3. // 要连接的服务端IP地址和端口
  4. String host = "127.0.0.1";
  5. int port = 55533;
  6. // 与服务端建立连接
  7. Socket socket = new Socket(host, port);
  8. // 建立连接后获得输出流
  9. OutputStream outputStream = socket.getOutputStream();
  10. String message = "这是一个整包!!!";
  11. for (int i = 0; i < 1; i++) {
  12. //Thread.sleep(1);
  13. outputStream.write(message.getBytes("UTF-8"));
  14. }
  15. Thread.sleep(20000);
  16. outputStream.close();
  17. socket.close();
  18. }
  19. }

c. 先运行服务端代码,运行到 server.accept() 时阻塞,打印“server将一直等待连接的到来”来等待客户端的连接,接着再运行客户端代码;

d. 客户端代码运行后,就能看到服务端的控制台打印结果如下:

  1. server将一直等待连接的到来
  2. len = 21, content: 这是一个整包!!!
  3. len = 168, content: 这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!
  4. len = 105, content: 这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!
  5. len = 42, content: 这是一个整包!!!这是一个整包!!!
  6. len = 42, content: 这是一个整包!!!这是一个整包!!!
  7. len = 63, content: 这是一个整包!!!这是一个整包!!!这是一个整包!!!
  8. len = 42, content: 这是一个整包!!!这是一个整包!!!
  9. len = 21, content: 这是一个整包!!!
  10. len = 42, content: 这是一个整包!!!这是一个整包!!!
  11. len = 21, content: 这是一个整包!!!
  12. len = 147, content: 这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!
  13. len = 63, content: 这是一个整包!!!这是一个整包!!!这是一个整包!!!
  14. len = 21, content: 这是一个整包!!!
  15. len = 252, content: 这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!这是一个整包!!!

按照原来的理解,在客户端每次发送一段字符串“这是一个整包!!!”, 分别发送了50次。服务端应该也会是分50次接收,会打印50行同样的字符串。但结果却是这样不寻常的结果,这就是由于粘包导致的结果。

总结出现粘包的原因

  1. 要发送的数据小于 TCP 发送缓冲区的大小,TCP 将多次写入缓冲区的数据一次发送出去;
  2. 接收数据端的应用层没有及时读取接收缓冲区中的数据;
  3. 数据发送过快,数据包堆积导致缓冲区积压多个数据后才一次性发送出去(如果客户端每发送一条数据就睡眠一段时间就不会发生粘包);

拆包

如果数据包太大,超过 MSS 的大小,就会被拆包成多个 TCP 报文分开传输。所以要演示拆包的情况,就需要发送一个超过 MSS 大小的数据,而 MSS 的大小是多少呢,就要看数据所经过网络的 MTU 大小。由于上面 socket 中的客户端和服务端 IP 都是 127.0.0.1, 数据只在回环网卡间进行传输,所以客户端和服务端的 MSS 都为回环网卡的 MTU - 20(IP Header) -20 (TCP Header),沿用粘包的例子,下面是拆包的处理步骤。

a. mac电脑可以通过 ifconfig 查看本地的各个网卡的 MTU,以下我的电脑运行 ifconfig 后输出的一部分,其中lo0就是回环网卡,可看出 mtu 是 16384:

  1. lo0: flags=8049<UP,LOOPBACK,RUNNING,MULTICAST> mtu 16384
  2. options=1203<RXCSUM,TXCSUM,TXSTATUS,SW_TIMESTAMP>
  3. inet 127.0.0.1 netmask 0xff000000
  4. inet6 ::1 prefixlen 128
  5. inet6 fe80::1%lo0 prefixlen 64 scopeid 0x1
  6. nd6 options=201<PERFORMNUD,DAD>
  7. en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
  8. ether 88:e9:fe:76:dc:57
  9. inet6 fe80::18d4:84fb:fa10:7f8%en0 prefixlen 64 secured scopeid 0x6
  10. inet 192.168.1.8 netmask 0xffffff00 broadcast 192.168.1.255
  11. inet6 240e:d2:495f:9700:182a:c53f:c720:5f63 prefixlen 64 autoconf secured
  12. inet6 240e:d2:495f:9700:d96:48f2:8108:2b33 prefixlen 64 autoconf temporary
  13. nd6 options=201<PERFORMNUD,DAD>
  14. media: autoselect
  15. status: active
  16. en1: flags=8963<UP,BROADCAST,SMART,RUNNING,PROMISC,SIMPLEX,MULTICAST> mtu 1500
  17. options=60<TSO4,TSO6>
  18. ether 7a:00:5c:40:cf:01
  19. media: autoselect <full-duplex>
  20. status: inactive
  21. en2: flags=8963<UP,BROADCAST,SMART,RUNNING,PROMISC,SIMPLEX,MULTICAST> mtu 1500
  22. options=60<TSO4,TSO6>
  23. ether 7a:00:5c:40:cf:00
  24. media: autoselect <full-duplex>
  25. status: inactive
  26. ......

b. 服务端代码和粘包时一样,将客户端代码改为发送一个超过16384字节的字符串,假设使用UTF-8编码的中文字符一个文字3个字节,那么就需要发送一个大约5461字的字符串,TCP才会拆包,为了篇幅不会太长,发送的字符串我只用一小段文字代替。客户端代码如下:

  1. public class SocketClient {
  2. private final static String CONTENT = "这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很.....长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串";//测试时大于5461文字,由于篇幅所限,只用这一段作为代表
  3. public static void main(String[] args) throws Exception {
  4. // 要连接的服务端IP地址和端口
  5. String host = "127.0.0.1";
  6. int port = 55533;
  7. // 与服务端建立连接
  8. Socket socket = new Socket(host, port);
  9. // 建立连接后获得输出流
  10. OutputStream outputStream = socket.getOutputStream();
  11. outputStream.write(CONTENT.getBytes("UTF-8"));
  12. Thread.sleep(20000);
  13. outputStream.close();
  14. socket.close();
  15. }
  16. }

c. 和粘包的代码示例一样,先运行原来的的服务端代码,接着运行客户端代码,看服务端的打印输出。

  1. server将一直等待连接的到来
  2. len = 22328, content: 这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很.....长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串这是一个很长很长的字符串...(有22328字节数组的文字)

通过输出的log,可发现客户端发送的字符串并没有在服务端被拆开,而是一次读取了客户端发送的完整字符串。是不是就没有被拆包呢,其实不是的,这是因为字符串被分拆成两个TCP报文,发送到了服务端的缓冲数据流中,服务端一次性读取了流中的数据,显示的结果就是两个 tcp 数据报串接在一起了。我们可以通过 tcpdump 抓包查看数据的传送细节:

在控制台输入 sudo tcpdump -i lo0 'port 55533',作用是监听回环网卡 lo0 上在 55533 端口传输的数据包,有从这个端口出入的数据包都会被抓获并打印出来,这个命令需要管理员权限,输入用户密码后,开始监听数据。这时我们按照刚才的测试步骤重新运行一遍,抓包的结果如下:

  1. tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
  2. listening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes
  3. 23:15:44.641208 IP 192.168.1.8.58748 > 192.168.1.8.55533: Flags [S], seq 2331897419, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 261991443 ecr 0,sackOK,eol], length 0
  4. 23:15:44.641261 IP 192.168.1.8.55533 > 192.168.1.8.58748: Flags [S.], seq 3403812509, ack 2331897420, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 261991443 ecr 261991443,sackOK,eol], length 0
  5. 23:15:44.641270 IP 192.168.1.8.58748 > 192.168.1.8.55533: Flags [.], ack 1, win 6379, options [nop,nop,TS val 261991443 ecr 261991443], length 0
  6. 23:15:44.641279 IP 192.168.1.8.55533 > 192.168.1.8.58748: Flags [.], ack 1, win 6379, options [nop,nop,TS val 261991443 ecr 261991443], length 0
  7. 23:15:44.644808 IP 192.168.1.8.58748 > 192.168.1.8.55533: Flags [.], seq 1:16333, ack 1, win 6379, options [nop,nop,TS val 261991446 ecr 261991443], length 16332
  8. 23:15:44.644812 IP 192.168.1.8.58748 > 192.168.1.8.55533: Flags [P.], seq 16333:22329, ack 1, win 6379, options [nop,nop,TS val 261991446 ecr 261991443], length 5996
  9. 23:15:44.644835 IP 192.168.1.8.55533 > 192.168.1.8.58748: Flags [.], ack 22329, win 6030, options [nop,nop,TS val 261991446 ecr 261991446], length 0
  1. 第三行中,客户端发起连接请求,options参数中有一个mss 16344的参数,就表示连接建立后,客户端能接收的最大TCP报文大小,超过后就会被拆包分开传送;
  2. 前四行都是两端的连接过程;
  3. 第五行客户端口58748向服务端口55533传输了16332字节大小的数据包;
  4. 第六行客户端口58748向服务端口55533传输了5996字节大小的数据包;

从抓包过程就能看出,客户端发送一个字符串,被拆成了两个TCP数据报进行传输。

4. 解决方案

对于粘包的情况,要对粘在一起的包进行拆包。对于拆包的情况,要对被拆开的包进行粘包,即将一个被拆开的完整应用包再组合成一个完整包。比较通用的做法就是每次发送一个应用数据包前在前面加上四个字节的包长度值,指明这个应用包的真实长度。如下图就是应用数据包格式。
TCP 拆包和粘包 - 图3

下面我修改前文的代码示例,来实现解决拆包粘包问题,有两种实现方式:

  1. 一种方式是引入 netty 库,netty 封装了多种拆包粘包的方式,只需要对接口熟悉并调用即可,减少自己处理数据协议的繁琐流程;
  2. 自己写协议封装和解析流程,相当于实现了 netty 库拆粘包的简易版本,本篇文章是为了学习需要,就通过这个方式实现:

a. 客户端。每次发送一个字符串前,都将字符串转为字节数组,在原数据字节数组前再加上一个四个字节的代表该数据的长度,然后将组合的字节数组发送出去;

  1. public class SocketClient {
  2. public static void main(String[] args) throws Exception {
  3. // 要连接的服务端IP地址和端口
  4. String host = "127.0.0.1";
  5. int port = 55533;
  6. // 与服务端建立连接
  7. Socket socket = new Socket(host, port);
  8. // 建立连接后获得输出流
  9. OutputStream outputStream = socket.getOutputStream();
  10. String message = "这是一个整包!!!";
  11. byte[] contentBytes = message.getBytes("UTF-8");
  12. System.out.println("contentBytes.length = " + contentBytes.length);
  13. int length = contentBytes.length;
  14. byte[] lengthBytes = Utils.int2Bytes(length);
  15. byte[] resultBytes = new byte[4 + length];
  16. System.arraycopy(lengthBytes, 0, resultBytes, 0, lengthBytes.length);
  17. System.arraycopy(contentBytes, 0, resultBytes, 4, contentBytes.length);
  18. for (int i = 0; i < 10; i++) {
  19. outputStream.write(resultBytes);
  20. }
  21. Thread.sleep(20000);
  22. outputStream.close();
  23. socket.close();
  24. }
  25. }
  26. public final class Utils {
  27. //int数值转为字节数组
  28. public static byte[] int2Bytes(int i) {
  29. byte[] result = new byte[4];
  30. result[0] = (byte) (i >> 24 & 0xFF);
  31. result[1] = (byte) (i >> 16 & 0xFF);
  32. result[2] = (byte) (i >> 8 & 0xFF);
  33. result[3] = (byte) (i & 0xFF);
  34. return result;
  35. }
  36. //字节数组转为int数值
  37. public static int bytes2Int(byte[] bytes){
  38. int num = bytes[3] & 0xFF;
  39. num |= ((bytes[2] << 8) & 0xFF00);
  40. num |= ((bytes[1] << 16) & 0xFF0000);
  41. num |= ((bytes[0] << 24) & 0xFF000000);
  42. return num;
  43. }
  44. }

b. 服务端。接收到客户端发送过来的字节数组后,先提取前面四个字节转为int值,然后再往后取该int数值长度的字节数,再转为字符串就是客户端端发送过来的数据,详见代码:

  1. public class SocketServer {
  2. public static void main(String[] args) throws Exception {
  3. // 监听指定的端口
  4. int port = 55533;
  5. ServerSocket server = new ServerSocket(port);
  6. // server将一直等待连接的到来
  7. System.out.println("server将一直等待连接的到来");
  8. Socket socket = server.accept();
  9. // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
  10. InputStream inputStream = socket.getInputStream();
  11. byte[] bytes = new byte[1024 * 128];
  12. int len;
  13. byte[] totalBytes = new byte[]{};
  14. int totalLength = 0;
  15. while ((len = inputStream.read(bytes)) != -1) {
  16. //1. 将读取的数据和上一次遗留的数据拼起来
  17. int tempLength = totalLength;
  18. totalLength = len + totalLength;
  19. byte[] tempBytes = totalBytes;
  20. totalBytes = new byte[totalLength];
  21. System.arraycopy(tempBytes, 0, totalBytes, 0, tempLength);
  22. System.arraycopy(bytes, 0, totalBytes, tempLength, len);
  23. while (totalLength > 4) {
  24. byte[] lengthBytes = new byte[4];
  25. System.arraycopy(totalBytes, 0, lengthBytes, 0, lengthBytes.length);
  26. int contentLength = Utils.bytes2Int(lengthBytes);
  27. //2. 如果剩下数据小于数据头标的长度,则出现拆包,再次获取数据连接
  28. if (totalLength < contentLength + 4) {
  29. break;
  30. }
  31. //3. 将数据头标的指定长度的数据取出则为应用数据
  32. byte[] contentBytes = new byte[contentLength];
  33. System.arraycopy(totalBytes, 4, contentBytes, 0, contentLength);
  34. //注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
  35. String content = new String(contentBytes, "UTF-8");
  36. System.out.println("contentLength = " + contentLength + ", content: " + content);
  37. //4. 去掉已读取的数据
  38. totalLength -= (4 + contentLength);
  39. byte[] leftBytes = new byte[totalLength];
  40. System.arraycopy(totalBytes, 4 + contentLength, leftBytes, 0, totalLength);
  41. totalBytes = leftBytes;
  42. }
  43. }
  44. inputStream.close();
  45. socket.close();
  46. server.close();
  47. }
  48. }

c. 打印结果:

  1. server将一直等待连接的到来
  2. contentLength = 21, content: 这是一个整包!!!
  3. contentLength = 21, content: 这是一个整包!!!
  4. contentLength = 21, content: 这是一个整包!!!
  5. contentLength = 21, content: 这是一个整包!!!
  6. contentLength = 21, content: 这是一个整包!!!
  7. contentLength = 21, content: 这是一个整包!!!
  8. contentLength = 21, content: 这是一个整包!!!
  9. contentLength = 21, content: 这是一个整包!!!
  10. contentLength = 21, content: 这是一个整包!!!
  11. contentLength = 21, content: 这是一个整包!!!

客户端连续发送十个字符串,服务端也收到了分开的十个字符串,不再出现多个数据包连在一起的情况了。

在Netty的应用层,按照 ByteBuf 为 单位来发送数据,但是到了底层操作系统仍然是按照字节流发送数据,因此,从底层到应用层,需要进行二次拼装
操作系统底层,是按照字节流的方式读入,到了 Netty 应用层面,需要二次拼装成 ByteBuf。

这就是粘包和半包的根源。

在Netty 层面,拼装成ByteBuf时,就是对底层缓冲的读取,这里就有问题了。

首先,上层应用层每次读取底层缓冲的数据容量是有限制的,当TCP底层缓冲数据包比较大时,将被分成多次读取,造成断包,在应用层来说,就是半包。
其次,如果上层应用层一次读到多个底层缓冲数据包,就是粘包。

如何解决呢?
基本思路是,在接收端,需要根据自定义协议来,来读取底层的数据包,重新组装我们应用层的数据包,这个过程通常在接收端称为拆包

拆包的原理

拆包基本原理,简单来说:

  • 接收端应用层不断从底层的 TCP 缓冲区中读取数据。
  • 每次读取完,判断一下是否为一个完整的应用层数据包。如果是,上层应用层数据包读取完成。
  • 如果不是,那就保留该数据在应用层缓冲区,然后继续从 TCP 缓冲区中读取,直到得到一个完整的应用层数据包为止。
  • 至此,半包问题得以解决
  • 如果从TCP底层读到了多个应用层数据包,则将整个应用层缓冲区,拆成一个一个的独立的应用层数据包,返回给调用程序。
  • 至此,粘包问题得以解决

Netty 中的拆包器

拆包这个工作,Netty 已经为大家备好了很多不同的拆包器。本着不重复发明轮子的原则,我们直接使用 Netty 现成的拆包器。

Netty 中的拆包器大致如下:

  1. 固定长度的拆包器 FixedLengthFrameDecoder
    每个应用层数据包的都拆分成都是固定长度的大小,比如 1024字节。
    这个显然不大适应在 Java 聊天程序 进行实际应用。

  2. 行拆包器 LineBasedFrameDecoder
    每个应用层数据包,都以换行符作为分隔符,进行分割拆分。
    这个显然不大适应在 Java 聊天程序 进行实际应用。

  3. 分隔符拆包器 DelimiterBasedFrameDecoder
    每个应用层数据包,都通过自定义的分隔符,进行分割拆分。
    这个版本,是LineBasedFrameDecoder 的通用版本,本质上是一样的。
    这个显然不大适应在 Java 聊天程序 进行实际应用。

  4. 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder
    将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度。
    这个显然比较适和在 Java 聊天程序 进行实际应用。下面我们来应用这个拆分器。

    拆包之前的消息包装

    在使用LengthFieldBasedFrameDecoder 拆包器之前 ,在发送端需要对protobuf 的消息包进行一轮包装

发送端包装的方法是:
在实际的protobuf 二进制消息包的前面,加上四个字节。
前两个字节为版本号,后两个字节为实际发送的 protobuf 的消息长度。
TCP 拆包和粘包 - 图4

强调一下,二进制消息包装,在发送端进行。
修改发送端的编码器 ProtobufEncoder ,代码如下:

  1. /**
  2. * 编码器
  3. */
  4. public class ProtobufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {
  5. @Override
  6. protected void encode(ChannelHandlerContext ctx, ProtoMsg.Message msg, ByteBuf out)
  7. throws Exception {
  8. byte[] bytes = msg.toByteArray();// 将对象转换为byte
  9. int length = bytes.length;// 读取 ProtoMsg 消息的长度
  10. ByteBuf buf = Unpooled.buffer(2 + length);
  11. // 先将消息协议的版本写入,也就是消息头
  12. buf.writeShort(Constants.PROTOCOL_VERSION);
  13. // 再将 ProtoMsg 消息的长度写入
  14. buf.writeShort(length);
  15. // 写入 ProtoMsg 消息的消息体
  16. buf.writeBytes(bytes);
  17. //发送
  18. out.writeBytes(buf);
  19. }
  20. }

发送端的步骤是:

  • 先将消息协议的版本写入,也就是消息头 buf.writeShort(Constants.PROTOCOL_VERSION);
  • 再将 ProtoMsg 消息的长度写入 buf.writeShort(length);
  • 最后,写入 ProtoMsg 消息的消息体 buf.writeBytes(bytes);

开发一个接收端的自定义拆包器

使用Netty中,基于长度域拆包器 LengthFieldBasedFrameDecoder,按照实际的应用层数据包长度来拆分。

需要做两个工作:

  • 设置长度信息(长度域)在数据包中的位置。
  • 设置长度信息(长度域)自身的长度,也就是占用的字节数。

在前面的小节中,我们的长度信息(长度域)的占用字节数为 2个字节; 在报文中的所处的位置,长度信息(长度域)处于版本号之后。
版本号是2个字节,从0开始数,长度信息(长度域)的在数据包中的位置为2。

这些数据定义在Constansts常量类中。

  1. public class Constants
  2. {
  3. //协议版本号
  4. public static final short PROTOCOL_VERSION = 1;
  5. //头部的长度: 版本号 + 报文长度
  6. public static final short PROTOCOL_HEADLENGTH = 4;
  7. //长度的偏移
  8. public static final short LENGTH_OFFSET = 2;
  9. //长度的字节数
  10. public static final short LENGTH_BYTES_COUNT = 2;
  11. }

有了这些数据之后,可以基于Netty 的长度拆包器 LengthFieldBasedFrameDecoder, 开发自己的长度分割器。
新开发的分割器为PackageSpliter,代码如下:

  1. package com.crazymakercircle.chat.common.codec;
  2. public class PackageSpliter extends LengthFieldBasedFrameDecoder
  3. {
  4. public PackageSpliter() {
  5. super(Integer.MAX_VALUE, Constants.LENGTH_OFFSET,Constants.LENGTH_BYTES_COUNT);
  6. }
  7. @Override
  8. protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  9. return super.decode(ctx, in);
  10. }
  11. }

分割器 PackageSpliter 继承了 LengthFieldBasedFrameDecoder,传入了三个参数。

  • 长度的偏移量 ,这里是 Constants.LENGTH_OFFSET,值为 2
  • 长度的字节数,这里是 Constants.LENGTH_BYTES_COUNT,值为 2
  • 最大的应用包长度,这里是 Integer.MAX_VALUE,表示不限制

分割器 写好之后,只需要在 pipeline 的最前面加上这个分割器,就可以使用这个分割器(自定义的拆包器)。

自定义拆包器的实际应用

在服务器端的 pipeline 的最前面加上这个分割器,代码如下:

  1. package com.crazymakercircle.chat.server;
  2. //...
  3. @Service("ChatServer")
  4. public class ChatServer
  5. {
  6. static final Logger LOGGER = LoggerFactory.getLogger(ChatServer.class);
  7. //...
  8. //有连接到达时会创建一个channel
  9. protected void initChannel(SocketChannel ch) throws Exception
  10. { //应用自定义拆包器
  11. ch.pipeline().addLast(new PackageSpliter());
  12. ch.pipeline().addLast(new ProtobufDecoder());
  13. ch.pipeline().addLast(new ProtobufEncoder());
  14. // pipeline管理channel中的Handler
  15. // 在channel队列中添加一个handler来处理业务
  16. ch.pipeline().addLast("serverHandler", serverHandler);
  17. }
  18. });
  19. //....
  20. }

在发送端的 pipeline 的最前面加上这个分割器,代码也是类似的, 这里不再赘述。大家可以下载源码查看。