第3章 Netty入门应用


作为Netty的第一个应用程序,我们依然以第2章的时间服务器为例进行开发,通过Netty版本的时间服务器的开发,让初学者尽快学到如何搭建Netty开发环境和运行Netty应用程序。
如果你已经熟悉Netty的基础应用,可以跳过本章,继续后面知识的学习。
本章主要内容包括:
Netty开发环境的搭建
服务端程序TimeServer开发
客户端程序TimeClient开发
时间服务器的运行和调试

3.1 Netty开发环境的搭建

首先假设你已经在本机安装了JDK1.7,配置了JDK的环境变量path,同时下载并正确启动了IDE工具Eclipse。如果你是个Java初学者,从来没有在本机搭建过Java开发环境,建议你先选择一本Java基础入门的书籍或者课程进行学习。

假如你习惯于使用其他IDE工具进行Java开发,例如NetBeans IDE,也可以运行本节的入门例程。但是,你需要根据自己实际使用的IDE进行对应的配置修改和调整,本书统一使用eclipse-jee-kepler-SR1-win32作为Java开发工具。 下面我们开始学习如何搭建Netty的开发环境。

3.1.1 下载Netty的软件包

访问Netty的官网http://netty.io/,从【Downloads】标签页选择下载5.0.0.Alpha1安装包,安装包不大,8.95M左右,下载之后的安装包如图3-1所示。
图3-1 Netty 5.0压缩包
通过解压缩工具打开压缩包,目录如图3-2所示。
图3-2 Netty 5.0压缩包内部目录 这时会发现里面包含了各个模块的.jar包和源码,由于我们直接以二进制类库的方式使用Netty,所以只需要获取netty-all-5.0.0.Alpha1.jar即可。

3.1.2 搭建Netty应用工程

使用Eclipse创建普通的Java工程,同时创建Java源文件的package,如图3-3所示。
图3-3 Netty应用工程
创建第三方类库存放文件夹lib,同时将netty-all-5.0.0.Alpha1.jar复制到lib目录下,如图3-4所示。 图3-4 配置引用的netty jar包 右键单击netty-all-5.0.0.Alpha1.jar,在弹出的菜单中,选择将.jar包添加到Build Path中,操作如图3-5所示。

图3-5 将Netty.jar包添加到ClassPath中 到此结束,我们的Netty应用开发环境已经搭建完成,下面的小节将演示如何基于Netty开发时间服务器程序。

3.2 Netty服务端开发

作为第一个Netty的应用例程,为了让读者能够将精力集中在Netty的使用上,我们依然选择第2章的时间服务器为例进行源码开发和代码讲解。

TimeServer开发

在开始使用Netty开发TimeServer之前,先回顾一下使用NIO进行服务端开发的步骤。
(1)创建ServerSocketChannel,配置它为非阻塞模式;
(2)绑定监听,配置TCP参数,例如backlog大小;
(3)创建一个独立的I/O线程,用于轮询多路复用器Selector;
(4)创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKey.ACCEPT; (5)启动I/O线程,在循环体中执行Selector.select()方法,轮询就绪的Channel;
(6)当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端;
(7)设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数;
(8)将SocketChannel注册到Selector,监听OP_READ操作位;
(9)如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包;
(10)如果轮询的Channel为OP_WRITE,说明还有数据没有发送完成,需要继续发送。

一个简单的NIO服务端程序,如果我们直接使用JDK的NIO类库进行开发,竟然需要经过烦琐的十多步操作才能完成最基本的消息读取和发送,这也是我们要选择Netty等NIO框架的原因了,下面我们看看使用Netty是如何轻松搞定服务端开发的。 代码清单
3-1 Netty时间服务器服务端TimeServer

  1. public class TimeServer {
  2. public void bind(int port) throws Exception {
  3. // 配置服务端的NIO线程组,一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写
  4. EventLoopGroup bossGroup = new NioEventLoopGroup();
  5. EventLoopGroup workerGroup = new NioEventLoopGroup();
  6. try {
  7. // ServerBootstrap,Netty用于启动NIO的辅助启动类,目的是降低服务端的开发复杂度
  8. ServerBootstrap b = new ServerBootstrap();
  9. // 将两个线程组传递到ServerBootstrap中,接着设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK
  10. // NIO类库中的ServerSocketChannel类。
  11. // 然后配置NioServerSocketChannel的TCP参数,最后绑定I/O事件处理类ChildChannelHandler,它的作用类似于Reactor模式中的Handler类,主要用于处理网络I/O事件,例如
  12. // 记录日志,对消息进行编解码等
  13. b.group(bossGroup, workerGroup)
  14. .channel(NioServerSocketChannel.class)
  15. .option(ChannelOption.SO_BACKLOG, 1024)
  16. .childHandler(new ChildChannelHandler());
  17. // 绑定端口,同步等待成功
  18. ChannelFuture f = b.bind(port).sync();
  19. // 等待服务端监听端口关闭之后,main函数才退出
  20. f.channel().closeFuture().sync();
  21. } finally {
  22. // 优雅退出,释放线程池资源
  23. bossGroup.shutdownGracefully();
  24. workerGroup.shutdownGracefully();
  25. }
  26. }
  27. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  28. @Override
  29. protected void initChannel(SocketChannel arg0) throws Exception {
  30. arg0.pipeline().addLast(new TimeServerHandler());
  31. }
  32. }
  33. public static void main(String[] args) throws Exception {
  34. int port = 8080;
  35. if (args != null && args.length > 0) {
  36. try {
  37. port = Integer.valueOf(args[0]);
  38. } catch (NumberFormatException e) {
  39. // 采用默认值
  40. }
  41. }
  42. new TimeServer().bind(port);
  43. }
  44. }

