常用解码器
- FixedLengthFrameDecoder:基于固定长度的解码器
- LineBasedFrameDecoder:基于换行符(\n 或 \r\n)的解码器
- DelimiterBasedFrameDecoder:基于分隔符的解码器
- LengthFieldBasedFrameDecoder:基于长度的解码器
ByteBuf
ByteBuf 的分类
- Pooled 和 Unpooled
- Pooled:从已经分配好的内存中取内存
- Unpooled:新创建一块内存
- Unsafe 和 非Unsafe
- Unsafe:依赖 JDK 底层的 Unsafe 对象
- 非Unsafe:不依赖 JDK 底层的 Unsafe 对象
- Heap 和 Direct
- Heap 底层就是 byte 数组
- Direct 依赖于 Nio 的 ByteBuffer 创建出的 DirectByteBuffer(堆外内存)
* <pre>
*      +-------------------+------------------+------------------+
*      | discardable bytes |  readable bytes  |  writable bytes  |
*      |                   |     (CONTENT)    |                  |
*      +-------------------+------------------+------------------+
*      |                   |                  |                  |
*      0      <=      readerIndex   <=   writerIndex    <=    capacity
* </pre>
/**
 * Step-by-step demonstration of how writes, reads, discards and clears move the
 * reader index (ridx) and writer index (widx) of an unpooled heap {@code ByteBuf}.
 * After every step the buffer's state and its backing array are printed.
 */
public class TestByteBuf {

    /**
     * Prints the buffer's {@code toString()} state and then its backing array,
     * followed by an empty line.
     *
     * @param stateLabel   prefix printed before the buffer state
     * @param contentLabel prefix printed before the backing-array dump
     * @param buffer       buffer to describe
     */
    private static void report(String stateLabel, String contentLabel, ByteBuf buffer) {
        System.out.println(stateLabel + buffer.toString());
        System.out.println(contentLabel + Arrays.toString(buffer.array()) + "\n");
    }

