1.分隔符解码器
在日常项目中,我们可能会遇到并不是用回车换行分隔符来修饰包尾,有可能会想自定义分隔符来进行拆包粘包问题的解决,这时就要用到netty里面的DelimiterBasedFrameDecoder解码器来解决上述问题
1.服务端代码
package demo1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* @author 冯铁城 [17615007230@163.com]
* @date 2022-05-09 19:11:57
* @describe: 时间服务器服务端
*/
public class TimeServer {
/**
* 开启socket链接
*
* @param port 端口号
*/
public void bind(int port) {
//1.创建线程组用于接受服务端链接
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//2.创建线程组用户socketChannel读写
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
//3.创建服务端
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
//3.绑定端口,开启同步等待
ChannelFuture sync = server.bind(port).sync();
System.out.println("服务端已启动!");
//4.等待服务端监听端口关闭
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBuffer("-ftc".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, byteBuf));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}
private class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端已与" + ctx.channel().id() + "建立链接");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1.字符串判定
System.out.println("收到" + ctx.channel().id() + "渠道的请求:" + msg);
//2.定义返回结果
String responseMessage = "响应信息-ftc";
//3.相应数据
ByteBuf response = Unpooled.copiedBuffer(responseMessage.getBytes());
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
2.客户端代码
package demo1;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* @author 冯铁城 [17615007230@163.com]
* @date 2022-05-09 21:09:16
* @describe: 时间服务器客户端
*/
public class TimeClient {
/**
* 创建客户端链接
*
* @param host 主机
* @param port 端口号
*/
public void connect(String host, int port) {
//1. 创建用于IO读写的线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.创建客户端启动类
Bootstrap bootstrap = new Bootstrap();
//3.启动类初始化
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBuffer("-ftc".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, byteBuf));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//4.发起异步链接操作
ChannelFuture sync = bootstrap.connect(host, port).sync();
//5.等待客户端链路链接关闭
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
/**
* @author 冯铁城 [17615007230@163.com]
* @date 2022-05-09 21:01:23
* @describe: 时间服务器客户端
*/
public class TimeClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 20; i++) {
String requestMessage = "发送消息-ftc";
ByteBuf message = Unpooled.buffer(requestMessage.getBytes().length);
message.writeBytes(requestMessage.getBytes());
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到服务端答复:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("链接出现异常,异常信息:" + cause.getMessage());
ctx.close();
}
}
}
3.调试结果
可以看到客户端在发送消息时在包体尾部添加”-ftc”来表示一个数据包的结束,通过DelimiterBasedFrameDecoder实现了拆包粘包问题的解决
4.原理
与LineBasedFrameDecoder原理一样,都是通过遍历字节数组找到对应的包尾标识,来对缓冲区字节数组进行包体划分,进而解决的拆包粘包问题
2.定长解码器
除了在包体增加标识位外,也可以通过消息定长来实现拆包粘包问题的解决。netty中FixedLengthFrameDecoder可以解决上述问题
1.服务端代码
package demo1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* @author 冯铁城 [17615007230@163.com]
* @date 2022-05-09 19:11:57
* @describe: 时间服务器服务端
*/
public class TimeServer {
/**
* 开启socket链接
*
* @param port 端口号
*/
public void bind(int port) {
//1.创建线程组用于接受服务端链接
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//2.创建线程组用户socketChannel读写
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
//3.创建服务端
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
//3.绑定端口,开启同步等待
ChannelFuture sync = server.bind(port).sync();
System.out.println("服务端已启动!");
//4.等待服务端监听端口关闭
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}
private class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端已与" + ctx.channel().id() + "建立链接");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1.字符串判定
System.out.println("收到" + ctx.channel().id() + "渠道的请求:" + msg);
//2.定义返回结果
String responseMessage = "nihao woshidasahguaa";
//3.相应数据
ByteBuf response = Unpooled.copiedBuffer(responseMessage.getBytes());
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
2.客户端代码
package demo1;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* @author 冯铁城 [17615007230@163.com]
* @date 2022-05-09 21:09:16
* @describe: 时间服务器客户端
*/
public class TimeClient {
/**
* 创建客户端链接
*
* @param host 主机
* @param port 端口号
*/
public void connect(String host, int port) {
//1. 创建用于IO读写的线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.创建客户端启动类
Bootstrap bootstrap = new Bootstrap();
//3.启动类初始化
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//4.发起异步链接操作
ChannelFuture sync = bootstrap.connect(host, port).sync();
//5.等待客户端链路链接关闭
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
/**
* @author 冯铁城 [17615007230@163.com]
* @date 2022-05-09 21:01:23
* @describe: 时间服务器客户端
*/
public class TimeClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 20; i++) {
String requestMessage = "nihao woshidasahchun";
ByteBuf message = Unpooled.buffer(requestMessage.getBytes().length);
message.writeBytes(requestMessage.getBytes());
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到服务端答复:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("链接出现异常,异常信息:" + cause.getMessage());
ctx.close();
}
}
}
package demo1;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* @author 冯铁城 [17615007230@163.com]
* @date 2022-05-09 21:09:16
* @describe: 时间服务器客户端
*/
public class TimeClient {
/**
* 创建客户端链接
*
* @param host 主机
* @param port 端口号
*/
public void connect(String host, int port) {
//1. 创建用于IO读写的线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.创建客户端启动类
Bootstrap bootstrap = new Bootstrap();
//3.启动类初始化
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBuffer("-ftc".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, byteBuf));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//4.发起异步链接操作
ChannelFuture sync = bootstrap.connect(host, port).sync();
//5.等待客户端链路链接关闭
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
/**
* @author 冯铁城 [17615007230@163.com]
* @date 2022-05-09 21:01:23
* @describe: 时间服务器客户端
*/
public class TimeClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 20; i++) {
String requestMessage = "发送消息-ftc";
ByteBuf message = Unpooled.buffer(requestMessage.getBytes().length);
message.writeBytes(requestMessage.getBytes());
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到服务端答复:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("链接出现异常,异常信息:" + cause.getMessage());
ctx.close();
}
}
}
3.调试结果
如图服务端与客户端分别发送了长度为20的消息,同时设置FixedLengthFrameDecoder的定长为20,客户端与服务端的消息收发完全没有问题
4.原理
通过定长指定消息的长度,只需要按照定长读取缓冲区字节数组即可。读取规定长度后,标志位重置再继续读取下一个包内容