由于本章的重点是讲解Netty的应用开发,所以对于一些Netty的类库和用法仅仅做基础性的讲解,我们从黑盒的角度理解这些概念即可。后续源码分析章节会专门对Netty核心的类库和功能进行分析,感兴趣的同学可以跳到源码分析章节进行后续的学习。

我们从bind方法开始学习,在代码第20~21行创建了两个NioEventLoopGroup实例。NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,实际上它们就是Reactor线程组。这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。第23行我们创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。第24行调用ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap中。接着设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024,最后绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的handler类,主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。

服务端启动辅助类配置完成之后,调用它的bind方法绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成。完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,主要用于异步操作的通知回调。

第32行使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。

第34~36行调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。 下面看看TimeServerHandler类是如何实现的。

代码清单3-2 Netty时间服务器服务端TimeServerHandler

  1. public class TimeServerHandler extends ChannelHandlerAdapter {
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg)
  4. throws Exception {
  5. // ByteBuf相当于JDK的java.nio.ByteBuffer对象,不过它提供了更加强大的功能。
  6. ByteBuf buf = (ByteBuf) msg;
  7. // 根据ByteBuf的readableBytes可以获取缓冲区可读的字数,根据可读的字节数据创建byte数组
  8. byte[] req = new byte[buf.readableBytes()];
  9. // 通过ByteBuf的readBytes方法将缓冲区中的字节数组复制到新建的byte数组中
  10. buf.readBytes(req);
  11. // 最后通过new String构造函数获取请求消息
  12. String body = new String(req, "UTF-8");
  13. System.out.println("The time server receive order: " + body)// 对请求消息进行判断,如果是"QUERY TIME ORDER"就创建应答消息,通过ChannelHandlerContext的write方法异步发送应答消息给客户端
  14. String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
  15. System.currentTimeMillis()).toString() : "BAD ORDER";
  16. ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
  17. ctx.write(resp);
  18. }
  19. /**
  20. * ChannelHandlerContext调用flash方法,它的作用是将消息发送队列中的消息写入到SocketChannel中发送给对方。
  21. * 从性能角度考虑,为了防止频繁地唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入SocketChannel中,
  22. * 调用write方法只是把待发送的消息放到发送缓冲数组中,再通过调用flush方法,将发送缓冲区中的消息全部写到SocketChannel中。
  23. */
  24. @Override
  25. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  26. ctx.flush();
  27. }
  28. /**
  29. * 当发生异常时,关闭ChannelHandlerContext,释放和ChannelHandlerContext相关联的句柄等资源
  30. */
  31. @Override
  32. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  33. ctx.close();
  34. }
  35. }

TimeServerHandler继承自ChannelHandlerAdapter,它用于对网络事件进行读写操作,通常我们只需要关注channelRead和exceptionCaught方法。下面对这两个方法进行简单说明。

第17行做类型转换,将msg转换成Netty的ByteBuf对象。ByteBuf类似于JDK中的java.nio.ByteBuffer 对象,不过它提供了更加强大和灵活的功能。通过ByteBuf的readableBytes方法可以获取缓冲区可读的字节数,根据可读的字节数创建byte数组,通过ByteBuf的readBytes方法将缓冲区中的字节数组复制到新建的byte数组中,最后通过new String构造函数获取请求消息。这时对请求消息进行判断,如果是”QUERY TIME ORDER”则创建应答消息,通过ChannelHandlerContext的write方法异步发送应答消息给客户端。

第30行我们发现还调用了ChannelHandlerContext的flush方法,它的作用是将消息发送队列中的消息写入到SocketChannel中发送给对方。从性能角度考虑,为了防止频繁地唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入SocketChannel中,调用write方法只是把待发送的消息放到发送缓冲数组中,再通过调用flush方法,将发送缓冲区中的消息全部写到SocketChannel中。

第35行,当发生异常时,关闭ChannelHandlerContext,释放和ChannelHandlerContext相关联的句柄等资源。 通过对代码进行统计分析可以看出,不到30行的业务逻辑代码,即完成了NIO服务端的开发,相比于传统基于JDK NIO原生类库的服务端,代码量大大减少,开发难度也降低了很多。 下面我们继续学习客户端的开发,并使用Netty改造TimeClient。