    public static void main(String[] args) {
        // Rule of thumb from these notes: heap memory for data that is only used
        // internally, off-heap (direct) memory for data that is written out.

        // 1. Create an unpooled ByteBuf with a capacity of 10 bytes.
        ByteBuf buffer = Unpooled.buffer(10);
        report("原始 ByteBuf 为 =======================> ",
                "1、ByteBuf 中的内容为 ==================> ", buffer);

        // 2. Write some content (moves widx).
        byte[] firstPayload = {1, 2, 3, 4, 5};
        buffer.writeBytes(firstPayload);
        System.out.println("写入的 bytes 为 =======================> " + Arrays.toString(firstPayload));
        report("写入一段内容后的 ByteBuf 为 ============> ",
                "2、ByteBuf 中的内容为 =================> ", buffer);

        // 3. Read two bytes (moves ridx); the initializer reads them left to right.
        byte[] consumed = {buffer.readByte(), buffer.readByte()};
        System.out.println("读取的 bytes 为 =======================> " + Arrays.toString(consumed));
        report("读取一段内容后的 ByteBuf 为 ============> ",
                "3、ByteBuf 中的内容为 =================> ", buffer);

        // 4. Discard the bytes that were already read (moves both ridx and widx).
        buffer.discardReadBytes();
        report("将读取的内容丢弃后的 ByteBuf 为 ==========> ",
                "4、ByteBuf 中的内容为 =================> ", buffer);

        // 5. Reset both indices (ridx = 0, widx = 0).
        buffer.clear();
        report("清空读写指针后的 ByteBuf 为 ============> ",
                "5、ByteBuf 中的内容为 =================> ", buffer);

        // 6. Write again, fewer bytes than the first time (moves widx).
        byte[] secondPayload = new byte[]{1, 2, 3};
        buffer.writeBytes(secondPayload);
        System.out.println("再次写入的 bytes 为 ===================> " + Arrays.toString(secondPayload));
        report("再次写入一段内容后的 ByteBuf 为 ========> ",
                "6、ByteBuf 中的内容为 =================> ", buffer);

        // 7. Zero the whole buffer (the indices are left untouched).
        buffer.setZero(0, buffer.capacity());
        report("将 buf 清零后的 ByteBuf 为 ============> ",
                "7、ByteBuf 中的内容为 =================> ", buffer);

        // 8. Write more than the remaining capacity: writing starts at widx and the buffer grows.
        byte[] overflowPayload = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
        buffer.writeBytes(overflowPayload);
        System.out.println("再次写入的 bytes 为 ===================> " + Arrays.toString(overflowPayload));
        report("再次写入一段内容后的 ByteBuf 为 ========> ",
                "8、ByteBuf 中的内容为 =================> ", buffer);
    }
}
原始 ByteBuf 为 =======================> UnpooledByteBufAllocator$(ridx: 0, widx: 0, cap: 10)
1、ByteBuf 中的内容为 ==================> [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写入的 bytes 为 =======================> [1, 2, 3, 4, 5]
写入一段内容后的 ByteBuf 为 ============> UnpooledByteBufAllocator$(ridx: 0, widx: 5, cap: 10)
2、ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
读取的 bytes 为 =======================> [1, 2]
读取一段内容后的 ByteBuf 为 ============> UnpooledByteBufAllocator$(ridx: 2, widx: 5, cap: 10)
3、ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
将读取的内容丢弃后的 ByteBuf 为 ==========> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)
4、ByteBuf 中的内容为 =================> [3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
清空读写指针后的 ByteBuf 为 ============> UnpooledByteBufAllocator$(ridx: 0, widx: 0, cap: 10)
5、ByteBuf 中的内容为 =================> [3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
再次写入的 bytes 为 ===================> [1, 2, 3]
再次写入一段内容后的 ByteBuf 为 ========> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)
6、ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
将 buf 清零后的 ByteBuf 为 ============> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)
7、ByteBuf 中的内容为 =================> [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
再次写入的 bytes 为 ===================> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
再次写入一段内容后的 ByteBuf 为 ========> UnpooledByteBufAllocator$(ridx: 0, widx: 15, cap: 64)
8、ByteBuf 中的内容为 =================> [0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ..., 0 ]
Web 客户端 Demo
TestServer
/**
 * Entry point of the HTTP demo server: boots a Netty {@code ServerBootstrap},
 * binds it and blocks until the server channel is closed.
 */
public class TestServer {
    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     * Both event loop groups are shut down on exit, whether or not startup succeeded.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Single source of truth for the port, so the log line cannot drift from the bind call.
        int port = 9000;
        // Group that accepts client connections (the "master" threads); one thread is usually enough.
        EventLoopGroup masterGroup = new NioEventLoopGroup(1);
        // Group that handles read/write events (the workers); defaults to CPU cores * 2 threads.
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // Bootstrap helper used to configure the server.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(masterGroup, workGroup) // event loop groups
                    .channel(NioServerSocketChannel.class) // NIO server socket channel
                    .option(ChannelOption.SO_BACKLOG, 128) // queue size for pending connections
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // keep accepted connections alive
                    .childHandler(new TestServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("Server is starting on " + port + "...");
            // Wait for the server channel to close; sync() blocks this thread until then.
            channelFuture.channel().closeFuture().sync();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            masterGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
TestServerInitializer
/**
 * Configures the pipeline of every accepted channel for the HTTP demo.
 *
 * <p>Codec overview from the original notes:
 * <ul>
 *   <li>HttpRequestEncoder: encodes HttpRequest / HttpContent into ByteBuf</li>
 *   <li>HttpRequestDecoder: decodes ByteBuf into HttpRequest / HttpContent</li>
 *   <li>HttpResponseEncoder: encodes HttpResponse / HttpContent into ByteBuf</li>
 *   <li>HttpResponseDecoder: decodes ByteBuf into HttpResponse / HttpContent</li>
 * </ul>
 */
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Registers the HTTP codec followed by the demo request handler.
     *
     * @param ch the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Duplex handler: covers both the inbound and the outbound direction.
                .addLast("httpServerCodec", new HttpServerCodec())
                // Inbound handler that answers the requests.
                .addLast("testServerHandler", new TestServerHandler());
    }
}
TestServerHandler
/**
 * Answers every incoming HTTP request with a plain-text "hello world" body.
 */
public class TestServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Logs the request URI and writes a fixed 200 OK response.
     * Objects that are not an {@code HttpRequest} are ignored.
     *
     * @param ctx        context of this handler
     * @param httpObject decoded HTTP object
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject) throws Exception {
        if (!(httpObject instanceof HttpRequest)) {
            return;
        }
        HttpRequest request = (HttpRequest) httpObject;
        System.out.println("uri = " + request.uri());

        ByteBuf body = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
        FullHttpResponse response =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());
        ctx.writeAndFlush(response);
    }
}
Socket Demo
TestServer
/**
 * Entry point of the length-prefixed socket demo server: boots a Netty
 * {@code ServerBootstrap}, binds it and blocks until the server channel is closed.
 */
