实现目标

  • 客户端建立连接,发送消息-Netty rocks!
  • 服务器报告接收到消息,并将其回送给客户端
  • 客户端报告返回的消息并退出

所有的netty服务器都需要以下两个部分:

  1. 至少一个ChannelHandler,该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑
  2. 引导,配置服务器的启动代码。它会将服务器绑定到它要监听连接请求的端口上。

ChannelHandler和业务逻辑

ChannelHandler是一个接口族的父接口,它的实现负责接收并响应事件通知,在Netty应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。
因为Echo服务器会响应传入的消息,所以它需要实现ChannelInboundHandler接口,用来定义响应入站事件的方法。这个应用程序只需要用到少量的方法,所以继承ChannelInboundHandlerAdapter类足够了,它提供了ChannelInboundHandler的默认实现。
在Echo中会用到的方法:

  • channelRead(),对于每个传入的消息都要调用
  • ChannelReadCompute(),通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取数据中最后一条消息
  • exceptionCaught(),在读取操作期间,在异常抛出时会调用
  1. package cn.linguo.netty.handler;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelFutureListener;
  5. import io.netty.channel.ChannelHandler.Sharable;
  6. import io.netty.util.CharsetUtil;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.ChannelInboundHandlerAdapter;
  9. /**
  10. * 标识一个ChannelHandler可以被多个Channel安全地共享
  11. *
  12. * @author linguo
  13. *
  14. */
  15. @Sharable
  16. public class EchoServerHandler extends ChannelInboundHandlerAdapter {
  17. /**
  18. * 数据入站调用
  19. */
  20. @Override
  21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  22. ByteBuf in = (ByteBuf) msg;
  23. // 将消息内容打印到控制台
  24. System.out.println("server receiver:" + in.toString(CharsetUtil.UTF_8));
  25. // 将接收到的消息写给发送者,而不冲刷出站消息
  26. ctx.write(in);
  27. }
  28. @Override
  29. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  30. // 将目前暂存于ChannelOutBoundBuffer中的消息(在下一次调用flush()或者writeAndFlush()方法时将会尝试写出到套接字)冲刷到远程节点,并且关闭该channel
  31. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
  32. }
  33. /*
  34. * 捕获异常
  35. */
  36. @Override
  37. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  38. cause.printStackTrace();
  39. // 关闭该channel
  40. ctx.close();
  41. }
  42. }

ChannerRead()和ChannelReadCompute()的区别:
channelRead表示接收消息,可以看到msg转换成了ByteBuf,然后打印,也就是把Client传过来的消息打印了一下,你会发现每次打印完后,channelReadComplete也会调用,如果你试着传一个超长的字符串过来,超过1024个字母长度,你会发现channelRead会调用多次,而channelReadComplete只调用一次。
因为ByteBuf是有长度限制的,所以超长了,就会多次读取,也就是调用多次channelRead,而channelReadComplete则是每条消息只会调用一次,无论你多长,分多少次读取,只在该条消息最后一次读取完成的时候调用,所以这段代码把关闭Channel的操作放在channelReadComplete里,放到channelRead里可能消息太长了,结果第一次读完就关掉连接了,后面的消息全丢了。

编写引导服务

引导服务器具体涉及以下内容:

  • 绑定到服务器将在其上监听并接收传入连接请求的端口
  • 配置Channel,以将有关的入站消息通知给EchoServerHandler实例

具体的代码示例:

  1. package cn.linguo.netty.server;
  2. import java.net.InetSocketAddress;
  3. import cn.linguo.netty.handler.EchoServerHandler;
  4. import io.netty.bootstrap.ServerBootstrap;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.SocketChannel;
  10. import io.netty.channel.socket.nio.NioServerSocketChannel;
  11. /**
  12. * 引导服务器
  13. *
  14. * @author linguo
  15. *
  16. */
  17. public class EchoServer {
  18. private final int port;
  19. public EchoServer(int port) {
  20. this.port = port;
  21. }
  22. public void start() throws InterruptedException {
  23. final EchoServerHandler serverHandler = new EchoServerHandler();
  24. // 创建EventLoopGroup
  25. EventLoopGroup group = new NioEventLoopGroup();
  26. try {
  27. // 创建ServerBootstrap
  28. ServerBootstrap b = new ServerBootstrap();
  29. b.group(group).channel(NioServerSocketChannel.class) // 指定所使用的的NIO传输Channel
  30. .localAddress(new InetSocketAddress(port)) // 使用指定的端口设置套接字地址
  31. // 添加一个EchoServerHandler到子Channel的ChannelPipeline
  32. .childHandler(new ChannelInitializer<SocketChannel>() {
  33. @Override
  34. protected void initChannel(SocketChannel ch) throws Exception {
  35. // EchoServerHandler被标注为@Sharaable,所以我们可以总是使用同样的实例
  36. // 这里对于所有的客户端连接来说,都会使用同一个EchoServerHandler,因为其被标注为@Sharable。
  37. ch.pipeline().addLast(serverHandler);
  38. }
  39. });
  40. // 异步地绑定服务器;调用sync()方法阻塞等待知道绑定完成
  41. ChannelFuture f = b.bind().sync();
  42. // 获取Channel的CloseFuture,并且阻塞当前线程直到它完成
  43. f.channel().closeFuture().sync();
  44. } finally {
  45. // 关闭EventLoopGroup,释放所有资源
  46. group.shutdownGracefully().sync();
  47. }
  48. }
  49. public static void main(String[] args) throws Exception {
  50. if (args.length != 1) {
  51. System.out.println("Usage:" + EchoServer.class.getSimpleName() + "<port>");
  52. return;
  53. }
  54. // 设置端口置(如果端口参数不正确,抛出转换异常)
  55. int port = Integer.parseInt(args[0]);
  56. // 调用服务器的start()方法
  57. new EchoServer(port).start();
  58. }
  59. }

主要步骤回顾

  • EchoServerHandler实现了业务逻辑
  • main()方法引导了服务器

引导过程中所需要的步骤如下:

  • 创建一个ServerBootstrap的实例以引导和绑定服务器
  • 创建并分配一个NioEventLoopGroup实例以进行事件的处理,如接受新连接以及读写数据
  • 指定服务器绑定的本地InetSocketAddress
  • 使用一个EchoServerHandler的实例初始化每一个新的Channel
  • 调用ServerBootstrap.bind()方法以绑定服务器