@Slf4jpublic class EchoServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup(),new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; log.info(ch.remoteAddress() + "消息:" + byteBuf.toString(Charset.defaultCharset())); String serverMsg = "服务端已接收到" + ch.remoteAddress() + "消息:" + byteBuf.toString(Charset.defaultCharset()); ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(serverMsg.getBytes(StandardCharsets.UTF_8))) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(!channelFuture.isSuccess()){ log.info("服务端消息发送失败: 客户端信息--- {} ,消息内容--- {}",ch.remoteAddress() ,serverMsg); } } }); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info(ctx.channel().remoteAddress() + "客户端连接成功"); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info(ctx.channel().remoteAddress() + "客户端断开连接"); super.channelUnregistered(ctx); } }); } }).bind(8989); }}
/**
 * Console client for the echo server: connects to 127.0.0.1:8989, sends every
 * whitespace-delimited token read from standard input and prints the server's replies.
 * Typing {@code quit} closes the connection and ends the program.
 */
@Slf4j
public class EchoClient {

    /**
     * Connects, then forwards console input to the server until {@code quit} or end of input.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for the
     *                              connect or close operation to complete
     */
    public static void main(String[] args) throws InterruptedException {
        // Held in a local so it can be shut down; otherwise its threads keep the JVM alive
        // after the channel is closed or after a failed connect.
        final NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            final Channel channel = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf byteBuf = (ByteBuf) msg;
                                    try {
                                        // The server encodes its reply as UTF-8, so decode it as UTF-8
                                        // rather than with the platform default charset.
                                        System.out.println("客户端接收消息:" + ch.remoteAddress() + "消息:"
                                                + byteBuf.toString(StandardCharsets.UTF_8));
                                    } finally {
                                        // Release even if decoding/printing throws.
                                        byteBuf.release();
                                    }
                                }
                            // The no-arg StringEncoder is kept: outbound text is encoded exactly as before.
                            }).addLast(new StringEncoder());
                        }
                    }).connect(new InetSocketAddress("127.0.0.1", 8989)).sync().channel();

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String str = scanner.next();
                if ("quit".equals(str)) {
                    channel.close().sync();
                    log.info("客户端断开");
                    // Stop reading: the channel is closed, further writes could only fail.
                    break;
                }
                channel.writeAndFlush(str);
            }
        } finally {
            // Runs on quit, end of input, and connect failure alike.
            group.shutdownGracefully();
        }
    }
}