记快乐符号:
利用netty 进行设备通信 也可进行数据传输
搭建简易的 客户端与服务端通信
1、pom.xml引入
<_dependency>
<_groupId_>io.netty
<_artifactId_>netty-all
<_version_>4.1.36.Final
_
<_dependency>
<_groupId_>org.projectlombok
<_artifactId_>lombok
<_version_>1.16.10
_
2、创建消息处理器 - ( 客户端与服务端可共用一套代码 )
主要集成 ChannelInboundHandlerAdapter
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.timeout.IdleState;import io.netty.handler.timeout.IdleStateEvent;import lombok.extern.slf4j.Slf4j;import java.text.SimpleDateFormat;import java.util.Date;/*** @description: NettyServerHandler-netty服务端处理器 <br>* @since: 2021/12/24 10:09 上午 <br>* @author: 释槐~ <br>* @version: <br>*/@Slf4jpublic class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 客户端连接会触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("Channel active......");}/*** 客户端发消息会触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {/** 根据编码器不同数据类型也不一致 **/if(msg instanceof String){log.info("服务器收到消息: {}", msg.toString());}else {/** 默认传递bytebuf 数据 **/ByteBuf buffer = (ByteBuf) msg;byte[] bytes = new byte[buffer.readableBytes()];buffer.readBytes(bytes);log.info("服务器收到消息: {}", new String(bytes));}/** 可以定义接受到消息后返回 **/// ctx.write("你也好哦");ctx.flush();}/*** 发生异常触发*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}/*** 定义心跳处理机制可根据业务进行设备的状态更新*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.WRITER_IDLE) {SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss z");Date date = new Date(System.currentTimeMillis());log.info(formatter.format(date));log.info("已经10s没有发送消息给服务端");/** 发送心跳消息,并在发送失败时关闭该连接 **/// ctx.writeAndFlush("心跳").addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, evt);}}}
�3、定义netty 服务初始化配置 、也可以定义自己的解析器
�
import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.Delimiters;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.timeout.IdleStateHandler;import io.netty.util.CharsetUtil;/*** @description: ServerChannelInitializer- netty服务初始化器 并且可定义自己的解析器 <br>* @since: 2021/12/24 10:09 上午 <br>* @author: 释槐~ <br>* @version: <br>*/public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {/** 可以定义自己的编码器 **/socketChannel.pipeline().addLast("decoder", new XlOverDelimiterBasedFrameDecoder(8192,Delimiters.lineDelimiter()));// socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));/** 编码器 **/socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));/** 定义心跳等机制 此处定义后在心跳侧代码才会生效 **/socketChannel.pipeline().addLast(new IdleStateHandler(3,10,4));/** 添加消息处理器 **/socketChannel.pipeline().addLast(new NettyServerHandler());}}
�
4、定义服务端启动服务
import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.timeout.IdleStateHandler;import io.netty.util.CharsetUtil;import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;/*** @description: NettyServer <br>* @since: 2021/12/24 10:10 上午 <br>* @author: 释槐~ <br>* @version: <br>*/@Slf4jpublic class NettyServer {/**服务端启动方法**/public void start(InetSocketAddress socketAddress) {/** 定义主线程组 **/EventLoopGroup bossGroup = new NioEventLoopGroup(1);/** 定义工作线程组 **/EventLoopGroup workGroup = new NioEventLoopGroup(200);ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ServerChannelInitializer()).localAddress(socketAddress)/** 设置队列大小 **/.option(ChannelOption.SO_BACKLOG, 1024)/** 加入心跳包如果通信两端超过2个小时没有交换数据,那么开启keep-alive的一端会自动发一个keep-alive包给对端如果对端正常的回应ACK包,那么一切都没问题,再等个2小时后发包(如果这两个小时仍然没有数据交换)如果对端回应RST包,表明对端程序崩溃或重启,这边socket产生ECONNRESET错误,并且关闭如果对端一直没回应,这边会每75秒再发包给对端,总共发8次共11分钟15秒。最后socket产生 ETIMEDOUT 错误,并且关闭。或者收到ICMP错误,表明主机不可到达,则会产生 EHOSTUNREACH 错误**/.childOption(ChannelOption.SO_KEEPALIVE, true);try {/** 绑定端口,开始接收进来的连接 **/ChannelFuture future = bootstrap.bind(socketAddress).sync();log.info("服务器启动开始监听端口: {}", socketAddress.getPort());future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {/** 关闭主线程组 **/bossGroup.shutdownGracefully();/**关闭工作线程组 **/workGroup.shutdownGracefully();}}/**客户端启动方法**/public ChannelFuture startCustomer(InetSocketAddress socketAddress) {/** 首先,netty通过ServerBootstrap启动服务端 **/Bootstrap client = new Bootstrap();/** 创建工作线程组 **/EventLoopGroup workGroup = new NioEventLoopGroup(200);client.group(workGroup);/** 绑定客户端通道 **/client.channel(NioSocketChannel.class);/** 给NIoSocketChannel初始化handler, 处理读写事件 **/client.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {/** 和服务端初始化一致 **/ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));ch.pipeline().addLast(new IdleStateHandler(3,10,4));ch.pipeline().addLast(new NettyServerHandler());}});try {ChannelFuture future = client.connect(socketAddress).sync();log.info("服务器启动开始连接端口: {}", socketAddress.getPort());return future;} catch (InterruptedException e) {e.printStackTrace();} finally {// /** 将发送消息类返回出去后 如果关闭 工作线程组是否关闭未知: 关闭工作线程组**/// workGroup.shutdownGracefully();}return null;}}
5、客户端或服务端启动代码例子
/** 启动服务端 **/NettyServer nettyServer = new NettyServer();nettyServer.start(new InetSocketAddress("127.0.0.1", 8090));/** 启动客户端 **/NettyServer nettyServer = new NettyServer();customerChannelFuture = nettyServer.startCustomer(new InetSocketAddress("127.0.0.1", 8090));customerChannelFuture.channel().writeAndFlush(str);/** 发送消息 **/customerChannelFuture.channel().writeAndFlush(str);
6、处理粘包、拆包个人见解
目前个人是重写DelimiterBasedFrameDecoder 以及 LineBasedFrameDecoder在控制最大长度的情况下对报文进行除特色字符外的 包头包尾 进行切割缓存 仅供参考未提供完整代码 主要重写一个方法
public class XlOverDelimiterBasedFrameDecoder extends ByteToMessageDecoder/** 修改其中 的 protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer)方法 **/public class XlOverLineBasedFrameDecoder extends ByteToMessageDecoder
保证在进入消息处理器前为完整的一条数据; 当然也可以在 消息处理器来处理消息完整性
/*** 功能描述:重写解析数据方法* @param ctx - 传入类* @param buffer - 传入数据* @return : java.lang.Object* @throws Exception:* @author: 释槐~* @date: 2021/2/1 17:18*/protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {try{/*** 0x0D + 0x0A 回车换行 \r\n把光标置于下一行行首 ***/byte[] bytes = new byte[buffer.readableBytes()];buffer.readBytes(bytes);log.info("【消息入口】 收到消息: {},{},{}" ,new String(bytes,CharsetUtil.UTF_8) , Integer.toHexString(buffer.getByte(3) & 0xFF) );if( null == type || type.equals("")){/** 此处判断报文头方法待多设备验证;目前已有设备验证成功但是使用mac电脑启动客户端发送数据无法进行验证;待研究 **/if((bytes[0] == ( (byte) 0X78) ) && (bytes[1] == ( (byte) 0X78))){/** 根据报文头判断 **/type = "设备1";}else{type = "设备2";}}if( type.equals("设备2") ){/** 设备2消息处理方法 此处尾原始 解析方法 也就是特色字符分割 **/ByteBuf beidouBuffer = Unpooled.buffer();beidouBuffer.writeBytes(bytes);return oldDecode(ctx,beidouBuffer);}else if( type.equals("设备2") ){// log.info("【消息类别】收到消息: {},协议号 : {} " ,ZbdUtil.bytesToHex(bytes) , Integer.toHexString(buffer.getByte(3) & 0xFF) );/** 设备1消息处理方法 **/loopBytesMsg(ctx,bytes);}}catch (Exception e){e.printStackTrace();log.info(e.getMessage());}return replyHandler();}