public class TestServer {
    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     * Both event loop groups are shut down on exit, whether or not startup succeeded.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Single source of truth for the port, so the log line cannot drift from the bind call.
        int port = 9000;
        EventLoopGroup masterGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(masterGroup, workGroup) // event loop groups
                    .channel(NioServerSocketChannel.class) // NIO server socket channel
                    .option(ChannelOption.SO_BACKLOG, 128) // queue size for pending connections
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // keep accepted connections alive
                    .childHandler(new TestServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("Server is starting on " + port + "...");
            // Wait for the server channel to close; sync() blocks this thread until then.
            channelFuture.channel().closeFuture().sync();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            masterGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
TestServerInitializer
/**
 * Configures the pipeline of every accepted channel for the length-prefixed string protocol.
 *
 * <p>Commonly used frame decoders, from the original notes:
 * <ol>
 *   <li>FixedLengthFrameDecoder: fixed-length frames</li>
 *   <li>LineBasedFrameDecoder: frames terminated by a line ending</li>
 *   <li>DelimiterBasedFrameDecoder: frames separated by a delimiter</li>
 *   <li>LengthFieldBasedFrameDecoder: frames described by a length field</li>
 * </ol>
 */
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Installs frame decoding, length prepending, string conversion and the business handler.
     *
     * @param ch the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Inbound. Arguments after the max frame length:
                //   lengthFieldOffset   = 0: offset of the length field
                //   lengthFieldLength   = 4: number of bytes the length field occupies
                //   lengthAdjustment    = 0: compensation value added to the length field
                //   initialBytesToStrip = 4: bytes removed from the front of each decoded frame
                // Example from the notes: a stream of three messages, each a length prefix of 24
                // followed by the greeting text, comes out as three separate greetings.
                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                // Outbound: computes the byte length of the outgoing message and puts it in
                // front of the buffer so the peer knows how long the string is.
                .addLast(new LengthFieldPrepender(4))
                // Inbound: bytes -> String.
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                // Outbound: String -> bytes.
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                // Inbound: business logic.
                .addLast(new TestServerHandler());
    }
}
TestServerHandler
/**
 * Server-side handler of the socket demo: logs each received string and
 * replies with a random UUID.
 */
public class TestServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Prints the sender address and message, then answers with a fresh UUID.
     *
     * @param channelHandlerContext context of this handler
     * @param s                     decoded message text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(channelHandlerContext.channel().remoteAddress() + ", " + s);
        // Reply prefix corrected from the misspelled "form server: ".
        channelHandlerContext.writeAndFlush("from server: " + UUID.randomUUID());
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of this handler
     * @param cause the error that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
TestClient
/**
 * Client of the socket demo: connects to 127.0.0.1:9000 and stays up
 * until the connection is closed.
 */
public class TestClient {
    /**
     * Connects to the server and blocks until the channel is closed;
     * the event loop group is always shut down afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup eventLoops = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(eventLoops);
            client.channel(NioSocketChannel.class);
            client.handler(new TestClientInitializer());

            ChannelFuture connected = client.connect("127.0.0.1", 9000).sync();
            // Block here until the connection goes away.
            connected.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoops.shutdownGracefully();
        }
    }
}
TestClientInitializer
/**
 * Builds the client pipeline for the length-prefixed string protocol.
 */
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Installs frame decoding, length prepending, string conversion and the client handler.
     *
     * @param socketChannel the freshly connected channel
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // Arguments after the max frame length:
                //   1) lengthFieldOffset:   offset of the length field
                //   2) lengthFieldLength:   number of bytes the length field occupies
                //   3) lengthAdjustment:    compensation value added to the length field
                //   4) initialBytesToStrip: bytes removed from the front of each decoded frame
                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                // Computes the byte length of the outgoing message and puts it in front of the buffer.
                .addLast(new LengthFieldPrepender(4))
                // bytes -> String
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                // String -> bytes
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new TestClientHandler());
    }
}
TestClientHandler
/**
 * Client-side handler of the socket demo: sends ten greetings once the channel
 * is active and prints whatever the server sends back.
 */
public class TestClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Prints a message received from the server; no reply is sent.
     *
     * @param channelHandlerContext context of this handler
     * @param s                     decoded message text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        String line = channelHandlerContext.channel().remoteAddress() + ", client output = " + s;
        System.out.println(line);
    }

    /**
     * Called when the channel is ready: sends ten numbered greetings, each flushed on its own.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int remaining = 10;
        int sequence = 0;
        while (remaining-- > 0) {
            ctx.writeAndFlush("来自客户端的问候" + sequence);
            sequence++;
        }
    }

    /**
     * Called when an error occurs: prints it and closes the channel.
     *
     * @param ctx   context of this handler
     * @param cause the error that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.channel().close();
    }
}
聊天室
TestServer
/**
 * Entry point of the chat-room server listening on port 8989.
 */