3.3 Netty客户端开发

Netty客户端的开发相比于服务端更简单,下面我们就看下客户端的代码如何实现。 TimeClient开发
代码清单3-3 Netty时间服务器客户端TimeClient

  1. public class TimeClient {
  2. public void connect(int port, String host) throws Exception {
  3. // 创建客户端处理I/O读写的NioEventLoopGroup线程组
  4. EventLoopGroup group = new NioEventLoopGroup();
  5. try {
  6. Bootstrap b = new Bootstrap();
  7. b.group(group).channel(NioSocketChannel.class)
  8. .option(ChannelOption.TCP_NODELAY, true)
  9. .handler(new ChannelInitializer<SocketChannel>() {
  10. /**
  11. * initChannel作用是当创建NioSocketChannel成功之后,
  12. * 在初始化它的时候将它的ChannelHandler设置到
  13. * ChannelPipeline中,用于处理网络I/O事件
  14. */
  15. @Override
  16. public void initChannel(SocketChannel ch)
  17. throws Exception {
  18. ch.pipeline().addLast(new TimeClientHandler());
  19. }
  20. });
  21. // 客户端启动辅助类设置完成之后,调用connect方法发起异步连接,然后调用同步方法等待连接成功。
  22. ChannelFuture f = b.connect(host, port).sync();
  23. // 最后,当客户端连接关闭之后,客户端主函数退出,在退出之前,释放NIO线程组的资源。
  24. f.channel().closeFuture().sync();
  25. } finally {
  26. // 优雅退出,释放NIO线程组
  27. group.shutdownGracefully();
  28. }
  29. }
  30. public static void main(String[] args) throws Exception {
  31. int port = 8080;
  32. if (args != null && args.length > 0) {
  33. try {
  34. port = Integer.valueOf(args[0]);
  35. } catch (Exception e) {
  36. // 采用默认值
  37. }
  38. }
  39. new TimeClient().connect(port, "127.0.0.1");
  40. }
  41. }

我们从connect方法讲起,在第20行首先创建客户端处理I/O读写的NioEventLoop Group线程组,然后继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置。与服务端不同的是,它的Channel需要设置为NioSocketChannel,然后为其添加handler,此处为了简单直接创建匿名内部类,实现initChannel方法,其作用是当创建NioSocketChannel成功之后,在初始化它的时候将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件。

客户端启动辅助类设置完成之后,调用connect方法发起异步连接,然后调用同步方法等待连接成功。

最后,当客户端连接关闭之后,客户端主函数退出,在退出之前,释放NIO线程组的资源。 下面我们继续看下TimeClientHandler的代码如何实现。
代码清单3-4 
Netty时间服务器客户端TimeClientHandler

  1. public class TimeClientHandler extends ChannelHandlerAdapter {
  2. private static final Logger logger = Logger
  3. .getLogger(TimeClientHandler.class.getName());
  4. private final ByteBuf firstMessage;
  5. /**
  6. * Creates a client-side handle
  7. */
  8. public TimeClientHandler() {
  9. byte[] req = "QUERY TIME ORDER".getBytes();
  10. firstMessage = Unpooled.buffer(req.length);
  11. firstMessage.writeBytes(req);
  12. }
  13. /**
  14. * 当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法,发送查询
  15. * 时间的指令给服务端,调用ChannelHandlerContext的writeAndFlush方法将请求消息发送给服务器。
  16. */
  17. @Override
  18. public void channelActive(ChannelHandlerContext ctx) {
  19. ctx.writeAndFlush(firstMessage);
  20. }
  21. /**
  22. * 当服务端返回应答消息时,channelRead方法被调用
  23. */
  24. @Override
  25. public void channelRead(ChannelHandlerContext ctx, Object msg)
  26. throws Exception {
  27. ByteBuf buf = (ByteBuf) msg;
  28. byte[] req = new byte[buf.readableBytes()];
  29. buf.readBytes(req);
  30. String body = new String(req, "UTF-8");
  31. System.out.println("Now is : " + body);
  32. }
  33. /**
  34. * 当发生异常时,打印异常日志,释放客户端资源
  35. */
  36. @Override
  37. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  38. // 释放资源
  39. logger.warning("Unexpected exception from downstream : "
  40. + cause.getMessage());
  41. ctx.close();
  42. }
  43. }

这里重点关注三个方法:channelActive、channelRead和exceptionCaught。当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法,发送查询时间的指令给服务端,调用ChannelHandlerContext的writeAndFlush方法将请求消息发送给服务端。

当服务端返回应答消息时,channelRead方法被调用,第39~43行从Netty的ByteBuf中读取并打印应答消息。 第47~52行,当发生异常时,打印异常日志,释放客户端资源。

3.4 运行和调试

3.4.1 服务端和客户端的运行

