实现简单的服务端与客户端的通信的小demo
服务端
服务端启动类
package com.jili.helloword.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* @author: Jili
* @date: Created on 2022/4/22 15:52
* 服务器端启动类
*/
public class AppServerHello {
/** netty的Reactor线程池,初始化一个NioEventLoop数组,用来处理I/O
* boss 线程组用于处理连接工作
*/
private EventLoopGroup boss = new NioEventLoopGroup();
/**
* work 线程组用于数据处理
*/
private EventLoopGroup work = new NioEventLoopGroup();
private int port;
public AppServerHello(int port){
this.port=port;
}
public void run() throws Exception{
try{
//启动NIO服务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss)
//通过工厂方法设计模式实例化channel
.channel(NioServerSocketChannel.class)
//设置端口
.localAddress(new InetSocketAddress(port))
//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
.option(ChannelOption.SO_BACKLOG, 1024)
//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//将小的数据包包装成更大的帧进行传送,提高网络的负载
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
//ChannelInitializer是一个特殊的类,他的目的是榜示使用者配置一个新的Channel,用于把许多自定义的处理类增加带pipline上来
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在这里配置具体数据接收方法的处理
socketChannel.pipeline().addLast(new HandlerServerHello());
}
});
//绑定服务器,该实力将提供有关的io操作结果或状态信息
ChannelFuture channelFuture = bootstrap.bind().sync();
System.out.println("在"+channelFuture.channel().localAddress()+"startup");
//阻塞操作,closeFuture()开启一个channel的监听器,直到链路断开
channelFuture.channel().closeFuture().sync();
}finally {
//关闭EventLoopGroup并释放所有资源,包括所有创建线程
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new AppServerHello(18080).run();
}
}
服务端处理类
package com.jili.helloword.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author: Jili
* @date: Created on 2022/4/20 16:59
*/
@ChannelHandler.Sharable
public class HandlerServerHello extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//处理收到消息数据,并反馈给客户端
ByteBuf in = (ByteBuf)msg;
System.out.println("收到客户端发来的消息:"+in.toString(CharsetUtil.UTF_8));
//写入并发消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,收到消息!",CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//出现异常的时候执行(打印并关闭通道)
cause.printStackTrace();
ctx.close();
}
}
客户端
客户端启动类
package com.jili.helloword.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.sctp.nio.NioSctpChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
/**
* @author: Jili
* @date: Created on 2022/4/20 16:03
* 客户端启动类
*/
public class AppClientHello {
private final String host;
private final int port;
public AppClientHello(String host,int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception{
/**
* 配置相应的参数,提供连接到远端
*/
EventLoopGroup group = new NioEventLoopGroup();//io线程池
EventLoopGroup work = new NioEventLoopGroup();
try {
Bootstrap bs = new Bootstrap();//启动辅助对象
bs.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host,port))
.handler(new ChannelInitializer<SocketChannel>() { //进行通道初始化配置
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new HandlerClientHello()); //添加自定义handler
}
});
//连接到远程节点,等待连接完成
ChannelFuture future = bs.connect().sync();
//发送消息到服务器
future.channel().writeAndFlush(Unpooled.copiedBuffer("发送消息:hello world", CharsetUtil.UTF_8));
//阻塞操作,closeFuture开启一个channel的监听器(这期间channel进行各种工作),直到链路断开
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception{
new AppClientHello("127.0.0.1",18080).run();
}
}
客户端处理类
package com.jili.helloword.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @author: Jili
* @date: Created on 2022/4/20 15:54
*/
@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
/**
* 处理接受到的消息
*/
System.out.println("客户端接受到的消息"+byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
/**
* 处理异常
*/
cause.printStackTrace();
ctx.close();
}
}
输出结果
代码:
https://gitee.com/jili_lyj/nettystudy
拓展
真实案例
污水数采仪程序
plc数采仪发送报文通过tcp协议到程序,程序通过netty进行解析存放在数据库中
https://www.yuque.com/jilige/nh4eg4/er9azu