public class TestServer {
    /**
     * Binds the chat server and blocks until its channel is closed;
     * both event loop groups are always shut down afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup acceptors = new NioEventLoopGroup();
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptors, workers);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new TestServerInitializer());

            ChannelFuture bound = server.bind(8989).sync();
            System.out.println("Server is starting on 8989...");
            // Block until the server channel is closed.
            bound.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
TestServerInitializer
/**
 * Builds the chat server pipeline: line-delimited frames, string decoding,
 * the chat handler and a string encoder for outgoing messages.
 */
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Registers the handlers in the order delimiter decoder, string decoder,
     * chat handler, string encoder.
     *
     * @param socketChannel the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // Inbound: delimiter-based frame decoder (line delimiters, frames up to 4096 bytes).
                .addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                // Inbound: bytes -> String.
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new TestServerHandler())
                // Outbound: String -> bytes.
                .addLast(new StringEncoder(CharsetUtil.UTF_8));
    }
}
TestServerHandler
/**
 * Chat-room handler: tracks all connected channels in a shared group, announces
 * joins and departures, and relays every message to all other members.
 */
public class TestServerHandler extends SimpleChannelInboundHandler<String> {
    /** All currently connected channels, shared by every handler instance. */
    private static final ChannelGroup onlineChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Handler added to a channel's pipeline.
     * When a client comes online this runs before {@code channelActive}.
     *
     * @param ctx context of this handler
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel newcomer = ctx.channel();
        // Writing to the group sends the text to every member. The newcomer is added only
        // afterwards, so it does not receive its own join notice.
        onlineChannels.writeAndFlush(newcomer.remoteAddress() + " 加入了\n");
        onlineChannels.add(newcomer);
    }

    /**
     * Channel became active (ready): logs the address and the current member count.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上线了");
        System.out.println("在线人数 = " + onlineChannels.size() + "\n");
    }

    /**
     * Relays a received message to every other member, then passes it on down the pipeline.
     *
     * @param channelHandlerContext context of this handler
     * @param s                     decoded message text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel sender = channelHandlerContext.channel();
        String relayed = sender.remoteAddress() + ":" + s + "\r\n";
        // The sender is itself a member of the group, so it has to be skipped explicitly.
        for (Channel member : onlineChannels) {
            if (member == sender) {
                continue;
            }
            member.writeAndFlush(relayed);
        }
        // Propagate to the next handler.
        channelHandlerContext.fireChannelRead(s);
    }

    /**
     * Channel became inactive (closed): logs the address and the current member count.
     * When a client goes offline this runs before {@code handlerRemoved}.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // No explicit remove here: per the original notes the closed channel is dropped
        // from the group automatically.
        System.out.println(ctx.channel().remoteAddress() + " 下线了");
        System.out.println("在线人数 = " + onlineChannels.size() + "\n");
    }

    /**
     * Handler removed from a channel's pipeline: announces the departure to the group.
     *
     * @param ctx context of this handler
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel leaver = ctx.channel();
        onlineChannels.writeAndFlush(leaver.remoteAddress() + " 离开了\n");
    }

    /**
     * Channel registered: logs the event and delegates to the default behaviour.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
    }

    /**
     * Channel unregistered: logs the event and delegates to the default behaviour.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
        super.channelUnregistered(ctx);
    }

    /**
     * Error callback: prints the failure and closes the connection.
     *
     * @param ctx   context of this handler
     * @param cause the error that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
TestClient
/**
 * Console chat client: connects to 127.0.0.1:8989 and sends every line typed
 * on standard input to the server.
 */
public class TestClient {
    /**
     * Connects to the chat server and forwards console input line by line until
     * standard input reaches end-of-stream; the event loop group is always shut
     * down afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(bossGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new TestClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8989).sync();
            Channel channel = channelFuture.channel();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            // readLine() returns null at end of input (e.g. Ctrl-D or a closed pipe). Stop there,
            // otherwise the loop would spin forever sending the literal text "null".
            while ((line = bufferedReader.readLine()) != null) {
                // The server splits frames on line delimiters, so terminate every message.
                channel.writeAndFlush(line + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
        }
    }
}
TestClientInitializer
/**
 * Builds the chat client pipeline: line-delimited frames, string decoding and
 * encoding, and the client handler.
 */
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Registers the four handlers on the freshly connected channel.
     *
     * @param socketChannel the freshly connected channel
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // Delimiter-based frame decoder (line delimiters, frames up to 4096 bytes).
                .addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new TestClientHandler());
    }
}
TestClientHandler
/**
 * Chat client handler: prints every message received from the server.
 */
