一、实例要求
实例要求:使用 IDEA 创建 Netty 项目
- Netty 服务器在 6668 端口监听,客户端能发送消息给服务器”hello,服务器~”
- 服务器可以回复消息给客户端”hello,客户端~”
- 目的:对 Netty 线程模型有一个初步认识,便于理解 Netty 模型理论
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
- @description:
- @Author: wangchao
@Date: 2021/12/6 */ public class NettyServer { public static void main(String[] args) throws InterruptedException {
// 创建 BossGroup 和 WorkerGroup
// 说明
// 1. 创建两个线程组 bossGroup 和 workerGroup
// 2. bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
// 3. 两个都是无限循环
// 4. bossgroup 和 workGroup 含有的子线程(NIOEventGroup)的个数 默认实际是cpu的核数
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 12核cpu,默认是24个线程 NettyRuntime.availableProcessors() * 2
// NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 方便测试,设置为 3 个线程
NioEventLoopGroup workerGroup = new NioEventLoopGroup(3);
try {
// 创建服务器端的启用对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup)// 设置两个现场组
.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { // 创建一个通道测试对象(匿名)
// 给 pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 在管道的最后添加一个处理器(自定义的)
ch.pipeline().addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println(".....服务器 is ready");
// 绑定一个端口并且同步,生成一个 channelFuture 对象
// 启动服务器并绑定端口
ChannelFuture cf = bootstrap.bind(6677).sync();
// 对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
NettyServerHandler
```java package com.supkingx.netty.simple;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil;
/**
- 说明
- 1、我们自定义一个 Handler 需要继续netty 规定好的谋和 handlerAdapter(规范)
- 2、这时我们自定义一个Handler,才能称为一个handler *
- @description:
- @Author: wangchao
@Date: 2021/12/6 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 读取数据实际(这里我们可以继续读取客户端发送的消息)
/**
- @param ctx 上下文对象,含有 管道pipeline,通道channel,地址
- @param msg 就是客户端发送的数据 默认Object
- @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(“服务器读取线程 “ + Thread.currentThread().getName()); System.out.println(“server ctx=” + ctx); System.out.println(“看看channel 和 pipeline的关系”); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline();
// 将 msg 转化成一个 ByteBuffer
// ByteBuf 是 Netty 提供的,不是NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* 可以在这里向通道写回数据
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 是 write + flush
// 将数据写入到缓存并刷新
// 一般讲,我们对发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,client", CharsetUtil.UTF_8));
}
/**
* 处理异常
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("服务端异常了");
cause.printStackTrace();
ctx.close();
}
}
<a name="zUi9H"></a>
## 2、客户端
<a name="YGdkr"></a>
### NettyClient
```java
package com.supkingx.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @description:
* @Author: wangchao
* @Date: 2021/12/6
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环跑
NioEventLoopGroup group = new NioEventLoopGroup();
try {
// 创建客户端启动对象
// 注意客户端使用的不是 serverBootstrap 而是 BootStrap
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 在管道的最后添加一个处理器(自定义的)
channel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端。。ok");
// 启动客户端去连接服务器端
// 关于 channelFuture 要分析,涉及到 netty 的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6677).sync();
// 给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅的关闭
group.shutdownGracefully();
}
}
}
NettyClientHandler
package com.supkingx.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;
/**
* @description:
* @Author: wangchao
* @Date: 2021/12/6
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪时,就会触发这个方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server!", StandardCharsets.UTF_8));
}
/**
* 当通道有时间读取时,会触发
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("客户端异常了");
cause.printStackTrace();
ctx.close();
}
}
三、源码简单分析
NioEventLoopGroup
1.默认线程数是0
�
2.进入这个方法内部一直深入,直到找到如下方法,可以看到当线程数=0的时候,返回一个默认的线程
3.默认线程如下,即当前CPU核心数(NettyRuntime.availableProcessors())*2
4.debug继续,由于我的电脑是12核心,所以这时的 NioEventLoopGroup 使用 24 线程,即有 24个children
children部分展示如下,有24个
验证workgroup每次都会启动一个新线程
- 这里我们启动了两个client,一个server,可以看到server使用了两个线程来对应每个客户端。
- 由于我们24个线程,启动24个客户端过于繁琐,这里将workGroup的线程数设置为3,便于测试。
可以看到,启动了4个客户端之后,workGroup会依次使用线程,第四个客户端会去使用第一个线程。
chanel 和 pipeline
通过下图可以简单的发现。
- channel中有pipeline,pipeline中有channel,这两个相互对应
- pipeline 是一个双向链表
四、上诉实例能不能再次优化?
1、缺陷
可以在 NettyServerHandler 类的 channelRead 方法中添加如下代码,可以发现当接受客户端的消息时,在这里被 阻塞 了,这样是不行的,本文将会介绍如何优化。
try {
Thread.sleep(10 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端,你好!", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("发生异常" + e.getMessage());
}
2、简单优化
可以在 NettyServerHandler 类的 channelRead 中添加异步任务,即可解决阻塞问题。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 比如我们有一个耗时很长的任务 -> 异步执行 -> 提交该channel 对应的 NIOEventloop 的 taskQueue中
// 解决方案1 用哪个户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端,你好!", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("发生异常" + e.getMessage());
}
}
});
System.out.println("server....go");
}
上述方案的异步原理是,将异步任务放到 ChannelHandlerContext(DefaultChannelHandlerContext
�类型)—>pipeline(DefaultChannelPipeline�类型) —> channel(NioSocketChannel�类型) —> evenLoop(NioEventLoop类型)—>taskQueue(NioEventLoop�类型) 中,等待执行。
3、探究 taskQueue
疑问:上诉优化中,假如 channelRead 有两个异步任务,这两个异步任务会怎么执行?
例如:在 channelRead 方法中有两个线程,第一个任务阻塞 10s,第二个任务阻塞 20s,当程序运行,客户端发送消息过来时,会是怎么样的?
假如客户端 2:00:00 发送消息到客户端,两种情况 1、第一个任务 2:00:10执行,第二个任务2:00:20执行 2、第一个任务 2:00:10执行,第二个任务2:00:30执行
答案是 第二种情况,当有两个线程在 taskQueue 中时,这两个线程会顺序执行。代码如下
com.supkingx.netty.simple2.NettyServerHandler#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 比如我们有一个耗时很长的任务 -> 异步执行 -> 提交该channel 对应的 NIOEventloop 的 taskQueue中
// 解决方案1 用哪个户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000);
String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
ctx.writeAndFlush(Unpooled.copiedBuffer(now + " hello,客户端,你好!", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("发生异常" + e.getMessage());
}
}
});
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(20 * 1000);
String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
ctx.writeAndFlush(Unpooled.copiedBuffer(now + " hello,客户端,你好!", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("发生异常" + e.getMessage());
}
}
});
String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
System.out.println("server....go," + now);
}
执行代码并验证
服务端在 2021-12-09 00:03:22 收到客户端的消息,由于是异步,所以先输出了 server….go
等待 10s至00:03:32时,第一个任务执行并向客户端发送消息,客户端收到消息
等待 30s至00:03:52时,第二个任务执行并向客户端发送消息,客户端收到消息
如上可验证结果。
总结
- Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
- NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket网络通道。
- NioEventLoop 内部采用串行化设计,从消息的 读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
- NioEventLoopGroup 下包含多个 NioEventLoop
- 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
- 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
- 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
- 每个 NioChannel 都绑定有一个自己的 ChannelPipeline
- ChannelHandlerContext(DefaultChannelHandlerContext
�类型)—>pipeline(DefaultChannelPipeline�类型) —> channel(NioSocketChannel�类型) —> evenLoop(NioEventLoop类型)—>taskQueue(NioEventLoop�类型) 中的 taskQueue 是串行设计。