title: Netty网络编程
date: 2022-1-23 10:17:20
tags: 网络编程
categories: Java
cover: imgcat/java.png
1. 网络系统
1.1 用户空间与内核空间
任何Linux发行版,其系统内核都是Linux。我们的应用都需要通过Linux内核与硬件交互
Linux内核负责与硬件直接交互,里面的一些核心的命令不能由用户随意操作,以免用户应用导致冲突,甚至使内核崩溃,因此Linux系统将用户应用与内核应用隔离开!
进程的寻址空间会划分为两部分:内核空间和用户空间
- 用户空间:只能执行受限的命令,不能够直接调用系统资源,需要通过Linux内核提供的接口开访问
- 内核空间:可以执行特权命令,调用一切的系统资源
以32-bit的Linux系统为例,虚拟地址的寻址空间为,即0-4GB,其中高位的1GB用作内核空间,低位的3GB用作用户空间。工作在用户空间的进程称之为用户态,工作在内核空间的进程称之为内核态!
Linux系统为了提高IO效率,会在用户空间和内核空间都加入缓冲区:
- 写数据时,要把用户缓冲数据拷贝到内核缓冲区,然后写入设备
- 读数据时,要从设备读取数据到内核缓冲区,然后拷贝到用户缓冲区
从上图中我们可以看到操作系统的IO有两部分最为耗时:
- 等待数据就绪,也就是将磁盘或网卡中的数据读入内核空间
- 将数据从内核空间拷贝到用户空间
1.2 网络模型
漫话:如何给女朋友解释什么是Linux的五种IO模型?
Linux操作系统共有5中IO模型
阻塞IO
我们先简化一下前面IO的读取流程
阻塞IO(Blockning IO):是用户空间读取数据时,等待数据就绪和从内核中拷贝数据到用户空间中两个阶段都是阻塞等待的
用户在读取数据时,会通过recvfrom接口对内核发起系统调用:
在第一个阶段:
- 用户进程尝试获取网卡数据
- 此时数据尚未到达,内核需要等待数据到达
- 此时用户应用也是阻塞等待的
在第二个阶段
- 数据已到达内核空间,并且拷贝至内核空间缓冲区,表示数据已经就绪
- 将内核空间缓冲区中的数据拷贝至内核空间缓冲区
- 数据拷贝过程中,用户进程依然阻塞等待
- 数据拷贝完成,用户进程解除阻塞,开始处理数据
因此,阻塞IO模型中,用户进程在等待数据就绪和从内核中拷贝数据两个阶段都是阻塞状态
非阻塞IO
非阻塞IO(Non-Blocking IO):就是用户在发起recvfrom调用后,等到数据期间是不阻塞的,而是立即返回结果
用户在读取数据时,会通过recvfrom接口对内核发起系统调用:
在第一个阶段:
- 用户进程尝试读取数据(比如网卡数据)
- 此时数据尚未到达,内核需要等待数据
- 返回异常给用户进程用户进程拿到error后,
- 再次尝试读取循环往复,直到数据就绪
在第二个阶段
- 将内核数据拷贝到用户缓冲区
- 拷贝过程中,用户进程依然阻塞等待
- 拷贝完成,用户进程解除阻塞,处理数据
可以看到,在非阻塞IO模型中,用户在等待数据的期间是不阻塞的,在拷贝数据的过程中是阻塞的!
虽然第一个阶段是非阻塞的,但是NIO的性能并没有提高,因为用户进程在接收到没有读到数据的信息时,仍然要不断尝试去读取数据,这样造成了CPU空转,使得CPU使用率暴增!
IO多路复用
阻塞IO与非阻塞IO最大的区别在于数据未就绪等待数据的过程,BIO是阻塞等待的,NIO是不阻塞的
而在单线程情况下,只能依次处理IO事件,如果正在处理的IO事件恰好未就绪(数据不可读或不可写),线程就会被阻塞,所有IO事件都必须等待,性能自然会很差。
就比如服务员给顾客点餐,分两步:
- 顾客思考要吃什么(等待数据就绪)
- 顾客想好了,开始点餐(读取数据)
要提高效率有几种办法?
- 方案一:增加更多服务员(多线程)
- 方案二:不排队,谁想好了吃什么(数据就绪了),服务员就给谁点餐(用户应用就去读取数据)
用户进程如何知道内核中数据是否就绪呢?
IO多路复用:利用单个线程同时监听多个文件描述符,并在文件描述符可读可写时得到通知,从而避免无效的等待,充分的利用CPU资源!
文件描述符:File Descriptor,简称FD,用来关联Linux中的一个文件。Linux中一切皆文件,包括文件、视频、网络套接字socket等
用户进程通过select函数监听socket,以判断内核中的数据是否就绪
在第一个阶段:
- 用户进程调用select,指定要监听的FD集合
- 内核监听FD对应的多个socke
- t任意一个或多个socket数据就绪则返回readable
- 此过程中用户进程阻塞
在第二个阶段
- 用户进程找到就绪的socket
- 依次调用recvfrom读取数据
- 内核将数据拷贝到用户空间
- 用户进程处理数据
监听FD的方式有多中包括:select、poll、epoll
所谓的select,poll,epoll其实是Linux操作系统IO多路复用中用于监听socket的方法,我们知道IO多路复用,就是Linux在进行IO操作时,用一个线程来监控有哪些socket上的数据是就绪的,可以进行读写操作。
socket: socket是应用层与TCP/IP协议之间的抽象层,它是一套API。我们知道TCP/IP协议是由操作系统内核来实现的,socket就是内核提供给应用层的一系列接口,socket封装了TCP/IP,调用socket的函数可以调用内核通过TCP/IP来发送数据
文件描述符:在Linux的进程里,会维护一个file数组,他用来指向进程中已经打开文件的地址,这个数组的索引就是我们所说的文件描述符。
例如在进程file数组的索引6上建立socket链接,file[6]就会指向这个socket链接
select
select是Linux最早是用的I/O多路复用技术
// 定义类型别名 __fd_mask,本质是 long int
typedef long int __fd_mask;
/* fd_set 记录要监听的fd集合,及其对应状态 */
typedef struct {
// fds_bits是long类型数组,长度为 1024/32 = 32
// 共1024个bit位,每个bit位代表一个fd,0代表未就绪,1代表就绪
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
// ...
} fd_set;
// select函数,用于监听fd_set,也就是多个fd的集合
int select(
int nfds, // 要监视的fd_set的最大fd + 1
fd_set *readfds, // 要监听读事件的fd集合
fd_set *writefds,// 要监听写事件的fd集合
fd_set *exceptfds, // // 要监听异常事件的fd集合
// 超时时间,null-用不超时;0-不阻塞等待;大于0-固定等待时间
struct timeval *timeout
);
select的工作流程
- 首先select函数在用户空间中创建一个fd_set,它是一个1024 bit的数组,用于标记socket
- 将要监听的文件标识符位设为1,其他的置为0
- 执行select函数,将fd_set从用户空间拷贝至内核空间
- select函数在内核空间中遍历fd_set,查询对应的socket有没有就绪
- 没有就绪,休眠
- 如有socket就绪,将fd_set原始数据清0,将对应位的fd_set置为1
- 再将内核空间的fd_set拷贝回用户空间
- 用户空间再次遍历fd_set,寻找已经就绪的fd,调用recvfrom,处理数据
select模式的问题:
- 需要将fd_set从用户空间拷贝至内核空间,还要再从内核空间拷贝至用户空间
- fd_set中不能够向用户空间直接提供是哪个socket就绪,需要遍历整个fd_set
- fd_set监听的数量不能够超过1024
poll
poll对select的结构做了一点改进 ```c // pollfd 中的事件类型define POLLIN //可读事件
define POLLOUT //可写事件
define POLLERR //错误事件
define POLLNVAL //fd未打开
// pollfd结构 struct pollfd { int fd; / 要监听的fd / short int events; / 要监听的事件类型:读、写、异常 / short int revents;/ 实际发生的事件类型 / };
// poll函数 int poll( struct pollfd *fds, // pollfd数组,可以自定义大小 nfds_t nfds, // 数组元素个数 int timeout // 超时时间 );
<a name="w9sRz"></a>
##### poll的工作流程
1. 调用poll函数,首先创建一个pollfd数组,pollfd中记录这要监听的fd,和要监听的事件以及实际发生的事件
1. 将数组拷贝至内核空间,将数组转为链表存储,数量没有上限
1. 内核遍历fd,判断是否就绪
1. 数组就绪后,将pollfd数组拷贝回用户空间,并返回就绪数量n
1. 遍历poolfd数组,找到就绪的fd
与select的区别<br />poll用链表在内核中存储fd,理论上fd的数量没有上限<br />但是他们都需要遍历整个数组😤
<a name="cfrDV"></a>
##### epoll
<a name="RUDdv"></a>
# 2. NIO基础
**NIO**: non-blocking IO 非阻塞式IO
<a name="GZBz3"></a>
## 2.1 NIO三大组件
1. **Channel(通道)**
channel类似于IO中的stream输入输出数据流,它是读写数据的双向通道,可以从channel从读取数据写到缓存buffer中,也可以从buffer缓存中读取数据写入buffer里。
- 与stream流的区别:stream流要么是输入流inputstream(读数据),要么是输出流outputstream(写数据),channel通道既可以读又可以写
常见的Channel: FileChannel, DatagramChannel,SocketChannel,ServerSocketChannel
2. **Buffer (缓冲区)**
buffer用来缓冲数据,常见的Buffer有ByteBuffer,ShortBuffer,IntBuffer,LongBUffer等等
- 总之:Java NIO的核心在于通道(Channel)与缓冲区(Buffer)。通道表示打开IO设备的链接,若使用NIO系统,需要链接IO设备的1通道和容纳数据的缓冲区,获得数据后操作缓冲区,对数据进行处理。即通道负责传输数据,缓冲区负责存储数据。
3. **Selector(选择器)**
selector的作用是配合一个线程管理多各channel,监听channel上发生的事件。所有的channel都工作在非阻塞模式下,不会在一个channel上一直等待。适合连接数特别多,但流量低的场景(low traffic)![未命名文件 (7).png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1643165652455-31dd5339-d561-4d3c-8d49-847bf83e329f.png#clientId=u7d03133d-fe9f-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u09a13ca1&margin=%5Bobject%20Object%5D&name=%E6%9C%AA%E5%91%BD%E5%90%8D%E6%96%87%E4%BB%B6%20%287%29.png&originHeight=510&originWidth=644&originalType=binary&ratio=1&rotation=0&showTitle=false&size=30842&status=done&style=none&taskId=ud015a05b-1585-459d-9be0-a301a7200d9&title=)<br />调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理
<a name="goFEF"></a>
## 2.2 ByteBuffer
<br />
<a name="AZuBG"></a>
### 1. ByteBuffer的使用步骤
1. 通过Stream流或者RandomAccessFile获取channel
1. 调用ByteBuffer.allocate()在内存中分配ByteBuffer
1. 向buffer中写数据,调用channel.read(buffer)
1. 调用buffer.flip(),将buffer切换至读模式
1. 调用buffer.get(),从buffer中读取数据
1. 调用buffer.clear()或buffer.compact(),将buffer切换至写模式
1. 重复3-7步骤
**案例:使用FileChannel读取文件内容**
// data.txt 0123456789abcdef
```java
public static void main(String[] args){
// 通过RandomAccessFile获取channel
try(RandomAccessFile file = new RandomAccessFile("data.txt", "rw")) {
// 1. 获取channel
FileChannel channel = file.getChannel();
// 2. 分配ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(10);
while(true){
// 3. 从channel读 向buffer中写数据
int len = channel.read(buffer);
log.debug("读的字节数: {}", len);
if(len == -1) break;
// 4. 读取buffer中写入的数据
// 将buffer切换至读模式
buffer.flip();
while(buffer.hasRemaining()){
log.debug("{}", (char) buffer.get());
}
// 5. 将buffer切换至写模式
buffer.clear();
System.out.println(buffer);
}
}catch (IOException e) {
e.printStackTrace();
}
}
输出
13:12:21 [DEBUG] [main] h._01ChannelTest03 - 读的字节数: 10
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 |
+--------+-------------------------------------------------+----------------+
13:12:21 [DEBUG] [main] h._01ChannelTest03 - 读的字节数: 6
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [6]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 66 36 37 38 39 |abcdef |
+--------+-------------------------------------------------+----------------+
13:12:21 [DEBUG] [main] h._01ChannelTest03 - 读的字节数: -1
2. ByteBuffer结构
Bytebuffer有三个重要属性:positionlimitcapacity
public abstract class Buffer {
static final int SPLITERATOR_CHARACTERISTICS =
Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
....
}
ByteBuffer默认写模式,开始时
3. ByteBuffer常用方法
分配空间
可以使用 allocate 方法为 ByteBuffer 分配空间,其它 buffer 类也有该方法
Bytebuffer buf = ByteBuffer.allocate(16);
Bytebuffer buf = ByteBuffer.allocateDirect(16);
// 直接内存,读写效率高,分配效率低;直接内存不涉及垃圾回收,可能会引发内存泄露
向Buffer中写数据
- channel.read(buffer) 从channel中读,向buffer中写数据
buffer.put((byte) 127) 调用buffer的put方法
int len = channel.read(buffer);
buffer.put("Bukayo".getBytes(StandardCharsets.UTF_8));
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 42 75 6b 61 79 6f 36 37 38 39 |Bukayo6789 |
+--------+-------------------------------------------------+----------------+
从Buffer中读数据
channel.write(buffer) 从buffer中读,向channel中写数据
- buffer.get() 调用buffer自己的get方法
buffer.get()方法会读取buffer的当前位置,然后会让指针向后走int len = channel.write(buffer);
byte b = buffer.get();
- 可以调用 rewind 方法将 position 重新置为 0
- 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针
mark, reset
- mark()方法会将postion的值保存到mark属性中
reset()方法会将position的值改为mark中保存的值
public static void main(String[] args){
ByteBuffer buffer = ByteBuffer.allocate(10);
// 1. 向buffer中写1个字节的数据
buffer.put((byte) 97);
debugAll(buffer);
+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 00 00 00 00 00 00 00 00 00 |a......... |
+--------+-------------------------------------------------+----------------+
// 2. 向buffer中写4个字节的数据
buffer.put(new byte[]{98, 99, 100, 101});
debugAll(buffer);
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... |
+--------+-------------------------------------------------+----------------+
// 3. 切换至读模式
buffer.flip();
debugAll(buffer);
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... |
+--------+-------------------------------------------------+----------------+
System.out.println((char) buffer.get());
System.out.println((char) buffer.get());
// 调用了两次get()方法后,pos指针指向第三个位置
debugAll(buffer);
+--------+-------------------- all ------------------------+----------------+
position: [2], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... |
+--------+-------------------------------------------------+----------------+
// 4. 调用compact方法,将剩余的三个字符压缩至前面
buffer.compact();
debugAll(buffer);
+--------+-------------------- all ------------------------+----------------+
position: [3], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 64 65 64 65 00 00 00 00 00 |cdede..... |
+--------+-------------------------------------------------+----------------+
// 5. 再次写入新的字符
buffer.put((byte) 102);
buffer.put((byte) 103);
debugAll(buffer);
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 64 65 66 67 00 00 00 00 00 |cdefg..... |
+--------+-------------------------------------------------+----------------+
}
字符串转ByteBuffer
3. Netty基础
4. Netty进阶
4.1 黏包与半包
服务端代码
@Slf4j(topic = "h._01StickyBagServer")
public class _01StickyBagServer{
public static void main(String[] args){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置ByteBuf长度,长度过低会出现半包现象
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
ChannelFuture channelFuture = serverBootstrap
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
log.debug("客户端链接成功:{}", ctx.channel().remoteAddress());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception{
log.debug("客户端已断开: {}", ctx.channel().remoteAddress());
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buf = (ByteBuf) msg;
// System.out.println(buf.toString(Charset.defaultCharset()));
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
Channel channel = channelFuture.sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("服务器已关闭!!!");
}
}
}
客户端代码:分10次发送数据
@Slf4j(topic = "h._02StickyBagClient")
public class _02StickyBagClient{
public static void main(String[] args){
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception{
log.debug("连接建立成功...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
log.debug("正在发送数据...");
Random r = new Random();
char c = 'a';
for(int i = 0; i < 10; i++){
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
super.channelActive(ctx);
}
});
}
}).connect("localhost", 8080);
Channel channel = channelFuture.sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
D:\CodeDevolpment\JDK\bin\java.exe "-javaagent:C:\Users\黄争\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar=56386:C:\Users\黄争\IntelliJ IDEA 2021.1.3\bin" -Dfile.encoding=UTF-8 -classpath D:\CodeDevolpment\JDK\jre\lib\charsets.jar;D:\CodeDevolpment\JDK\jre\lib\ext\access-bridge-64.jar;D:\CodeDevolpment\JDK\jre\lib\ext\cldrdata.jar;D:\CodeDevolpment\JDK\jre\lib\ext\dnsns.jar;D:\CodeDevolpment\JDK\jre\lib\ext\jaccess.jar;D:\CodeDevolpment\JDK\jre\lib\ext\jfxrt.jar;D:\CodeDevolpment\JDK\jre\lib\ext\localedata.jar;D:\CodeDevolpment\JDK\jre\lib\ext\nashorn.jar;D:\CodeDevolpment\JDK\jre\lib\ext\sunec.jar;D:\CodeDevolpment\JDK\jre\lib\ext\sunjce_provider.jar;D:\CodeDevolpment\JDK\jre\lib\ext\sunmscapi.jar;D:\CodeDevolpment\JDK\jre\lib\ext\sunpkcs11.jar;D:\CodeDevolpment\JDK\jre\lib\ext\zipfs.jar;D:\CodeDevolpment\JDK\jre\lib\jce.jar;D:\CodeDevolpment\JDK\jre\lib\jfr.jar;D:\CodeDevolpment\JDK\jre\lib\jfxswt.jar;D:\CodeDevolpment\JDK\jre\lib\jsse.jar;D:\CodeDevolpment\JDK\jre\lib\management-agent.jar;D:\CodeDevolpment\JDK\jre\lib\resources.jar;D:\CodeDevolpment\JDK\jre\lib\rt.jar;O:\Java\Netty网络编程\target\classes;C:\Users\黄争\.m2\repository\io\netty\netty-all\4.1.39.Final\netty-all-4.1.39.Final.jar;C:\Users\黄争\.m2\repository\org\projectlombok\lombok\1.16.18\lombok-1.16.18.jar;C:\Users\黄争\.m2\repository\com\google\code\gson\gson\2.7\gson-2.7.jar;C:\Users\黄争\.m2\repository\com\google\guava\guava\19.0\guava-19.0.jar;C:\Users\黄争\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\黄争\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\黄争\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\黄争\.m2\repository\redis\clients\jedis\3.6.0\jedis-3.6.0.jar;C:\Users\黄争\.m2\repository\org\apache\commons\commons-pool2\2.9.0\commons-pool2-2.9.0.jar;C:\Users\黄争\.m2\repository\junit\junit\4.13.2\junit-4.13.2.jar;C:\Users\黄争\.m2\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar _03Netty进阶._01StickyBagServer
17:24:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x25f7d521, L:/127.0.0.1:8080 - R:/127.0.0.1:56588] REGISTERED
17:24:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x25f7d521, L:/127.0.0.1:8080 - R:/127.0.0.1:56588] ACTIVE
17:24:24 [DEBUG] [nioEventLoopGroup-3-1] h._01StickyBagServer - 客户端链接成功:/127.0.0.1:56588
17:24:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x25f7d521, L:/127.0.0.1:8080 - R:/127.0.0.1:56588] READ: 160B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
17:24:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x25f7d521, L:/127.0.0.1:8080 - R:/127.0.0.1:56588] READ COMPLETE
解决方案
短链接:发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
@Slf4j(topic = "h._02StickyBagClient02")
class _02StickyBagClient02{
/**
* 1. 短链接解决黏包问题
* @param args
*/
public static void main(String[] args){
for(int i = 0; i < 10; i++){
send();
}
}
private static void send(){
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception{
log.debug("连接建立成功...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
log.debug("正在发送数据...");
Random r = new Random();
char c = 'a';
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 短链接,发完关闭客户端连接
ctx.close();
super.channelActive(ctx);
}
});
}
}).connect("localhost", 8080);
Channel channel = channelFuture.sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
定长解码器:固定长度发送数据包
服务器添加一个定长解码器,一定要放在LoggingHandler的前面,保证数据被打印时已被拆分
@Override
protected void initChannel(SocketChannel ch) throws Exception{
// 数据包长度固定为8:定长解码器要放在最前面
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
ch.pipeline().addLast(new LoggingHandler());
}
客户端:随机发送长度为8的数据包
@Slf4j(topic = "h._02StickyBagClient02")
class _02StickyBagClient02{
/**
* 2. 固定长度解决黏包
*/
public static void main(String[] args){
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception{
log.debug("连接建立成功...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
log.debug("正在发送数据...");
Random r = new Random();
char c = 'a';
ByteBuf buffer = ctx.alloc().buffer();
// 分10次发送随机的数据,固定长度为8
for(int i = 0; i < 10; i++){
byte[] bytes = new byte[8];
for(int j = 0; j < r.nextInt(8); j++){
bytes[j] = (byte) c;
}
c++;
buffer.writeBytes(bytes);
}
ctx.writeAndFlush(buffer);
super.channelActive(ctx);
}
});
}
}).connect("localhost", 8080);
Channel channel = channelFuture.sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
20:23:15 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x45f3c5bb, L:/127.0.0.1:8080 - R:/127.0.0.1:64403] REGISTERED
20:23:15 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x45f3c5bb, L:/127.0.0.1:8080 - R:/127.0.0.1:64403] ACTIVE
20:23:15 [DEBUG] [nioEventLoopGroup-3-1] h._01StickyBagServer01 - 客户端链接成功:/127.0.0.1:64403
20:23:15 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x45f3c5bb, L:/127.0.0.1:8080 - R:/127.0.0.1:64403] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 00 00 00 00 00 00 00 |a....... |
+--------+-------------------------------------------------+----------------+
20:23:15 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x45f3c5bb, L:/127.0.0.1:8080 - R:/127.0.0.1:64403] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 00 00 00 00 00 |bbb..... |
+--------+-------------------------------------------------+----------------+
20:23:15 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x45f3c5bb, L:/127.0.0.1:8080 - R:/127.0.0.1:64403] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63 00 00 00 00 00 |ccc..... |
+--------+-------------------------------------------------+----------------+
20:23:15 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x45f3c5bb, L:/127.0.0.1:8080 - R:/127.0.0.1:64403] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 64 64 64 00 00 00 00 |dddd.... |