TCP服务
server端
public class NettyServer { public static void main(String[] args) throws Exception { //创建BossGroup和WorkerGroup /* 1.创建两个线程组bossGroup和workerGroup 2.bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给workerGroup 3.两个都是无线循环 4.bossGroup 和 workerGroup 的构造器参数为含有的子线程(NioEventLoop)的个数--默认大小为CPU核数*2 */ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器通道 .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列等待连接的个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象 //给pipeline设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyServerHandler()); } }); //给我们的workerGroup的某一个 EventLoop设置对应的处理器 System.out.println("服务器准备好了"); //绑定一个端口并且同步 //启动服务器 ChannelFuture cf = bootstrap.bind(6668).sync(); //对关闭通道进行监听 cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
serverHandler处理器
/*我们定义的Handler需要继承netty规定好的某个HandlerAdapter */public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取数据的事件(可以读取客户端发送的消息) /* ChannelHandlerContext:上下文对象,含有管道pipeline,通道channel,地址 Object msg:就是客户端发送的数据,默认为Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程" + Thread.currentThread().getName()); System.out.println("server txt = " + ctx); System.out.println("channel和pipeline的关系:"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //底层是一个双向链表 //将msg转成一个ByteBuf //ByteBuf是由Netty提供的 ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是" + buf.toString(StandardCharsets.UTF_8)); System.out.println("客户端地址" + ctx.channel().remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //把数据写入缓冲区并刷新 ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端", StandardCharsets.UTF_8)); } //异常处理,一般是需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}
client端
public class NettyClient { public static void main(String[] args) throws Exception { //需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建启动对象 //主要客户端使用的不是ServerBootStrap而是BootStrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) //设置客户端通道的实现类(反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); //加入自定义的处理器 } }); System.out.println("客户端准备好了"); //启动客户端去连接服务端 //关于ChannelFuture后面分析,设计到Netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); //给关闭通道增加一个监听 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } }}
clientHandler处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪时就会出发该方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server ", StandardCharsets.UTF_8)); } //当通道有读取事件时会出发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息" + buf.toString(StandardCharsets.UTF_8)); System.out.println("服务器地址=" + ctx.channel().remoteAddress()); } //异常处理 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}