在Eclipse开发环境中运行和调试Java程序非常简单,下面我们看下如何运行TimeServer:将光标定位到TimeServer类中,单击右键,在弹出菜单中选择Run As→Java Application,或者直接使用快捷键 Alt+Shift+X执行,如图3-6所示。 图3-6 运行TimeServer 客户端的执行类似,可以看到以下执行结果。 服务端运行结果如图3-7所示。 图3-7 TimeServer运行结果 客户端运行结果如图3-8所示。 图3-8 TimeClient运行结果 运行结果正确。可以发现,通过Netty开发的NIO服务端和客户端非常简单,短短几十行代码,就能完成之前NIO程序需要几百行才能完成的功能。基于Netty的应用开发不但API使用简单、开发模式固定,而且扩展性和定制性非常好,后面,我们会通过更多应用来介绍Netty的强大功能。 需要指出的是,本例程依然没有考虑读半包的处理,对于功能演示或者测试,上述程序没有问题,但是稍加改造进行性能或者压力测试,它就不能正确地工作了。在下一个章节我们会给出能够正确处理半包消息的应用实例。

3.4.2 打包和部署

基于Netty开发的都是非Web的Java应用,它的打包形态非常简单,就是一个普通的.jar包,通常情况下,在正式的商业开发中,我们会使用三种打包方式对源码进行打包:
(1)Eclipse提供的导出功能。它可以将指定的Java工程或者源码包、代码输出成指定的.jar包,它属于手工操作,当项目模块多之后非常不方便,所以一般不使用这种方式;
(2)使用ant脚本对工程进行打包。将Netty的应用程序打包成指定的.jar包,一般会输出一个软件安装包:xxxx_install.gz;
(3)使用Maven进行工程构建。它可以对模块间的依赖进行管理,支持版本的自动化测试、编译和构建,是目前主流的项目管理工具。

3.5 总结

本章节讲解了Netty的入门应用,通过使用Netty重构时间服务器程序,可以发现相比于传统的NIO程序,Netty的代码更加简洁、开发难度更低,扩展性也更好,非常适合作为基础通信框架被用户集成和使用。 在介绍Netty服务端和客户端时,简单地对代码进行了讲解,由于后续会有专门章节对Netty进行源码分析,所以在Netty应用部分我们不进行详细的源码解读和分析。 第4章会讲解一个稍微复杂的应用,它利用Netty提供的默认编解码功能解决了我们之前没有解决的读半包问题。事实上,对于读半包问题,Netty提供了很多种好的解决方案。下面一起学习一下如何利用Netty默认的编解码功能解决半包读取问题。

第5章 分隔符和定长解码器的应用


