实现简单的服务端与客户端的通信的小demo

image.png

服务端

服务端启动类

  1. package com.jili.helloword.server;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. import java.net.InetSocketAddress;
  11. /**
  12. * @author: Jili
  13. * @date: Created on 2022/4/22 15:52
  14. * 服务器端启动类
  15. */
  16. public class AppServerHello {
  17. /** netty的Reactor线程池,初始化一个NioEventLoop数组,用来处理I/O
  18. * boss 线程组用于处理连接工作
  19. */
  20. private EventLoopGroup boss = new NioEventLoopGroup();
  21. /**
  22. * work 线程组用于数据处理
  23. */
  24. private EventLoopGroup work = new NioEventLoopGroup();
  25. private int port;
  26. public AppServerHello(int port){
  27. this.port=port;
  28. }
  29. public void run() throws Exception{
  30. try{
  31. //启动NIO服务
  32. ServerBootstrap bootstrap = new ServerBootstrap();
  33. bootstrap.group(boss)
  34. //通过工厂方法设计模式实例化channel
  35. .channel(NioServerSocketChannel.class)
  36. //设置端口
  37. .localAddress(new InetSocketAddress(port))
  38. //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
  39. .option(ChannelOption.SO_BACKLOG, 1024)
  40. //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
  41. .childOption(ChannelOption.SO_KEEPALIVE, true)
  42. //将小的数据包包装成更大的帧进行传送,提高网络的负载
  43. .childOption(ChannelOption.TCP_NODELAY, true)
  44. .childHandler(new ChannelInitializer<SocketChannel>() {
  45. //ChannelInitializer是一个特殊的类,他的目的是榜示使用者配置一个新的Channel,用于把许多自定义的处理类增加带pipline上来
  46. @Override
  47. protected void initChannel(SocketChannel socketChannel) throws Exception {
  48. //在这里配置具体数据接收方法的处理
  49. socketChannel.pipeline().addLast(new HandlerServerHello());
  50. }
  51. });
  52. //绑定服务器,该实力将提供有关的io操作结果或状态信息
  53. ChannelFuture channelFuture = bootstrap.bind().sync();
  54. System.out.println("在"+channelFuture.channel().localAddress()+"startup");
  55. //阻塞操作,closeFuture()开启一个channel的监听器,直到链路断开
  56. channelFuture.channel().closeFuture().sync();
  57. }finally {
  58. //关闭EventLoopGroup并释放所有资源,包括所有创建线程
  59. boss.shutdownGracefully().sync();
  60. work.shutdownGracefully().sync();
  61. }
  62. }
  63. public static void main(String[] args) throws Exception {
  64. new AppServerHello(18080).run();
  65. }
  66. }

服务端处理类

  1. package com.jili.helloword.server;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandler;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandlerAdapter;
  7. import io.netty.util.CharsetUtil;
  8. /**
  9. * @author: Jili
  10. * @date: Created on 2022/4/20 16:59
  11. */
  12. @ChannelHandler.Sharable
  13. public class HandlerServerHello extends ChannelInboundHandlerAdapter {
  14. @Override
  15. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  16. //处理收到消息数据,并反馈给客户端
  17. ByteBuf in = (ByteBuf)msg;
  18. System.out.println("收到客户端发来的消息:"+in.toString(CharsetUtil.UTF_8));
  19. //写入并发消息给客户端
  20. ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,收到消息!",CharsetUtil.UTF_8));
  21. }
  22. @Override
  23. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  24. //出现异常的时候执行(打印并关闭通道)
  25. cause.printStackTrace();
  26. ctx.close();
  27. }
  28. }

客户端

客户端启动类

  1. package com.jili.helloword.client;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.sctp.nio.NioSctpChannel;
  9. import io.netty.channel.socket.SocketChannel;
  10. import io.netty.channel.socket.nio.NioSocketChannel;
  11. import io.netty.util.CharsetUtil;
  12. import java.net.InetSocketAddress;
  13. /**
  14. * @author: Jili
  15. * @date: Created on 2022/4/20 16:03
  16. * 客户端启动类
  17. */
  18. public class AppClientHello {
  19. private final String host;
  20. private final int port;
  21. public AppClientHello(String host,int port) {
  22. this.host = host;
  23. this.port = port;
  24. }
  25. public void run() throws Exception{
  26. /**
  27. * 配置相应的参数,提供连接到远端
  28. */
  29. EventLoopGroup group = new NioEventLoopGroup();//io线程池
  30. EventLoopGroup work = new NioEventLoopGroup();
  31. try {
  32. Bootstrap bs = new Bootstrap();//启动辅助对象
  33. bs.group(group)
  34. .channel(NioSocketChannel.class)
  35. .remoteAddress(new InetSocketAddress(host,port))
  36. .handler(new ChannelInitializer<SocketChannel>() { //进行通道初始化配置
  37. @Override
  38. protected void initChannel(SocketChannel socketChannel) throws Exception {
  39. socketChannel.pipeline().addLast(new HandlerClientHello()); //添加自定义handler
  40. }
  41. });
  42. //连接到远程节点,等待连接完成
  43. ChannelFuture future = bs.connect().sync();
  44. //发送消息到服务器
  45. future.channel().writeAndFlush(Unpooled.copiedBuffer("发送消息:hello world", CharsetUtil.UTF_8));
  46. //阻塞操作,closeFuture开启一个channel的监听器(这期间channel进行各种工作),直到链路断开
  47. future.channel().closeFuture().sync();
  48. } finally {
  49. group.shutdownGracefully().sync();
  50. }
  51. }
  52. public static void main(String[] args) throws Exception{
  53. new AppClientHello("127.0.0.1",18080).run();
  54. }
  55. }

客户端处理类

  1. package com.jili.helloword.client;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandler;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.util.CharsetUtil;
  7. /**
  8. * @author: Jili
  9. * @date: Created on 2022/4/20 15:54
  10. */
  11. @ChannelHandler.Sharable
  12. public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf> {
  13. @Override
  14. protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
  15. /**
  16. * 处理接受到的消息
  17. */
  18. System.out.println("客户端接受到的消息"+byteBuf.toString(CharsetUtil.UTF_8));
  19. }
  20. @Override
  21. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  22. /**
  23. * 处理异常
  24. */
  25. cause.printStackTrace();
  26. ctx.close();
  27. }
  28. }

输出结果
image.png

image.png

代码:
https://gitee.com/jili_lyj/nettystudy

拓展

真实案例

污水数采仪程序

plc数采仪发送报文通过tcp协议到程序,程序通过netty进行解析存放在数据库中
https://www.yuque.com/jilige/nh4eg4/er9azu