记快乐符号:
利用netty 进行设备通信 也可进行数据传输
搭建简易的 客户端与服务端通信
1、pom.xml引入
<_dependency>
<_groupId_>io.netty
<_artifactId_>netty-all
<_version_>4.1.36.Final
_
<_dependency>
<_groupId_>org.projectlombok
<_artifactId_>lombok
<_version_>1.16.10
_
2、创建消息处理器 - ( 客户端与服务端可共用一套代码 )
主要集成 ChannelInboundHandlerAdapter
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @description: NettyServerHandler-netty服务端处理器 <br>
* @since: 2021/12/24 10:09 上午 <br>
* @author: 释槐~ <br>
* @version: <br>
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接会触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}
/**
* 客户端发消息会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/** 根据编码器不同数据类型也不一致 **/
if(msg instanceof String){
log.info("服务器收到消息: {}", msg.toString());
}else {
/** 默认传递bytebuf 数据 **/
ByteBuf buffer = (ByteBuf) msg;
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
log.info("服务器收到消息: {}", new String(bytes));
}
/** 可以定义接受到消息后返回 **/
// ctx.write("你也好哦");
ctx.flush();
}
/**
* 发生异常触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 定义心跳处理机制可根据业务进行设备的状态更新
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss z");
Date date = new Date(System.currentTimeMillis());
log.info(formatter.format(date));
log.info("已经10s没有发送消息给服务端");
/** 发送心跳消息,并在发送失败时关闭该连接 **/
// ctx.writeAndFlush("心跳").addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
�3、定义netty 服务初始化配置 、也可以定义自己的解析器
�
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
/**
* @description: ServerChannelInitializer- netty服务初始化器 并且可定义自己的解析器 <br>
* @since: 2021/12/24 10:09 上午 <br>
* @author: 释槐~ <br>
* @version: <br>
*/
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
/** 可以定义自己的编码器 **/
socketChannel.pipeline().addLast("decoder", new XlOverDelimiterBasedFrameDecoder(8192,Delimiters.lineDelimiter()));
// socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
/** 编码器 **/
socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
/** 定义心跳等机制 此处定义后在心跳侧代码才会生效 **/
socketChannel.pipeline().addLast(new IdleStateHandler(3,10,4));
/** 添加消息处理器 **/
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}
�
4、定义服务端启动服务
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @description: NettyServer <br>
* @since: 2021/12/24 10:10 上午 <br>
* @author: 释槐~ <br>
* @version: <br>
*/
@Slf4j
public class NettyServer {
/*
*服务端启动方法
**/
public void start(InetSocketAddress socketAddress) {
/** 定义主线程组 **/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
/** 定义工作线程组 **/
EventLoopGroup workGroup = new NioEventLoopGroup(200);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer())
.localAddress(socketAddress)
/** 设置队列大小 **/
.option(ChannelOption.SO_BACKLOG, 1024)
/** 加入心跳包
如果通信两端超过2个小时没有交换数据,那么开启keep-alive的一端会自动发一个keep-alive包给对端
如果对端正常的回应ACK包,那么一切都没问题,再等个2小时后发包(如果这两个小时仍然没有数据交换)
如果对端回应RST包,表明对端程序崩溃或重启,这边socket产生ECONNRESET错误,并且关闭
如果对端一直没回应,这边会每75秒再发包给对端,总共发8次共11分钟15秒。最后socket产生 ETIMEDOUT 错误,并且关闭。
或者收到ICMP错误,表明主机不可到达,则会产生 EHOSTUNREACH 错误
**/
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
/** 绑定端口,开始接收进来的连接 **/
ChannelFuture future = bootstrap.bind(socketAddress).sync();
log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
/** 关闭主线程组 **/
bossGroup.shutdownGracefully();
/**关闭工作线程组 **/
workGroup.shutdownGracefully();
}
}
/*
*客户端启动方法
**/
public ChannelFuture startCustomer(InetSocketAddress socketAddress) {
/** 首先,netty通过ServerBootstrap启动服务端 **/
Bootstrap client = new Bootstrap();
/** 创建工作线程组 **/
EventLoopGroup workGroup = new NioEventLoopGroup(200);
client.group(workGroup);
/** 绑定客户端通道 **/
client.channel(NioSocketChannel.class);
/** 给NIoSocketChannel初始化handler, 处理读写事件 **/
client.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
/** 和服务端初始化一致 **/
ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new IdleStateHandler(3,10,4));
ch.pipeline().addLast(new NettyServerHandler());
}
});
try {
ChannelFuture future = client.connect(socketAddress).sync();
log.info("服务器启动开始连接端口: {}", socketAddress.getPort());
return future;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// /** 将发送消息类返回出去后 如果关闭 工作线程组是否关闭未知: 关闭工作线程组**/
// workGroup.shutdownGracefully();
}
return null;
}
}
5、客户端或服务端启动代码例子
/** 启动服务端 **/
NettyServer nettyServer = new NettyServer();
nettyServer.start(new InetSocketAddress("127.0.0.1", 8090));
/** 启动客户端 **/
NettyServer nettyServer = new NettyServer();
customerChannelFuture = nettyServer.startCustomer(new InetSocketAddress("127.0.0.1", 8090));
customerChannelFuture.channel().writeAndFlush(str);
/** 发送消息 **/
customerChannelFuture.channel().writeAndFlush(str);
6、处理粘包、拆包个人见解
目前个人是重写DelimiterBasedFrameDecoder 以及 LineBasedFrameDecoder在控制最大长度的情况下对报文进行除特色字符外的 包头包尾 进行切割缓存 仅供参考未提供完整代码 主要重写一个方法
public class XlOverDelimiterBasedFrameDecoder extends ByteToMessageDecoder
/** 修改其中 的 protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer)方法 **/
public class XlOverLineBasedFrameDecoder extends ByteToMessageDecoder
保证在进入消息处理器前为完整的一条数据; 当然也可以在 消息处理器来处理消息完整性
/**
* 功能描述:重写解析数据方法
* @param ctx - 传入类
* @param buffer - 传入数据
* @return : java.lang.Object
* @throws Exception:
* @author: 释槐~
* @date: 2021/2/1 17:18
*/
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
try{
/*** 0x0D + 0x0A 回车换行 \r\n把光标置于下一行行首 ***/
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
log.info("【消息入口】 收到消息: {},{},{}" ,new String(bytes,CharsetUtil.UTF_8) , Integer.toHexString(buffer.getByte(3) & 0xFF) );
if( null == type || type.equals("")){
/** 此处判断报文头方法待多设备验证;目前已有设备验证成功但是使用mac电脑启动客户端发送数据无法进行验证;待研究 **/
if((bytes[0] == ( (byte) 0X78) ) && (bytes[1] == ( (byte) 0X78))){
/** 根据报文头判断 **/
type = "设备1";
}else{
type = "设备2";
}
}
if( type.equals("设备2") ){
/** 设备2消息处理方法 此处尾原始 解析方法 也就是特色字符分割 **/
ByteBuf beidouBuffer = Unpooled.buffer();
beidouBuffer.writeBytes(bytes);
return oldDecode(ctx,beidouBuffer);
}else if( type.equals("设备2") ){
// log.info("【消息类别】收到消息: {},协议号 : {} " ,ZbdUtil.bytesToHex(bytes) , Integer.toHexString(buffer.getByte(3) & 0xFF) );
/** 设备1消息处理方法 **/
loopBytesMsg(ctx,bytes);
}
}catch (Exception e){
e.printStackTrace();
log.info(e.getMessage());
}
return replyHandler();
}