记快乐符号:
    利用netty 进行设备通信 也可进行数据传输
    搭建简易的 客户端与服务端通信

    1、pom.xml引入

    <_dependency>
    <_groupId_>io.netty
    <_artifactId_>netty-all
    <_version_>4.1.36.Final
    _


    <_dependency>
    <_groupId_>org.projectlombok
    <_artifactId_>lombok
    <_version_>1.16.10
    _
    2、创建消息处理器 - ( 客户端与服务端可共用一套代码 )
    主要集成 ChannelInboundHandlerAdapter

    1. import io.netty.buffer.ByteBuf;
    2. import io.netty.channel.ChannelHandlerContext;
    3. import io.netty.channel.ChannelInboundHandlerAdapter;
    4. import io.netty.handler.timeout.IdleState;
    5. import io.netty.handler.timeout.IdleStateEvent;
    6. import lombok.extern.slf4j.Slf4j;
    7. import java.text.SimpleDateFormat;
    8. import java.util.Date;
    9. /**
    10. * @description: NettyServerHandler-netty服务端处理器 <br>
    11. * @since: 2021/12/24 10:09 上午 <br>
    12. * @author: 释槐~ <br>
    13. * @version: <br>
    14. */
    15. @Slf4j
    16. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    17. /**
    18. * 客户端连接会触发
    19. */
    20. @Override
    21. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    22. log.info("Channel active......");
    23. }
    24. /**
    25. * 客户端发消息会触发
    26. */
    27. @Override
    28. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    29. /** 根据编码器不同数据类型也不一致 **/
    30. if(msg instanceof String){
    31. log.info("服务器收到消息: {}", msg.toString());
    32. }else {
    33. /** 默认传递bytebuf 数据 **/
    34. ByteBuf buffer = (ByteBuf) msg;
    35. byte[] bytes = new byte[buffer.readableBytes()];
    36. buffer.readBytes(bytes);
    37. log.info("服务器收到消息: {}", new String(bytes));
    38. }
    39. /** 可以定义接受到消息后返回 **/
    40. // ctx.write("你也好哦");
    41. ctx.flush();
    42. }
    43. /**
    44. * 发生异常触发
    45. */
    46. @Override
    47. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    48. cause.printStackTrace();
    49. ctx.close();
    50. }
    51. /**
    52. * 定义心跳处理机制可根据业务进行设备的状态更新
    53. */
    54. @Override
    55. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    56. if (evt instanceof IdleStateEvent) {
    57. IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
    58. if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
    59. SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss z");
    60. Date date = new Date(System.currentTimeMillis());
    61. log.info(formatter.format(date));
    62. log.info("已经10s没有发送消息给服务端");
    63. /** 发送心跳消息,并在发送失败时关闭该连接 **/
    64. // ctx.writeAndFlush("心跳").addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    65. }
    66. } else {
    67. super.userEventTriggered(ctx, evt);
    68. }
    69. }
    70. }

    �3、定义netty 服务初始化配置 、也可以定义自己的解析器

    1. import io.netty.channel.ChannelInitializer;
    2. import io.netty.channel.socket.SocketChannel;
    3. import io.netty.handler.codec.Delimiters;
    4. import io.netty.handler.codec.string.StringEncoder;
    5. import io.netty.handler.timeout.IdleStateHandler;
    6. import io.netty.util.CharsetUtil;
    7. /**
    8. * @description: ServerChannelInitializer- netty服务初始化器 并且可定义自己的解析器 <br>
    9. * @since: 2021/12/24 10:09 上午 <br>
    10. * @author: 释槐~ <br>
    11. * @version: <br>
    12. */
    13. public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    14. @Override
    15. protected void initChannel(SocketChannel socketChannel) throws Exception {
    16. /** 可以定义自己的编码器 **/
    17. socketChannel.pipeline().addLast("decoder", new XlOverDelimiterBasedFrameDecoder(8192,Delimiters.lineDelimiter()));
    18. // socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
    19. /** 编码器 **/
    20. socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
    21. /** 定义心跳等机制 此处定义后在心跳侧代码才会生效 **/
    22. socketChannel.pipeline().addLast(new IdleStateHandler(3,10,4));
    23. /** 添加消息处理器 **/
    24. socketChannel.pipeline().addLast(new NettyServerHandler());
    25. }
    26. }


    4、定义服务端启动服务

    1. import io.netty.bootstrap.Bootstrap;
    2. import io.netty.bootstrap.ServerBootstrap;
    3. import io.netty.channel.ChannelFuture;
    4. import io.netty.channel.ChannelInitializer;
    5. import io.netty.channel.ChannelOption;
    6. import io.netty.channel.EventLoopGroup;
    7. import io.netty.channel.nio.NioEventLoopGroup;
    8. import io.netty.channel.socket.nio.NioServerSocketChannel;
    9. import io.netty.channel.socket.nio.NioSocketChannel;
    10. import io.netty.handler.codec.string.StringDecoder;
    11. import io.netty.handler.codec.string.StringEncoder;
    12. import io.netty.handler.timeout.IdleStateHandler;
    13. import io.netty.util.CharsetUtil;
    14. import lombok.extern.slf4j.Slf4j;
    15. import java.net.InetSocketAddress;
    16. /**
    17. * @description: NettyServer <br>
    18. * @since: 2021/12/24 10:10 上午 <br>
    19. * @author: 释槐~ <br>
    20. * @version: <br>
    21. */
    22. @Slf4j
    23. public class NettyServer {
    24. /*
    25. *服务端启动方法
    26. **/
    27. public void start(InetSocketAddress socketAddress) {
    28. /** 定义主线程组 **/
    29. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    30. /** 定义工作线程组 **/
    31. EventLoopGroup workGroup = new NioEventLoopGroup(200);
    32. ServerBootstrap bootstrap = new ServerBootstrap()
    33. .group(bossGroup, workGroup)
    34. .channel(NioServerSocketChannel.class)
    35. .childHandler(new ServerChannelInitializer())
    36. .localAddress(socketAddress)
    37. /** 设置队列大小 **/
    38. .option(ChannelOption.SO_BACKLOG, 1024)
    39. /** 加入心跳包
    40. 如果通信两端超过2个小时没有交换数据,那么开启keep-alive的一端会自动发一个keep-alive包给对端
    41. 如果对端正常的回应ACK包,那么一切都没问题,再等个2小时后发包(如果这两个小时仍然没有数据交换)
    42. 如果对端回应RST包,表明对端程序崩溃或重启,这边socket产生ECONNRESET错误,并且关闭
    43. 如果对端一直没回应,这边会每75秒再发包给对端,总共发8次共11分钟15秒。最后socket产生 ETIMEDOUT 错误,并且关闭。
    44. 或者收到ICMP错误,表明主机不可到达,则会产生 EHOSTUNREACH 错误
    45. **/
    46. .childOption(ChannelOption.SO_KEEPALIVE, true);
    47. try {
    48. /** 绑定端口,开始接收进来的连接 **/
    49. ChannelFuture future = bootstrap.bind(socketAddress).sync();
    50. log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
    51. future.channel().closeFuture().sync();
    52. } catch (InterruptedException e) {
    53. e.printStackTrace();
    54. } finally {
    55. /** 关闭主线程组 **/
    56. bossGroup.shutdownGracefully();
    57. /**关闭工作线程组 **/
    58. workGroup.shutdownGracefully();
    59. }
    60. }
    61. /*
    62. *客户端启动方法
    63. **/
    64. public ChannelFuture startCustomer(InetSocketAddress socketAddress) {
    65. /** 首先,netty通过ServerBootstrap启动服务端 **/
    66. Bootstrap client = new Bootstrap();
    67. /** 创建工作线程组 **/
    68. EventLoopGroup workGroup = new NioEventLoopGroup(200);
    69. client.group(workGroup);
    70. /** 绑定客户端通道 **/
    71. client.channel(NioSocketChannel.class);
    72. /** 给NIoSocketChannel初始化handler, 处理读写事件 **/
    73. client.handler(new ChannelInitializer<NioSocketChannel>() {
    74. @Override
    75. protected void initChannel(NioSocketChannel ch) throws Exception {
    76. /** 和服务端初始化一致 **/
    77. ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
    78. ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
    79. ch.pipeline().addLast(new IdleStateHandler(3,10,4));
    80. ch.pipeline().addLast(new NettyServerHandler());
    81. }
    82. });
    83. try {
    84. ChannelFuture future = client.connect(socketAddress).sync();
    85. log.info("服务器启动开始连接端口: {}", socketAddress.getPort());
    86. return future;
    87. } catch (InterruptedException e) {
    88. e.printStackTrace();
    89. } finally {
    90. // /** 将发送消息类返回出去后 如果关闭 工作线程组是否关闭未知: 关闭工作线程组**/
    91. // workGroup.shutdownGracefully();
    92. }
    93. return null;
    94. }
    95. }

    5、客户端或服务端启动代码例子

    1. /** 启动服务端 **/
    2. NettyServer nettyServer = new NettyServer();
    3. nettyServer.start(new InetSocketAddress("127.0.0.1", 8090));
    4. /** 启动客户端 **/
    5. NettyServer nettyServer = new NettyServer();
    6. customerChannelFuture = nettyServer.startCustomer(new InetSocketAddress("127.0.0.1", 8090));
    7. customerChannelFuture.channel().writeAndFlush(str);
    8. /** 发送消息 **/
    9. customerChannelFuture.channel().writeAndFlush(str);

    6、处理粘包、拆包个人见解
    目前个人是重写DelimiterBasedFrameDecoder 以及 LineBasedFrameDecoder在控制最大长度的情况下对报文进行除特色字符外的 包头包尾 进行切割缓存 仅供参考未提供完整代码 主要重写一个方法

    1. public class XlOverDelimiterBasedFrameDecoder extends ByteToMessageDecoder
    2. /** 修改其中 的 protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer)方法 **/
    3. public class XlOverLineBasedFrameDecoder extends ByteToMessageDecoder

    保证在进入消息处理器前为完整的一条数据; 当然也可以在 消息处理器来处理消息完整性

    1. /**
    2. * 功能描述:重写解析数据方法
    3. * @param ctx - 传入类
    4. * @param buffer - 传入数据
    5. * @return : java.lang.Object
    6. * @throws Exception:
    7. * @author: 释槐~
    8. * @date: 2021/2/1 17:18
    9. */
    10. protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    11. try{
    12. /*** 0x0D + 0x0A 回车换行 \r\n把光标置于下一行行首 ***/
    13. byte[] bytes = new byte[buffer.readableBytes()];
    14. buffer.readBytes(bytes);
    15. log.info("【消息入口】 收到消息: {},{},{}" ,new String(bytes,CharsetUtil.UTF_8) , Integer.toHexString(buffer.getByte(3) & 0xFF) );
    16. if( null == type || type.equals("")){
    17. /** 此处判断报文头方法待多设备验证;目前已有设备验证成功但是使用mac电脑启动客户端发送数据无法进行验证;待研究 **/
    18. if((bytes[0] == ( (byte) 0X78) ) && (bytes[1] == ( (byte) 0X78))){
    19. /** 根据报文头判断 **/
    20. type = "设备1";
    21. }else{
    22. type = "设备2";
    23. }
    24. }
    25. if( type.equals("设备2") ){
    26. /** 设备2消息处理方法 此处尾原始 解析方法 也就是特色字符分割 **/
    27. ByteBuf beidouBuffer = Unpooled.buffer();
    28. beidouBuffer.writeBytes(bytes);
    29. return oldDecode(ctx,beidouBuffer);
    30. }else if( type.equals("设备2") ){
    31. // log.info("【消息类别】收到消息: {},协议号 : {} " ,ZbdUtil.bytesToHex(bytes) , Integer.toHexString(buffer.getByte(3) & 0xFF) );
    32. /** 设备1消息处理方法 **/
    33. loopBytesMsg(ctx,bytes);
    34. }
    35. }catch (Exception e){
    36. e.printStackTrace();
    37. log.info(e.getMessage());
    38. }
    39. return replyHandler();
    40. }