经过上面对Netty的概述,以及Netty工作原理图,以及线程模式的分析,我们先写一个Netty-TCP服务的小案例,更深一步理解一下Netty的工作原理,以及Netty的线程模型。文章开头.jpg

案例

实例要求:使用 IDEA 创建 Netty 项目:
1、Netty服务器在6668端口监听,客户端能发送消息给服务器”hello,服务器~”
2、服务器可以回复消息给客户端”hello,客户端”

实例目的:对 netty线程模型有一个初步认识,便于理解Netty模型理论

NettyServer代码说明:

  1. package com.atguigu.netty.simple;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelFutureListener;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.ChannelOption;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.SocketChannel;
  10. import io.netty.channel.socket.nio.NioServerSocketChannel;
  11. /**
  12. * @author : [王振宇]
  13. * @version : [v1.0]
  14. * @className : NettyServer
  15. * @description : [Netty快速开始-TCP服务-服务端]
  16. * @createTime : [2021/8/4 9:53]
  17. * @updateUser : [王振宇]
  18. * @updateTime : [2021/8/4 9:53]
  19. * @updateRemark : [描述说明本次修改内容]
  20. */
  21. public class NettyServer {
  22. public static void main(String[] args) throws Exception {
  23. //创建BossGroup 和 WorkerGroup
  24. //说明
  25. //1. 创建两个线程组 bossGroup 和 workerGroup
  26. //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
  27. //3. 两个都是无限循环
  28. //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
  29. // 默认实际 cpu核数 * 2
  30. // 这个跟一下构造函数一直点下去就会发现
  31. // private static final int DEFAULT_EVENT_LOOP_THREADS =
  32. // Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
  33. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  34. EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
  35. try {
  36. //创建服务器端的启动对象,配置参数
  37. ServerBootstrap bootstrap = new ServerBootstrap();
  38. //使用链式编程来进行设置
  39. bootstrap.group(bossGroup, workerGroup) //设置两个线程组
  40. .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
  41. .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
  42. .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
  43. // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
  44. .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
  45. //给pipeline 设置处理器
  46. @Override
  47. protected void initChannel(SocketChannel ch) throws Exception {
  48. System.out.println("客户socketchannel hashcode=" + ch.hashCode());
  49. //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
  50. ch.pipeline().addLast(new NettyServerHandler());
  51. }
  52. }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
  53. System.out.println(".....服务器 is ready...");
  54. //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
  55. //启动服务器(并绑定端口)
  56. ChannelFuture cf = bootstrap.bind(6668).sync();
  57. //给cf 注册监听器,监控我们关心的事件
  58. cf.addListener(new ChannelFutureListener() {
  59. @Override
  60. public void operationComplete(ChannelFuture future) throws Exception {
  61. if (cf.isSuccess()) {
  62. System.out.println("监听端口 6668 成功");
  63. } else {
  64. System.out.println("监听端口 6668 失败");
  65. }
  66. }
  67. });
  68. //对关闭通道进行监听
  69. cf.channel().closeFuture().sync();
  70. }finally {
  71. bossGroup.shutdownGracefully();
  72. workerGroup.shutdownGracefully();
  73. }
  74. }
  75. }

NettyServerHandler代码说明:

  1. package com.atguigu.netty.simple;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.Channel;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandlerAdapter;
  7. import io.netty.channel.ChannelPipeline;
  8. import io.netty.util.CharsetUtil;
  9. /**
  10. * @author : [王振宇]
  11. * @version : [v1.0]
  12. * @className : NettyServer
  13. * @description : [Netty快速开始-TCP服务-server端Handler]
  14. * @createTime : [2021/8/4 9:53]
  15. * @updateUser : [王振宇]
  16. * @updateTime : [2021/8/4 9:53]
  17. * @updateRemark : [描述说明本次修改内容]
  18. */
  19. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  20. /**
  21. * 说明
  22. * 1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范)
  23. * 2. 这时我们自定义一个Handler , 才能称为一个handler
  24. */
  25. //读取数据实际(这里我们可以读取客户端发送的消息)
  26. /**
  27. * 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
  28. * 2. Object msg: 就是客户端发送的数据 默认Object
  29. */
  30. @Override
  31. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  32. System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
  33. System.out.println("server ctx =" + ctx);
  34. System.out.println("看看channel 和 pipeline的关系");
  35. Channel channel = ctx.channel();
  36. ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
  37. //将 msg 转成一个 ByteBuf
  38. //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
  39. ByteBuf buf = (ByteBuf) msg;
  40. System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
  41. System.out.println("客户端地址:" + channel.remoteAddress());
  42. }
  43. //数据读取完毕
  44. @Override
  45. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  46. //writeAndFlush 是 write + flush
  47. //将数据写入到缓存,并刷新
  48. //一般讲,我们对这个发送的数据进行编码
  49. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
  50. }
  51. //处理异常, 一般是需要关闭通道
  52. @Override
  53. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  54. ctx.close();
  55. }
  56. }