[TOC]
TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式。
(1)消息长度固定,累计读取到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息;将计数器置位,重新开始读取下一个数据报;
(2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;
(3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符;
(4)通过在消息头中定义长度字段来标识消息的总长度。
Netty对上面四种应用做了统一的抽象,提供了4种解码器来解决对应的问题,使用起来非常方便。有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。
第4章我们介绍了如何利用LineBasedFrameDecoder解决TCP的粘包问题,本章我们继续学习另外两种实用的解码器——
DelimiterBasedFrameDecoder和FixedLengthFrameDecoder,前者可以自动完成以分隔符做结束标志的消息的解码,后者可以自动完成对定长消息的解码,它们都能解决TCP粘包/拆包导致的读半包问题。
本章主要内容包括:
DelimiterBasedFrameDecoder服务端开发
DelimiterBasedFrameDecoder客户端开发
运行DelimiterBasedFrameDecoder服务端和客户端
FixedLengthFrameDecoder服务端开发
通过telnet命令行调试FixedLengthFrameDecoder服务端

5.1 DelimiterBasedFrameDecoder应用开发

通过对DelimiterBasedFrameDecoder的使用,我们可以自动完成以分隔符作为码流结束标识的消息的解码,下面通过一个演示程序来学习下如何使用DelimiterBased FrameDecoder进行开发。
演示程序以经典的Echo服务为例。EchoServer接收到EchoClient的请求消息后,将其打印出来,然后将原始消息返回给客户端,消息以“ $_ ”作为分隔符。

5.1.1 DelimiterBasedFrameDecoder服务端开发

下面我们直接看EchoServer的源代码:
代码清单5-1 EchoServer服务端EchoServer

  1. 22. public class EchoServer { 23. public void bind(int port) throws Exception { 24. // 配置服务端的NIO线程组 25. EventLoopGroup bossGroup = new NioEventLoopGroup(); 26. EventLoopGroup workerGroup = new NioEventLoopGroup(); 27. try { 28. ServerBootstrap b = new ServerBootstrap(); 29. b.group(bossGroup, workerGroup) 30. .channel(NioServerSocketChannel.class) 31. .option(ChannelOption.SO_BACKLOG, 100) 32. .handler(new LoggingHandler(LogLevel.INFO)) 33. .childHandler(new ChannelInitializer<SocketChannel>() { 34. @Override 35. public void initChannel(SocketChannel ch) 36. throws Exception { 37. ByteBuf delimiter = Unpooled.copiedBuffer("$_" 38. .getBytes()); 39. ch.pipeline().addLast( 40. new DelimiterBasedFrameDecoder(1024, 41. delimiter)); 42. ch.pipeline().addLast(new StringDecoder()); 43. ch.pipeline().addLast(new EchoServerHandler()); 44. } 45. }); 46. 47. // 绑定端口,同步等待成功 48. ChannelFuture f = b.bind(port).sync(); 49. 50. // 等待服务端监听端口关闭 51. f.channel().closeFuture().sync(); 52. } finally { 53. // 优雅退出,释放线程池资源 54. bossGroup.shutdownGracefully(); 55. workerGroup.shutdownGracefully(); 56. } 57. } 58. 59. public static void main(String[] args) throws Exception { 60. int port = 8080; 61. if (args != null && args.length > 0) { 62. try { 63. port = Integer.valueOf(args[0]); 64. } catch (NumberFormatException e) { 65. // 采用默认值 66. } 67. } 68. new EchoServer().bind(port); 69. } 70. }

我们重点看37~41行,首先创建分隔符缓冲对象ByteBuf,本例程中使用“ $_ ” 作为分隔符。第40行,创建DelimiterBasedFrameDecoder对象,将其加入到ChannelPipeline中。
DelimiterBasedFrameDecoder有多个构造方法,这里我们传递两个参数,第一个1024表示单条消息的最大长度,当达到该长度后仍然没有查找到分隔符,就抛出TooLongFrame Exception异常,防止由于异常码流缺失分隔符导致的内存溢出,这是Netty解码器的可靠性保护;第二个参数就是分隔符缓冲对象。
下面继续看EchoServerHandler的实现。
代码清单5-2 EchoServer服务端EchoServerHandler

  1. 13. @Sharable 14. public class EchoServerHandler extends ChannelHandlerAdapter { 15. 16. int counter = 0; 17. 18. @Override 19. public void channelRead(ChannelHandlerContext ctx, Object msg) 20. throws Exception { 21. String body = (String) msg; 22. System.out.println("This is " + ++counter + " times receive client : [" 23. + body + "]"); 24. body += "$_"; 25. ByteBuf echo = Unpooled.copiedBuffer(body.getBytes()); 26. ctx.writeAndFlush(echo); 27. } 28. 29. @Override 30. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 31. cause.printStackTrace(); 32. ctx.close();// 发生异常,关闭链路 33. } 34. }

第21~23行直接将接收的消息打印出来,由于DelimiterBasedFrameDecoder自动对请求消息进行了解码,后续的ChannelHandler接收到的msg对象就是个完整的消息包;第二个ChannelHandler是StringDecoder,它将ByteBuf解码成字符串对象;第三个EchoServerHandler接收到的msg消息就是解码后的字符串对象。
由于我们设置DelimiterBasedFrameDecoder过滤掉了分隔符,所以,返回给客户端时需要在请求消息尾部拼接分隔符“ $_ ”,最后创建ByteBuf,将原始消息重新返回给客户端。
下面我们继续看下客户端的实现。

5.1.2 DelimiterBasedFrameDecoder客户端开发

首先看下EchoClient的实现。
代码清单5-3 EchoClient客户端EchoClient

  1. 20. public class EchoClient { 21. 22. public void connect(int port, String host) throws Exception { 23. // 配置客户端NIO线程组 24. EventLoopGroup group = new NioEventLoopGroup(); 25. try { 26. Bootstrap b = new Bootstrap(); 27. b.group(group).channel(NioSocketChannel.class) 28. .option(ChannelOption.TCP_NODELAY, true) 29. .handler(new ChannelInitializer<SocketChannel>() { 30. @Override 31. public void initChannel(SocketChannel ch) 32. throws Exception { 33. ByteBuf delimiter = Unpooled.copiedBuffer("$_" 34. .getBytes()); 35. ch.pipeline().addLast( 36. new DelimiterBasedFrameDecoder(1024, 37. delimiter)); 38. ch.pipeline().addLast(new StringDecoder()); 39. ch.pipeline().addLast(new EchoClientHandler()); 40. } 41. }); 42. 43. // 发起异步连接操作 44. ChannelFuture f = b.connect(host, port).sync(); 45. 46. // 等待客户端链路关闭 47. f.channel().closeFuture().sync(); 48. } finally { 49. // 优雅退出,释放NIO线程组 50. group.shutdownGracefully(); 51. } 52. } 53. 54. /** 55. * @param args 56. * @throws Exception 57. */ 58. public static void main(String[] args) throws Exception { 59. int port = 8080; 60. if (args != null && args.length > 0) { 61. try { 62. port = Integer.valueOf(args[0]); 63. } catch (NumberFormatException e) { 64. // 采用默认值 65. } 66. } 67. new EchoClient().connect(port, "127.0.0.1"); 68. } 69. }

与服务端类似,分别将DelimiterBasedFrameDecoder和StringDecoder添加到客户端ChannelPipeline中,最后添加客户端I/O事件处理类EchoClientHandler,下面继续看EchoClientHandler的实现。
代码清单5-4 EchoClient客户端EchoClientHandler

  1. 11. public class EchoClientHandler extends ChannelHandlerAdapter { 12. 13. private int counter; 14. 15. static final String ECHO_REQ = "Hi, Lilinfeng. Welcome to Netty.$_"; 16. 17. /** 18. * Creates a client-side handler. 19. */ 20. public EchoClientHandler() { 21. } 22. 23. @Override 24. public void channelActive(ChannelHandlerContext ctx) { 25. for (int i = 0; i < 10; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); 26. } 27. } 28. 29. @Override 30. public void channelRead(ChannelHandlerContext ctx, Object msg) 31. throws Exception { 32. System.out.println("This is " + ++counter + " times receive server : [" 33. + msg + "]"); 34. } 35. 36. @Override 37. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 38. ctx.flush(); 39. } 40. 41. @Override 42. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 43. cause.printStackTrace(); 44. ctx.close(); 45. } 46. }

第25~26行在TCP链路建立成功之后循环发送请求消息给服务端,第32~33行打印接收到的服务端应答消息同时进行计数。 下个小节,运行上面开发的服务端和客户端,看看运行结果是否正确。

5.1.3 运行DelimiterBasedFrameDecoder服务端和客户端

服务端运行结果如下。

  1. This is 1 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  2. This is 2 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  3. This is 3 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  4. This is 4 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  5. This is 5 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  6. This is 6 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  7. This is 7 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  8. This is 8 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  9. This is 9 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
  10. This is 10 times receive client : [Hi, Lilinfeng. Welcome to Netty.]

客户端运行结果如下。

  1. This is 1 times receive server : [Hi, Lilinfeng. Welcome to Netty.]
  2. This is 2 times receive server : [Hi, Lilinfeng. Welcome to Netty.] This is 3 times receive server : [Hi, Lilinfeng. Welcome to Netty.]
  3. This is 4 times receive server : [Hi, Lilinfeng. Welcome to Netty.] This is 5 times receive server : [Hi, Lilinfeng. Welcome to Netty.]
  4. This is 6 times receive server : [Hi, Lilinfeng. Welcome to Netty.] This is 7 times receive server : [Hi, Lilinfeng. Welcome to Netty.]
  5. This is 8 times receive server : [Hi, Lilinfeng. Welcome to Netty.] This is 9 times receive server : [Hi, Lilinfeng. Welcome to Netty.]
  6. This is 10 times receive server : [Hi, Lilinfeng. Welcome to Netty.]

服务端成功接收到了客户端发送的10条“Hi, Lilinfeng. Welcome to Netty.”请求消息,客户端成功接收到了服务端返回的10条“Hi, Lilinfeng. Welcome to Netty.”应答消息。测试结果表明使用DelimiterBasedFrameDecoder可以自动对采用分隔符做码流结束标识的消息进行解码。

本例程运行10次的原因是模拟TCP粘包/拆包,在笔者的机器上,连续发送10条Echo请求消息会发生粘包,如果没有DelimiterBasedFrameDecoder解码器的处理,服务端和客户端程序都将运行失败。下面我们将服务端的DelimiterBasedFrameDecoder注释掉,最终代码如图5-1所示。
图5-1 删除掉DelimiterBasedFrameDecoder后的服务端代码
服务端运行结果如下。

  1. This is 1 times receive client : [Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_Hi, Lilinfeng. Welcome to Netty.$_]

由于没有分隔符解码器,导致服务端一次读取了客户端发送的所有消息,这就是典型的没有考虑TCP粘包导致的问题。

5.2 FixedLengthFrameDecoder应用开发

FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题,非常实用。下面我们通过一个应用实例对其用法进行讲解。

5.2.1 FixedLengthFrameDecoder服务端开发

在服务端的ChannelPipeline中新增FixedLengthFrameDecoder,长度设置为20,然后再依次增加字符串解码器和EchoServerHandler,代码如下。
代码清单5-5 EchoServer服务端 EchoServer

  1. 20. public class EchoServer { 21. public void bind(int port) throws Exception { 22. // 配置服务端的NIO线程组 23. EventLoopGroup bossGroup = new NioEventLoopGroup(); 24. EventLoopGroup workerGroup = new NioEventLoopGroup(); 25. try { 26. ServerBootstrap b = new ServerBootstrap(); 27. b.group(bossGroup, workerGroup) 28. .channel(NioServerSocketChannel.class) 29. .option(ChannelOption.SO_BACKLOG, 100) 30. .handler(new LoggingHandler(LogLevel.INFO)) 31. .childHandler(new ChannelInitializer<SocketChannel>() { 32. @Override 33. public void initChannel(SocketChannel ch) 34. throws Exception { 35. ch.pipeline().addLast( 36. new FixedLengthFrameDecoder(20)); 37. ch.pipeline().addLast(new StringDecoder()); 38. ch.pipeline().addLast(new EchoServerHandler()); 39. } 40. }); 41. 42. // 绑定端口,同步等待成功 43. ChannelFuture f = b.bind(port).sync(); 44. 45. // 等待服务端监听端口关闭 46. f.channel().closeFuture().sync(); 47. } finally { 48. // 优雅退出,释放线程池资源 49. bossGroup.shutdownGracefully(); 50. workerGroup.shutdownGracefully(); 51. } 52. } 53. 54. public static void main(String[] args) throws Exception { 55. int port = 8080; 56. if (args != null && args.length > 0) { 57. try { 58. port = Integer.valueOf(args[0]); 59. } catch (NumberFormatException e) { 60. // 采用默认值 61. } 62. } 63. new EchoServer().bind(port); 64. } 65. }

EchoServerHandler的功能比较简单,直接将读取到的消息打印出来,代码如下。
代码清单5-6 EchoServer服务端 EchoServerHandler

  1. 11. @Sharable 12. public class EchoServerHandler extends ChannelHandlerAdapter { 13. 14. @Override 15. public void channelRead(ChannelHandlerContext ctx, Object msg) 16. throws Exception { 17. System.out.println("Receive client : [" + msg + "]"); 18. } 19. 20. @Override 21. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 22. cause.printStackTrace(); 23. ctx.close();// 发生异常,关闭链路 24. } 25. }

利用FixedLengthFrameDecoder解码器,无论一次接收到多少数据报,它都会按照构造函数中设置的固定长度进行解码,如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。
下面的章节我们通过telnet命令行来测试EchoServer 服务端,看它能否按照预期进行工作。

5.2.2 利用telnet命令行测试EchoServer服务端

由于客户端代码比较简单,所以这次我们通过telnet命令行对服务端进行测试。
测试场景:在Windows操作系统上打开CMD命令行窗口,通过telnet命令行连接服务端,在控制台输入如下内容。
Lilinfeng welcome to Netty at Nanjing
然后看服务端打印的内容,预期输出的请求消息为“Lilinfeng welcome to”。
下面我们就具体看下详细的测试步骤。
(1)在【运行】菜单中输入cmd命令,打开命令行窗口,如图5-2所示。
图5-2 通过cmd命令打开CMD窗口
(2)在命令行中输入“telnet localhost 8080”,通过telnet连接服务端,如图5-3所示。
图5-3 通过telnet命令连接服务端
(3)通过set localecho命令打开本地回显功能,输入命令行内容,如图5-4所示。
图5-4 输入Lilinfeng welcome to Netty at Nanjing
(4)EchoServer服务端运行结果如图5-5所示。
图5-5 服务端运行结果 根据图5-5所示内容,服务端运行结果完全符合预期,FixedLengthFrameDecoder解码器按照20个字节
长度对请求消息进行截取,输出结果为“Lilinfeng welcome to”。

5.3 总结

本章我们学习了两个非常实用的解码器:DelimiterBasedFrameDecoder和FixedLength FrameDecoder。
DelimiterBasedFrameDecoder用于对使用分隔符结尾的消息进行自动解码,FixedLengthFrameDecoder用于对固定长度的消息进行自动解码。
有了上述两种解码器,再结合其他的解码器,如字符串解码器等,可以轻松地完成对很多消息的自动解码,而且不再需要考虑TCP粘包/拆包导致的读半包问题,极大地提升了开发效率。

应用DelimiterBasedFrameDecoder和FixedLengthFrameDecoder进行开发非常简单,在绝大数情况下,只要将DelimiterBasedFrameDecoder或FixedLengthFrameDecoder添加到对应ChannelPipeline的起始位即可。

熟悉了Netty的NIO基础应用开发之后,从第三部分开始,我们继续学习编解码技术。在了解编解码基础知识之后,继续学习Netty内置的编解码框架的使用,例如Java序列化、二进制编解码、谷歌的protobuf和JBoss的Marshalling序列化框架。

第18章 EventLoop和EventLoopGroup


从本章开始我们将学习Netty的线程模型。Netty框架的主要线程就是I/O线程,线程模型设计的好坏,决定了系统的吞吐量、并发性和安全性等架构质量属性。 Netty的线程模型被精心地设计,既提升了框架的并发性能,又能在很大程度避免锁,局部实现了无锁化设计。

从本章开始,我们将介绍Netty的线程模型,同时对它的NIO线程NioEventLoop进行详尽地源码分析,让读者能够学习到更多I/O相关的多线程设计原理和实现。

本章主要内容包括:
Netty的线程模型
NioEventLoop源码分析

18.1 Netty的线程模型

当我们讨论Netty线程模型的时候,一般首先会想到的是经典的Reactor线程模型,尽管不同的NIO框架对于Reactor模式的实现存在差异,但本质上还是遵循了Reactor的基础线程模型。

下面让我们一起回顾经典的Reactor线程模型。

18.1.1 Reactor单线程模型

Reactor单线程模型,是指所有的I/O操作都在同一个NIO线程上面完成。NIO线程的职责如下。

  • 作为NIO服务端,接收客户端的TCP连接;
  • 作为NIO客户端,向服务端发起TCP连接;
  • 读取通信对端的请求或者应答消息;
  • 向通信对端发送消息请求或者应答消息。

Reactor单线程模型如图18-1所示。

图18-1 Reactor单线程模型
由于Reactor模式使用的是异步非阻塞I/O,所有的I/O操作都不会导致阻塞,理论上一个线程可以独立处理所有I/O相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor类接收客户端的TCP连接请求消息,当链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上,进行消息解码。用户线程消息编码后通过NIO线程将消息发送给客户端。

在一些小容量应用场景下,可以使用单线程模型。但是这对于高负载、大并发的应用场景却不合适,主要原因如下。 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送。 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。 为了解决这些问题,演进出了Reactor多线程模型。下面我们一起学习下Reactor多线程模型。

18.1.2 Reactor多线程模型

Rector多线程模型与单线程模型最大的区别就是有一组NIO线程来处理I/O操作,它的原理如图18-2所示。 图18-2 Reactor多线程模型 Reactor多线程模型的特点如下。 有专门一个NIO线程——Acceptor线程用于监听服务端,接收客户端的TCP连接请求。 网络I/O操作——读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和 N 个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送。 一个NIO线程可以同时处理 N 条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题。 在绝大多数场景下,Reactor多线程模型可以满足性能需求。但是,在个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足的问题,为了解决性能问题,产生了第三种Reactor线程模型——主从Reactor多线程模型。

18.1.3 主从Reactor多线程模型

主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求并处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的I/O线程上,由I/O线程负责后续的I/O操作。 它的线程模型如图18-3所示。 图18-3 主从Reactor多线程模型 利用主从NIO线程模型,可以解决一个服务端监听线程无法有效处理所有客户端连接的性能不足问题。因此,在Netty的官方demo中,推荐使用该线程模型。

18.1.4 Netty的线程模型

Netty的线程模型并不是一成不变的,它实际取决于用户的启动参数配置。通过设置不同的启动参数,Netty可以同时支持Reactor单线程模型、多线程模型和主从Reactor多线层模型。 下面让我们通过一张原理图(图18-4)来快速了解Netty的线程模型。

图18-4 Netty的线程模型
可以通过如图18-5所示的Netty服务端启动代码来了解它的线程模型。 图18-5 Netty服务端启动 服务端启动的时候,创建了两个NioEventLoopGroup,它们实际是两个独立的Reactor线程池。一个用于接收客户端的TCP连接,另一个用于处理I/O相关的读写操作,或者执行系统Task、定时任务Task等。
Netty用于接收客户端请求的线程池职责如下。
(1)接收客户端TCP连接,初始化Channel参数;
(2)将链路状态变更事件通知给ChannelPipeline。

Netty处理I/O操作的Reactor线程池职责如下。
(1)异步读取通信对端的数据报,发送读事件到ChannelPipeline;
(2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口;
(3)执行系统调用Task;
(4)执行定时任务Task,例如链路空闲状态监测定时任务。 通过调整线程池的线程个数、是否共享线程池等方式,Netty的Reactor线程模型可以在单线程、多线程和主从多线程间切换,这种灵活的配置方式可以最大程度地满足不同用户的个性化定制。 为了尽可能地提升性能,Netty在很多地方进行了无锁化的设计,例如在I/O线程内部进行串行操作,避免多线程竞争导致的性能下降问题。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列—多个工作线程的模型性能更优。 它的设计原理如图18-6所示。

图18-6 Netty Reactor线程模型 Netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead (Object msg)。只要用户不主动切换线程,一直都是由NioEventLoop调用用户的Handler,期间不进行线程切换。这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。

18.1.5 最佳实践

Netty的多线程编程最佳实践如下。
(1)创建两个NioEventLoopGroup,用于逻辑隔离NIO Acceptor和NIO I/O线程。 (2)尽量不要在ChannelHandler中启动用户线程(解码后用于将POJO消息派发到后端业务线程的除外)。
(3)解码要放在NIO线程调用的解码Handler中进行,不要切换到用户线程中完成消息的解码。
(4)如果业务逻辑操作非常简单,没有复杂的业务逻辑计算,没有可能会导致线程被阻塞的磁盘操作、数据库操作、网路操作等,可以直接在NIO线程上完成业务逻辑编排,不需要切换到用户线程。
(5)如果业务逻辑处理复杂,不要在NIO线程上完成,建议将解码后的POJO消息封装成Task,派发到业务线程池中由业务线程执行,以保证NIO线程尽快被释放,处理其他的I/O操作。

推荐的线程数量计算公式有以下两种。
公式一:线程数量=(线程总时间/瓶颈资源时间)×瓶颈资源的线程并行数;
公式二:QPS=1000/线程总时间×线程数。 由于用户场景的不同,对于一些复杂的系统,实际上很难计算出最优线程配置,只能是根据测试数据和用户场景,结合公式给出一个相对合理的范围,然后对范围内的数据进行性能测试,选择相对最优值。