public class TestClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Prints the trimmed message followed by an empty line.
     *
     * @param channelHandlerContext context of this handler
     * @param s                     decoded message text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        String trimmed = s.trim();
        System.out.println(trimmed + "\n");
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of this handler
     * @param cause the error that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
心跳检测
利用 netty 自带的 IdleStateHandler 来做心跳检测机制
TestServer
/**
 * Entry point of the heartbeat-detection demo server listening on port 8989.
 */
public class TestServer {
    /**
     * Binds the server and blocks until its channel is closed; both event loop
     * groups are always shut down afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // accepts client connections
        EventLoopGroup workGroup = new NioEventLoopGroup(); // handles the actual read/write events
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TestServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the thread's interrupt status;
            // restore it so code further up can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
TestServerInitializer
/**
 * Builds the pipeline of the heartbeat demo: HTTP handling, idle detection and
 * the WebSocket chat handlers.
 */
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Registers the handlers on a newly accepted channel, in the order listed below.
     *
     * @param socketChannel the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // WebSocket is built on HTTP, so the HTTP codec is needed first.
                .addLast(new HttpServerCodec())
                // Support for writing large data streams.
                .addLast(new ChunkedWriteHandler())
                // Aggregates HTTP messages into FullHttpRequest / FullHttpResponse (limit 64 * 1024).
                .addLast(new HttpObjectAggregator(1024 * 64))
                // Idle detection with reader-idle 40, writer-idle 50 and all-idle 45
                // (seconds for this constructor; confirm against the Netty version in use).
                .addLast(new IdleStateHandler(40, 50, 45))
                // Custom reaction to the idle events raised by the handler above.
                .addLast(new HeartBeatHandler())
                .addLast(new WebSocketServerProtocolHandler("/ws"))
                // Application handler that processes requests and sends the replies.
                .addLast(new ChatHandler());
    }
}
TestServerHandler
/**
 * Logs idle-timeout events and closes the channel once it has been idle in
 * both directions.
 */
public class TestServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * Reacts to {@code IdleStateEvent}s: prints the remote address with the kind of
     * timeout, and closes the channel on an all-idle timeout. Any other event is
     * ignored and is not passed on to later handlers.
     *
     * @param ctx context of this handler
     * @param evt the user event that was fired
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        IdleStateEvent idleStateEvent = (IdleStateEvent) evt;

        String label;
        boolean closeAfterLogging = false;
        switch (idleStateEvent.state()) {
            case READER_IDLE:
                label = "读空闲";
                break;
            case WRITER_IDLE:
                label = "写空闲";
                break;
            case ALL_IDLE:
                label = "读写空闲";
                closeAfterLogging = true;
                break;
            default:
                // No other state is handled.
                return;
        }

        System.out.println(ctx.channel().remoteAddress() + "----超时事件-----" + label);
        if (closeAfterLogging) {
            ctx.channel().close();
        }
    }
}
HeartBeatHandler
/**
 * Heartbeat handler: reacts to the idle events raised by an upstream
 * {@code IdleStateHandler} and closes channels that are idle in both directions.
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    /**
     * Handles user events.
     *
     * <p>Idle events are consumed here: reader-idle and writer-idle are only logged,
     * all-idle closes the channel. Every other event is forwarded to the next handler,
     * so handlers registered after this one still receive events this class does not
     * handle.
     *
     * @param ctx the handler context
     * @param evt the event that was fired
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            // Not an idle event: hand it on unchanged instead of silently dropping it.
            super.userEventTriggered(ctx, evt);
            return;
        }
        IdleStateEvent event = (IdleStateEvent) evt;
        if (event.state() == IdleState.READER_IDLE) {
            // Reader idle: logged only.
            System.out.println("进入读空闲");
        } else if (event.state() == IdleState.WRITER_IDLE) {
            // Writer idle: logged only.
            System.out.println("进入写空闲");
        } else if (event.state() == IdleState.ALL_IDLE) {
            System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());
            // Close the idle channel.
            Channel channel = ctx.channel();
            channel.close();
            // NOTE(review): close() is asynchronous in Netty, so this count may not yet
            // reflect the removal - confirm how ChatHandler.users is maintained.
            System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
        }
    }
}