经过上面对Netty的概述,以及Netty工作原理图,以及线程模式的分析,我们先写一个Netty-TCP服务的小案例,更深一步理解一下Netty的工作原理,以及Netty的线程模型。
案例
实例要求:使用 IDEA 创建 Netty 项目:
1、Netty服务器在6668端口监听,客户端能发送消息给服务器”hello,服务器~”
2、服务器可以回复消息给客户端”hello,客户端”
实例目的:对 netty线程模型有一个初步认识,便于理解Netty模型理论
NettyServer代码说明:
package com.atguigu.netty.simple;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author : [王振宇]* @version : [v1.0]* @className : NettyServer* @description : [Netty快速开始-TCP服务-服务端]* @createTime : [2021/8/4 9:53]* @updateUser : [王振宇]* @updateTime : [2021/8/4 9:53]* @updateRemark : [描述说明本次修改内容]*/public class NettyServer {public static void main(String[] args) throws Exception {//创建BossGroup 和 WorkerGroup//说明//1. 创建两个线程组 bossGroup 和 workerGroup//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成//3. 两个都是无限循环//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数// 默认实际 cpu核数 * 2// 这个跟一下构造函数一直点下去就会发现// private static final int DEFAULT_EVENT_LOOP_THREADS =// Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(); //8try {//创建服务器端的启动对象,配置参数ServerBootstrap bootstrap = new ServerBootstrap();//使用链式编程来进行设置bootstrap.group(bossGroup, workerGroup) //设置两个线程组.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)//给pipeline 设置处理器@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println("客户socketchannel hashcode=" + ch.hashCode());//可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueuech.pipeline().addLast(new NettyServerHandler());}}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器System.out.println(".....服务器 is ready...");//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象//启动服务器(并绑定端口)ChannelFuture cf = bootstrap.bind(6668).sync();//给cf 注册监听器,监控我们关心的事件cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println("监听端口 6668 成功");} else {System.out.println("监听端口 6668 失败");}}});//对关闭通道进行监听cf.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
NettyServerHandler代码说明:
package com.atguigu.netty.simple;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;/*** @author : [王振宇]* @version : [v1.0]* @className : NettyServer* @description : [Netty快速开始-TCP服务-server端Handler]* @createTime : [2021/8/4 9:53]* @updateUser : [王振宇]* @updateTime : [2021/8/4 9:53]* @updateRemark : [描述说明本次修改内容]*/public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 说明* 1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范)* 2. 这时我们自定义一个Handler , 才能称为一个handler*///读取数据实际(这里我们可以读取客户端发送的消息)/*** 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址* 2. Object msg: 就是客户端发送的数据 默认Object*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());System.out.println("server ctx =" + ctx);System.out.println("看看channel 和 pipeline的关系");Channel channel = ctx.channel();ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站//将 msg 转成一个 ByteBuf//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.ByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));System.out.println("客户端地址:" + channel.remoteAddress());}//数据读取完毕@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//writeAndFlush 是 write + flush//将数据写入到缓存,并刷新//一般讲,我们对这个发送的数据进行编码ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));}//处理异常, 一般是需要关闭通道@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
NettyClient代码说明:
package com.atguigu.netty.simple;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) throws Exception {//客户端需要一个事件循环组EventLoopGroup group = new NioEventLoopGroup();try {//创建客户端启动对象//注意客户端使用的不是 ServerBootstrap 而是 BootstrapBootstrap bootstrap = new Bootstrap();//设置相关参数bootstrap.group(group) //设置线程组.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器}});System.out.println("客户端 ok..");//启动客户端去连接服务器端//关于 ChannelFuture 要分析,涉及到netty的异步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();//给关闭通道进行监听channelFuture.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}}
NettyClientHandler代码说明:
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/*** @author : [王振宇]* @version : [v1.0]* @className : NettyCliendHandler* @description : [自定义客户端pipeLine 的 handler]* @createTime : [2021/8/4 15:24]* @updateUser : [王振宇]* @updateTime : [2021/8/4 15:24]* @updateRemark : [描述说明本次修改内容]*/public class NettyCliendHandler extends ChannelInboundHandlerAdapter {/*** 当通道就绪就会触发该方法* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("cliend "+ctx);//super.channelActive(ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务器,喵", CharsetUtil.UTF_8));}/*** 当通道有数据读取(有读取事件触发)* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// super.channelRead(ctx, msg);ByteBuf buf = (ByteBuf) msg;System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));System.out.println("服务器端地址:"+ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();//super.exceptionCaught(ctx, cause);}}
运行结果图文:
开启服务器:
开启客户端1:
开启客户端2:
服务端输出:
总结
经过我们对Netty 服务端、服务端处理器、客户端、客户端处理器 代码的编写。可以看出来,服务端这边维护了两组线程,bossGroup 和 workGroup通过上篇文章中对Netty工作原理图文介绍,我们知道bossGroup只是复制链接客户端,实际工作的是workGroup,然后每一个链接都有一个通道pipeline,通道pipeline中我们可以自定义很多处理器,然后结合Netty的ByteBuf进行消息处理分析,经过这个小案例,我们了解了简单的对于Netty的TCP服务,通过两个线程组优美的处理了客户端链。然后一个链接一个Channel和pipline,每个Pipline N个处理器Handler。