NettyClient代码说明:

  1. package com.atguigu.netty.simple;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. public class NettyClient {
  10. public static void main(String[] args) throws Exception {
  11. //客户端需要一个事件循环组
  12. EventLoopGroup group = new NioEventLoopGroup();
  13. try {
  14. //创建客户端启动对象
  15. //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
  16. Bootstrap bootstrap = new Bootstrap();
  17. //设置相关参数
  18. bootstrap.group(group) //设置线程组
  19. .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
  20. .handler(new ChannelInitializer<SocketChannel>() {
  21. @Override
  22. protected void initChannel(SocketChannel ch) throws Exception {
  23. ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
  24. }
  25. });
  26. System.out.println("客户端 ok..");
  27. //启动客户端去连接服务器端
  28. //关于 ChannelFuture 要分析,涉及到netty的异步模型
  29. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
  30. //给关闭通道进行监听
  31. channelFuture.channel().closeFuture().sync();
  32. } finally {
  33. group.shutdownGracefully();
  34. }
  35. }
  36. }

NettyClientHandler代码说明:

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.util.CharsetUtil;
  6. /**
  7. * @author : [王振宇]
  8. * @version : [v1.0]
  9. * @className : NettyCliendHandler
  10. * @description : [自定义客户端pipeLine 的 handler]
  11. * @createTime : [2021/8/4 15:24]
  12. * @updateUser : [王振宇]
  13. * @updateTime : [2021/8/4 15:24]
  14. * @updateRemark : [描述说明本次修改内容]
  15. */
  16. public class NettyCliendHandler extends ChannelInboundHandlerAdapter {
  17. /**
  18. * 当通道就绪就会触发该方法
  19. * @param ctx
  20. * @throws Exception
  21. */
  22. @Override
  23. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  24. System.out.println("cliend "+ctx);
  25. //super.channelActive(ctx);
  26. ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务器,喵", CharsetUtil.UTF_8));
  27. }
  28. /**
  29. * 当通道有数据读取(有读取事件触发)
  30. * @param ctx
  31. * @param msg
  32. * @throws Exception
  33. */
  34. @Override
  35. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  36. // super.channelRead(ctx, msg);
  37. ByteBuf buf = (ByteBuf) msg;
  38. System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
  39. System.out.println("服务器端地址:"+ctx.channel().remoteAddress());
  40. }
  41. @Override
  42. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  43. cause.printStackTrace();
  44. ctx.close();
  45. //super.exceptionCaught(ctx, cause);
  46. }
  47. }

运行结果图文:
开启服务器:
image.png
开启客户端1:
image.png
开启客户端2:
image.png
服务端输出:
image.png

总结

经过我们对Netty 服务端、服务端处理器、客户端、客户端处理器 代码的编写。可以看出来,服务端这边维护了两组线程,bossGroup 和 workGroup通过上篇文章中对Netty工作原理图文介绍,我们知道bossGroup只是复制链接客户端,实际工作的是workGroup,然后每一个链接都有一个通道pipeline,通道pipeline中我们可以自定义很多处理器,然后结合Netty的ByteBuf进行消息处理分析,经过这个小案例,我们了解了简单的对于Netty的TCP服务,通过两个线程组优美的处理了客户端链。然后一个链接一个Channel和pipline,每个Pipline N个处理器Handler。