
常用解码器
- FixedLengthFrameDecoder:基于固定长度的解码器
- LineBasedFrameDecoder:基于 \n,\r 的解码器
- DelimiterBasedFrameDecoder:基于分隔符的解码器
- LengthFieldBasedFrameDecoder:基于长度的解码器
ByteBuf
ByteBuf 的分类
- Pooled 和 Unpooled
- Pooled:从已经分配好的内存中取内存
- Unpooled:新创建一块内存
- Unsafe 和 非Unsafe
- Unsafe:依赖 JDK 底层的 Unsafe 对象
- 非Unsafe:不依赖 JDK 底层的 Unsafe 对象
- Heap 和 Direct
- Heap 底层就是 byte 数组
- Direct 依赖于 Nio 的 ByteBuffer 创建出的 DirectByteBuffer(堆外内存)
* <pre>* +-------------------+------------------+------------------+* | discardable bytes | readable bytes | writable bytes |* | | (CONTENT) | |* +-------------------+------------------+------------------+* | | | |* 0 <= readerIndex <= writerIndex <= capacity* </pre>
public class TestByteBuf {public static void main(String[] args) {/*内部使用的用:堆内内存要写出去的用:堆外内存*/// 1、创建一个非池化的 ByteBuf,大小为 10 个字节ByteBuf buf = Unpooled.buffer(10);System.out.println("原始 ByteBuf 为 =======================> " + buf.toString());System.out.println("1、ByteBuf 中的内容为 ==================> " + Arrays.toString(buf.array()) + "\n");// 2、写入一段内容(影响 widx)byte[] bytes = {1, 2, 3, 4, 5};buf.writeBytes(bytes);System.out.println("写入的 bytes 为 =======================> " + Arrays.toString(bytes));System.out.println("写入一段内容后的 ByteBuf 为 ============> " + buf.toString());System.out.println("2、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");// 3、读取一段内容(影响 ridx)byte b1 = buf.readByte();byte b2 = buf.readByte();System.out.println("读取的 bytes 为 =======================> " + Arrays.toString(new byte[]{b1, b2}));System.out.println("读取一段内容后的 ByteBuf 为 ============> " + buf.toString());System.out.println("3、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");// 4、将读取的内容丢弃(影响 ridx, widx)buf.discardReadBytes();System.out.println("将读取的内容丢弃后的 ByteBuf 为 ==========> " + buf.toString());System.out.println("4、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");// 5、清空读写指针(影响 ridx = 0, widx = 0)buf.clear();System.out.println("清空读写指针后的 ByteBuf 为 ============> " + buf.toString());System.out.println("5、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");// 6、再次写入一段内容,比第一次写入的少(影响 widx)bytes = new byte[]{1, 2, 3};buf.writeBytes(bytes);System.out.println("再次写入的 bytes 为 ===================> " + Arrays.toString(bytes));System.out.println("再次写入一段内容后的 ByteBuf 为 ========> " + buf.toString());System.out.println("6、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");// 7、将 buf 清零(不影响指针)buf.setZero(0, buf.capacity());System.out.println("将 buf 清零后的 ByteBuf 为 ============> " + buf.toString());System.out.println("7、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");// 8、再次写入一段内容超过容量的内容(从 widx 开始写,并开始扩容)bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};buf.writeBytes(bytes);System.out.println("再次写入的 bytes 为 ===================> " + Arrays.toString(bytes));System.out.println("再次写入一段内容后的 ByteBuf 为 ========> " + buf.toString());System.out.println("8、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");}}原始 ByteBuf 为 =======================> UnpooledByteBufAllocator$(ridx: 0, widx: 0, cap: 10)1、ByteBuf 中的内容为 ==================> [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]写入的 bytes 为 =======================> [1, 2, 3, 4, 5]写入一段内容后的 ByteBuf 为 ============> UnpooledByteBufAllocator$(ridx: 0, widx: 5, cap: 10)2、ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]读取的 bytes 为 =======================> [1, 2]读取一段内容后的 ByteBuf 为 ============> UnpooledByteBufAllocator$(ridx: 2, widx: 5, cap: 10)3、ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]将读取的内容丢弃后的 ByteBuf 为 ==========> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)4、ByteBuf 中的内容为 =================> [3, 4, 5, 4, 5, 0, 0, 0, 0, 0]清空读写指针后的 ByteBuf 为 ============> UnpooledByteBufAllocator$(ridx: 0, widx: 0, cap: 10)5、ByteBuf 中的内容为 =================> [3, 4, 5, 4, 5, 0, 0, 0, 0, 0]再次写入的 bytes 为 ===================> [1, 2, 3]再次写入一段内容后的 ByteBuf 为 ========> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)6、ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]将 buf 清零后的 ByteBuf 为 ============> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)7、ByteBuf 中的内容为 =================> [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]再次写入的 bytes 为 ===================> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]再次写入一段内容后的 ByteBuf 为 ========> UnpooledByteBufAllocator$(ridx: 0, widx: 15, cap: 64)8、ByteBuf 中的内容为 =================> [0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ..., 0 ]
Web 客户端 Demo
TestServer
public class TestServer {public static void main(String[] args) {// 接收客户端连接的线程组:主线程,通常给 1EventLoopGroup masterGroup = new NioEventLoopGroup(1);// 处理读写时间的线程组:工作线程,默认是 cpu 的核心数 * 2EventLoopGroup workGroup = new NioEventLoopGroup();try {// 创建服务器启动助手来配置参数ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(masterGroup, workGroup) // 设置线程组.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器通道.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列中等待连接的个数.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持活动连接状态.childHandler(new TestServerInitializer());ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();System.out.println("Server is starting on 9090...");channelFuture.channel().closeFuture().sync(); // 等待关闭,加上 sync 起到阻塞作用} catch (Exception ex) {ex.printStackTrace();} finally {masterGroup.shutdownGracefully();workGroup.shutdownGracefully();}}}
TestServerInitializer
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {/*HttpRequestEncoder:将 HttpRequest 或 HttpContent 编码成 ByteBufHttpRequestDecoder:将 ByteBuf 解码成 HttpRequest 或 HttpContentHttpResponseEncoder:将 HttpResponse 或 HttpContent 编码成 ByteBufHttpResponseDecoder:将 ByteBuf 解码成 HttpResponse 或 HttpContent*/@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("httpServerCodec", new HttpServerCodec()); // ChannelDuplexHandler = Inbound + Outboundpipeline.addLast("testServerHandler", new TestServerHandler()); // ChannelInboundHandler}}
TestServerHandler
public class TestServerHandler extends SimpleChannelInboundHandler<HttpObject> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject) throws Exception {if (httpObject instanceof HttpRequest) {HttpRequest httpRequest = (HttpRequest) httpObject;String uri = httpRequest.uri();System.out.println("uri = " + uri);ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());ctx.writeAndFlush(fullHttpResponse);}}}
Socket Demo
TestServer
public class TestServer {public static void main(String[] args) {EventLoopGroup masterGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(masterGroup, workGroup) // 设置线程组.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器通道.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列中等待连接的个数.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持活动连接状态.childHandler(new TestServerInitializer());ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();System.out.println("Server is starting on 9090...");channelFuture.channel().closeFuture().sync(); // 等待关闭,加上 sync 起到阻塞作用} catch (Exception ex) {ex.printStackTrace();} finally {masterGroup.shutdownGracefully();workGroup.shutdownGracefully();}}}
TestServerInitializer
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {/*常用解码器:1、FixedLengthFrameDecoder:基于固定长度的解码器2、LineBasedFrameDecoder:基于 \n,\r 的解码器3、DelimiterBasedFrameDecoder:基于分隔符的解码器4、LengthFieldBasedFrameDecoder:基于长度的解码器*/@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// ChannelInboundHandler// lengthFieldOffset: 长度字段偏差,lengthFieldLength: 长度字段占的字节数// lengthAdjustment: 添加到长度字段的补偿值,initialBytesToStrip: 从解码帧中第一次去除的字节数// 24来自客户端的问候24来自客户端的问候24来自客户端的问候 => 来自客户端的问候 来自客户端的问候 来自客户端的问候pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));// ChannelOutboundHandler: 计算当前待发送消息的二进制字节长度,将该长度添加到 ByteBuf 的缓冲区头中,来标识发送字符串的长度pipeline.addLast(new LengthFieldPrepender(4));// ChannelInboundHandler: 将 byte 解码成 String 数据pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));// ChannelOutboundHandler: 将 String 编码成 byte 数据pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));// ChannelInboundHandlerpipeline.addLast(new TestServerHandler());}}
TestServerHandler
public class TestServerHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println(channelHandlerContext.channel().remoteAddress() + ", " + s);channelHandlerContext.writeAndFlush("form server: " + UUID.randomUUID());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
TestClient
public class TestClient {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup)
.channel(NioSocketChannel.class)
.handler(new TestClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
}
}
}
TestClientInitializer
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
/*
* 1) lengthFieldOffset:长度字段的偏差
* 2) lengthFieldLength:长度字段占的字节数
* 3) lengthAdjustment:添加到长度字段的补偿值
* 4) initialBytesToStrip:从解码帧中第一次去除的字节数
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));// 计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 将byte数据解码成String
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 将字符串编码成byte数据
pipeline.addLast(new TestClientHandler());
}
}
TestClientHandler
public class TestClientHandler extends SimpleChannelInboundHandler<String> {
// 读取服务端数据
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(channelHandlerContext.channel().remoteAddress() + ", client output = " + s);
// channelHandlerContext.writeAndFlush("form client"+ LocalDateTime.now());
}
// 通道就绪
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush("来自客户端的问候" + i);
}
}
//有异常发生
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close();
}
}
聊天室
TestServer
public class TestServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
System.out.println("Server is starting on 8989...");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
TestServerInitializer
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// ChannelInboundHandler,基于分隔符的解码器
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 字节 转 String
pipeline.addLast(new TestServerHandler());
// ChannelOutboundHandler
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // String 转 字节
}
}
TestServerHandler
public class TestServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// channel 助手类(拦截器)的添加
// 对于上线:handlerAdded 先执行,channelActive 后执行
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// group.writeAndFlush 会发送给组内的所有人
group.writeAndFlush(channel.remoteAddress() + " 加入了\n");
group.add(channel); // 不转发给自己,所以这行代码在下面
}
// channel活跃 通道准备就绪事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 上线了");
System.out.println("在线人数 = " + group.size() + "\n");
}
// channel读取数据
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
Channel channel = channelHandlerContext.channel();
// 因为自己已经在群组里了,所以要循环判断,不发送给自己
group.forEach(ch -> {
if (channel != ch) {
ch.writeAndFlush(channel.remoteAddress() + ":" + s + "\r\n");
}
});
// 向下传播
channelHandlerContext.fireChannelRead(s);
}
// channel不活跃 通道关闭事件
// 对于下线:channelInactive 先执行,handlerRemoved 后执行
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// ctx.channel 会自动从 group 中 remove 掉 channel
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 下线了");
System.out.println("在线人数 = " + group.size() + "\n");
}
// channel 助手类(拦截器)移除
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
group.writeAndFlush(channel.remoteAddress() + " 离开了\n");
}
// channel注册事件
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
super.channelRegistered(ctx);
}
// channel取消注册事件
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered");
super.channelUnregistered(ctx);
}
// 发生异常回调
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
TestClient
public class TestClient {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup)
.channel(NioSocketChannel.class)
.handler(new TestClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8989).sync();
Channel channel = channelFuture.channel();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) {
channel.writeAndFlush(bufferedReader.readLine() + "\r\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
}
}
}
TestClientInitializer
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));//基于分隔符的解码器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new TestClientHandler());
}
}
TestClientHandler
public class TestClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s.trim() + "\n");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
心跳检测
利用 netty 自带的 IdleStateHandler 来做心跳检测机制
TestServer
public class TestServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 接收客户端连接的线程组
EventLoopGroup workGroup = new NioEventLoopGroup(); // 真正处理读写事件的线程组
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
TestServerInitializer
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 2. 配置handler
// 2.1 websocket 基于 http 协议,需要 http 编码和解码工具
pipeline.addLast(new HttpServerCodec());
// 2.2 对于大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 2.3 对于 http 的消息进行聚合,聚合成 FullHttpRequest 或者 FullHttpResponse
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
// 针对客户端,如果 1 分钟之内没有向服务器发送读写心跳,则主动断开
pipeline.addLast(new IdleStateHandler(40, 50, 45));
// 自定义的读写空闲状态检测
pipeline.addLast(new HeartBeatHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 定义自己的handler,主要是对请求进行处理和发送
pipeline.addLast(new ChatHandler());
}
}
TestServerHandler
public class TestServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
String str = "";
switch (idleStateEvent.state()) {
case READER_IDLE:
str = "读空闲";
System.out.println(ctx.channel().remoteAddress() + "----超时事件-----" + str);
break;
case WRITER_IDLE:
str = "写空闲";
System.out.println(ctx.channel().remoteAddress() + "----超时事件-----" + str);
break;
case ALL_IDLE:
str = "读写空闲";
System.out.println(ctx.channel().remoteAddress() + "----超时事件-----" + str);
ctx.channel().close();
break;
}
}
}
}
HeartBeatHandler
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
/**
* 用户事件触发的处理器
*
* @param ctx 上下文
* @param evt 事件
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 判断 evt 是否属于 IdleStateEvent,用于触发用户事件,包含读空闲,写空闲,读写空闲
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 读空闲,不做处理
System.out.println("进入读空闲");
} else if (event.state() == IdleState.WRITER_IDLE) {
// 写空闲,不做处理
System.out.println("进入写空闲");
} else if (event.state() == IdleState.ALL_IDLE) {
System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());
// 关闭channel
Channel channel = ctx.channel();
channel.close();
System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
}
}
}
}
