TCP服务

server端
  1. public class NettyServer {
  2. public static void main(String[] args) throws Exception {
  3. //创建BossGroup和WorkerGroup
  4. /*
  5. 1.创建两个线程组bossGroup和workerGroup
  6. 2.bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给workerGroup
  7. 3.两个都是无线循环
  8. 4.bossGroup 和 workerGroup 的构造器参数为含有的子线程(NioEventLoop)的个数--默认大小为CPU核数*2
  9. */
  10. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  11. EventLoopGroup workerGroup = new NioEventLoopGroup();
  12. try {
  13. //创建服务器端的启动对象,配置参数
  14. ServerBootstrap bootstrap = new ServerBootstrap();
  15. //使用链式编程来进行设置
  16. bootstrap.group(bossGroup, workerGroup) //设置两个线程组
  17. .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器通道
  18. .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列等待连接的个数
  19. .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
  20. .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
  21. //给pipeline设置处理器
  22. @Override
  23. protected void initChannel(SocketChannel ch) throws Exception {
  24. ch.pipeline().addLast(new NettyServerHandler());
  25. }
  26. }); //给我们的workerGroup的某一个 EventLoop设置对应的处理器
  27. System.out.println("服务器准备好了");
  28. //绑定一个端口并且同步
  29. //启动服务器
  30. ChannelFuture cf = bootstrap.bind(6668).sync();
  31. //对关闭通道进行监听
  32. cf.channel().closeFuture().sync();
  33. } finally {
  34. bossGroup.shutdownGracefully();
  35. workerGroup.shutdownGracefully();
  36. }
  37. }
  38. }

serverHandler处理器
  1. /*
  2. 我们定义的Handler需要继承netty规定好的某个HandlerAdapter
  3. */
  4. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  5. //读取数据的事件(可以读取客户端发送的消息)
  6. /*
  7. ChannelHandlerContext:上下文对象,含有管道pipeline,通道channel,地址
  8. Object msg:就是客户端发送的数据,默认为Object
  9. */
  10. @Override
  11. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  12. System.out.println("服务器读取线程" + Thread.currentThread().getName());
  13. System.out.println("server txt = " + ctx);
  14. System.out.println("channel和pipeline的关系:");
  15. Channel channel = ctx.channel();
  16. ChannelPipeline pipeline = ctx.pipeline(); //底层是一个双向链表
  17. //将msg转成一个ByteBuf
  18. //ByteBuf是由Netty提供的
  19. ByteBuf buf = (ByteBuf) msg;
  20. System.out.println("客户端发送消息是" + buf.toString(StandardCharsets.UTF_8));
  21. System.out.println("客户端地址" + ctx.channel().remoteAddress());
  22. }
  23. //数据读取完毕
  24. @Override
  25. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  26. //把数据写入缓冲区并刷新
  27. ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端", StandardCharsets.UTF_8));
  28. }
  29. //异常处理,一般是需要关闭通道
  30. @Override
  31. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  32. ctx.close();
  33. }
  34. }

client端
  1. public class NettyClient {
  2. public static void main(String[] args) throws Exception {
  3. //需要一个事件循环组
  4. EventLoopGroup group = new NioEventLoopGroup();
  5. try {
  6. //创建启动对象
  7. //主要客户端使用的不是ServerBootStrap而是BootStrap
  8. Bootstrap bootstrap = new Bootstrap();
  9. //设置相关参数
  10. bootstrap.group(group) //设置线程组
  11. .channel(NioSocketChannel.class) //设置客户端通道的实现类(反射)
  12. .handler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel ch) throws Exception {
  15. ch.pipeline().addLast(new NettyClientHandler()); //加入自定义的处理器
  16. }
  17. });
  18. System.out.println("客户端准备好了");
  19. //启动客户端去连接服务端
  20. //关于ChannelFuture后面分析,设计到Netty的异步模型
  21. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
  22. //给关闭通道增加一个监听
  23. channelFuture.channel().closeFuture().sync();
  24. } finally {
  25. group.shutdownGracefully();
  26. }
  27. }
  28. }

clientHandler处理器
  1. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  2. //当通道就绪时就会出发该方法
  3. @Override
  4. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  5. System.out.println("client " + ctx);
  6. ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server ", StandardCharsets.UTF_8));
  7. }
  8. //当通道有读取事件时会出发
  9. @Override
  10. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  11. ByteBuf buf = (ByteBuf) msg;
  12. System.out.println("服务器回复的消息" + buf.toString(StandardCharsets.UTF_8));
  13. System.out.println("服务器地址=" + ctx.channel().remoteAddress());
  14. }
  15. //异常处理
  16. @Override
  17. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  18. cause.printStackTrace();
  19. ctx.close();
  20. }
  21. }