Netty是一个异步的、基于事件驱动的网络应用程序,用于快速开发高性能、高可靠性的网络IO程序。Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景。

    • BIO适用于连接数目比较小且固定的架构,它对于服务器资源的要求较高
    • NIO适用于连接数目较多且连接比较短的架构,编程复杂
    • AIO适用于连接数目多且连接比较长的架构,编程复杂

    BIO的线程池改进案例
    **

    1. public class BIOServer {
    2. public static void main(String[] args) throws IOException {
    3. //1、创建一个线程池
    4. //2、如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
    5. ExecutorService executorService = Executors.newCachedThreadPool();
    6. //创建ServerSocket
    7. ServerSocket serverSocket = new ServerSocket(6666);
    8. System.out.println("服务器启动了");
    9. while (true) {
    10. System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
    11. //监听,等待客户端连接
    12. System.out.println("等待连接");
    13. final Socket socket = serverSocket.accept();
    14. System.out.println("连接到一个客户端");
    15. //创建一个线程,与之通讯
    16. executorService.execute(() -> {
    17. //重写Runnable方法,与客户端进行通讯
    18. handler(socket);
    19. });
    20. }
    21. }
    22. //编写一个Handler方法,和客户端通讯
    23. public static void handler(Socket socket) {
    24. try {
    25. System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
    26. byte[] bytes = new byte[1024];
    27. //通过socket获取输入流
    28. InputStream inputStream = socket.getInputStream();
    29. //循环的读取客户端发送的数据
    30. while (true){
    31. System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
    32. System.out.println("read....");
    33. int read = inputStream.read(bytes);
    34. if (read != -1){
    35. System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
    36. } else {
    37. break;
    38. }
    39. }
    40. } catch (IOException e) {
    41. e.printStackTrace();
    42. } finally {
    43. System.out.println("关闭和client的连接");
    44. try {
    45. socket.close();
    46. } catch (IOException e) {
    47. e.printStackTrace();
    48. }
    49. }
    50. }
    51. }

    使用Java NIO实现简单的客户端和服务器通信:

    1. @Test
    2. public void Server() throws IOException {
    3. //创建ServerSocketChannel -> ServerSocket
    4. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    5. //得到一个Selector对象
    6. Selector selector = Selector.open();
    7. //绑定一个端口6666
    8. serverSocketChannel.socket().bind(new InetSocketAddress(6666));
    9. //设置非阻塞
    10. serverSocketChannel.configureBlocking(false);
    11. //把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接
    12. SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    13. System.out.println();
    14. //循环等待客户端连接
    15. while (true) {
    16. //等待1秒,如果没有事件发生,就返回
    17. if (selector.select(1000) == 0) {
    18. System.out.println("服务器等待了1秒,无连接");
    19. continue;
    20. }
    21. //如果返回的 > 0,表示已经获取到关注的事件
    22. // 就获取到相关的 selectionKey 集合,反向获取通道
    23. Set<SelectionKey> selectionKeys = selector.selectedKeys();
    24. //遍历 Set<SelectionKey>,使用迭代器遍历
    25. Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
    26. while (keyIterator.hasNext()) {
    27. //获取到SelectionKey
    28. SelectionKey key = keyIterator.next();
    29. //根据 key 对应的通道发生的事件,做相应的处理
    30. if (key.isAcceptable()) {//如果是 OP_ACCEPT,有新的客户端连接
    31. //该客户端生成一个 SocketChannel
    32. SocketChannel socketChannel = serverSocketChannel.accept();
    33. System.out.println("客户端连接成功,生成了一个SocketChannel:" + socketChannel.hashCode());
    34. //将SocketChannel设置为非阻塞
    35. socketChannel.configureBlocking(false);
    36. //将socketChannel注册到selector,关注事件为 OP_READ,同时给SocketChannel关联一个Buffer
    37. socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
    38. }
    39. if (key.isReadable()) {
    40. //通过key,反向获取到对应的Channel
    41. SocketChannel channel = (SocketChannel) key.channel();
    42. //获取到该channel关联的Buffer
    43. ByteBuffer buffer = (ByteBuffer) key.attachment();
    44. channel.read(buffer);
    45. System.out.println("from 客户端:" + new String(buffer.array()));
    46. }
    47. //手动从集合中移除当前的 selectionKey,防止重复操作
    48. keyIterator.remove();
    49. }
    50. }
    51. }
    52. @Test
    53. public void Client() throws IOException {
    54. //得到一个网络通道
    55. SocketChannel socketChannel = SocketChannel.open();
    56. //设置非阻塞
    57. socketChannel.configureBlocking(false);
    58. //提供服务器端的IP和端口
    59. InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 6666);
    60. //连接服务器
    61. if (!socketChannel.connect(socketAddress)){ //如果不成功
    62. while (!socketChannel.finishConnect()){
    63. System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作。。。");
    64. }
    65. }
    66. //如果连接成功,就发送数据
    67. String str = "hello, 尚硅谷";
    68. ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
    69. //发送数据,实际上就是将buffer数据写入到channel
    70. socketChannel.write(byteBuffer);
    71. System.in.read();
    72. }

    原生NIO存在的问题:

    • 类库和API繁杂,使用麻烦
    • 必须熟悉多线程和网络编程
    • 开发难度高
    • 存在BUG:例如Epoll Bug,它会导致Selector空轮询,最终导致CPU100%被占用

    Netty的优点:

    • 适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池
    • 完整的 SSL/TLS 和 StartTLS 支持
    • 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制

    Netty主要是由基于主从的Reactor多线程模型进行了改进,其中主从Reactor都由一个变成了多个。Netty模型示意图如下所示:
    image.png

    如上所示,Netty抽象出两组线程池:

    • BossGroup:负责接收客户端的连接
    • WorkerGroup:负责读写请求

    它们都是NioEventGroup类型,分别维护者多个BioEventLoop线程。初始化这两个Group线程组时,默认会在每个Group中生成 CPU个数2倍的NioEventLoop线程。当客户端请求到来时,Group默认会按照连接请求的顺序分别将这些连接分给各个NioEventLoop处理,通过Group还负责管理NioEventLoop的生命周期。

    NioEventLoop表示一个不断循环执行处理任务的线程,它维护了一个线程和任务队列。每个NioEventLoop都包含一个Selector,用于监听绑定在上面的Channel对应的Socket请求。没增加一个Socket连接,NioEventLoopGroup就将这个请求一次分发给它下面的NioEventLoop处理。

    每个NioEventLoop循环执行分为如下的三步:

    • 轮询accept事件
    • 处理accept事件,与Client建立连接,创建NioSocketChannel,并将其注册到Worker Group的某个NioEventLoopGroup的Selector上
    • 处理任务队列的任务,即runAllTasks

    对于Worker Group中的每个NioEventLoop来说,它执行的步骤如下所示:

    • 轮询read、write事件
    • 在对应的NioSocketChannel中处理事件
    • 处理任务队列中的任务,即runAllTasks

    每个 Worker Group中的NioEventLoop处理业务时,会使用pipeline(管道),pipeline中维护了一个ChannelHandlerContext链表,而ChannelHandlerContext则保存了Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。如图所示,Channel和pipeline一一对应,ChannelHandler和ChannelHandlerContext一一对应。
    image.png

    ChannelHandler是一个接口,负责处理或拦截IO操作,并将其转发到Pipeline中的下一个Handler中进行处理。

    1. I/O Request
    2. via Channel or
    3. ChannelHandlerContext
    4. |
    5. +---------------------------------------------------+---------------+
    6. | ChannelPipeline | |
    7. | \|/ |
    8. | +---------------------+ +-----------+----------+ |
    9. | | Inbound Handler N | | Outbound Handler 1 | |
    10. | +----------+----------+ +-----------+----------+ |
    11. | /|\ | |
    12. | | \|/ |
    13. | +----------+----------+ +-----------+----------+ |
    14. | | Inbound Handler N-1 | | Outbound Handler 2 | |
    15. | +----------+----------+ +-----------+----------+ |
    16. | /|\ . |
    17. | . . |
    18. | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
    19. | [ method call] [method call] |
    20. | . . |
    21. | . \|/ |
    22. | +----------+----------+ +-----------+----------+ |
    23. | | Inbound Handler 2 | | Outbound Handler M-1 | |
    24. | +----------+----------+ +-----------+----------+ |
    25. | /|\ | |
    26. | | \|/ |
    27. | +----------+----------+ +-----------+----------+ |
    28. | | Inbound Handler 1 | | Outbound Handler M | |
    29. | +----------+----------+ +-----------+----------+ |
    30. | /|\ | |
    31. +---------------+-----------------------------------+---------------+
    32. | \|/
    33. +---------------+-----------------------------------+---------------+
    34. | | | |
    35. | [ Socket.read() ] [ Socket.write() ] |
    36. | |
    37. | Netty Internal I/O Threads (Transport Implementation) |
    38. +-------------------------------------------------------------------+

    下面使用Netty来实现一个简单的服务器和客户端,其中服务器的实现如下:

    1. public class NettyServer {
    2. public static void main(String[] args) throws InterruptedException {
    3. //创建BossGroup 和 WorkerGroup
    4. EventLoopGroup bossGroup = new NioEventLoopGroup();
    5. EventLoopGroup worderGroup = new NioEventLoopGroup();
    6. try {
    7. //创建服务器端的启动对象,配置参数
    8. ServerBootstrap bootstrap = new ServerBootstrap();
    9. //使用链式编程来进行设置,配置
    10. bootstrap.group(bossGroup, worderGroup) //设置两个线程组
    11. .channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作为服务器的通道实现
    12. .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数
    13. .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
    14. .childHandler(new ChannelInitializer<SocketChannel>() { //为accept channel的pipeline预添加的handler
    15. //给 pipeline 添加处理器,每当有连接accept时,就会运行到此处。
    16. @Override
    17. protected void initChannel(SocketChannel socketChannel) throws Exception {
    18. socketChannel.pipeline().addLast(new NettyServerHandler());
    19. }
    20. }); //给我们的 workerGroup 的 EventLoop 对应的管道设置处理器
    21. System.out.println("........服务器 is ready...");
    22. //绑定一个端口并且同步,生成了一个ChannelFuture 对象
    23. //启动服务器(并绑定端口)
    24. ChannelFuture future = bootstrap.bind(6668).sync();
    25. //对关闭通道进行监听
    26. future.channel().closeFuture().sync();
    27. } finally {
    28. bossGroup.shutdownGracefully();
    29. worderGroup.shutdownGracefully();
    30. }
    31. }
    32. }

    服务器Handler的实现如下:

    1. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    2. /**
    3. *读取客户端发送过来的消息
    4. * @param ctx 上下文对象,含有 管道pipeline,通道channel,地址
    5. * @param msg 就是客户端发送的数据,默认Object
    6. * @throws Exception
    7. */
    8. @Override
    9. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    10. System.out.println("服务器读取线程:" + Thread.currentThread().getName());
    11. System.out.println("server ctx = " + ctx);
    12. //看看Channel和Pipeline的关系
    13. Channel channel = ctx.channel();
    14. ChannelPipeline pipeline = ctx.pipeline(); //本质是个双向链表,出栈入栈
    15. //将msg转成一个ByteBuf,比NIO的ByteBuffer性能更高
    16. ByteBuf buf = (ByteBuf)msg;
    17. System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
    18. System.out.println("客户端地址:" + ctx.channel().remoteAddress());
    19. }
    20. //数据读取完毕
    21. @Override
    22. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    23. //它是 write + flush,将数据写入到缓存buffer,并将buffer中的数据flush进通道
    24. //一般讲,我们对这个发送的数据进行编码
    25. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
    26. }
    27. //处理异常,一般是关闭通道
    28. @Override
    29. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    30. ctx.close();
    31. }
    32. }

    客户端的实现如下:

    1. public class NettyClient {
    2. public static void main(String[] args) throws InterruptedException {
    3. //客户端需要一个事件循环组
    4. EventLoopGroup group = new NioEventLoopGroup();
    5. try {
    6. //创建客户端启动对象
    7. //注意:客户端使用的不是 ServerBootStrap 而是 Bootstrap
    8. Bootstrap bootstrap = new Bootstrap();
    9. //设置相关参数
    10. bootstrap.group(group) //设置线程组
    11. .channel(NioSocketChannel.class) //设置客户端通道的实现类(使用反射)
    12. .handler(new ChannelInitializer<SocketChannel>() {
    13. @Override
    14. protected void initChannel(SocketChannel socketChannel) throws Exception {
    15. socketChannel.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
    16. }
    17. });
    18. System.out.println("客户端 OK...");
    19. //启动客户端去连接服务器端
    20. //关于 channelFuture 涉及到 netty 的异步模型
    21. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    22. //给关闭通道进行监听
    23. channelFuture.channel().closeFuture().sync();
    24. } finally {
    25. group.shutdownGracefully();
    26. }
    27. }
    28. }

    客户端的Handler实现如下:

    1. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    2. /**
    3. * 当通道就绪就会触发
    4. * @param ctx
    5. * @throws Exception
    6. */
    7. @Override
    8. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    9. System.out.println("client: " + ctx);
    10. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
    11. }
    12. /**
    13. * 当通道有读取事件时,会触发
    14. * @param ctx
    15. * @param msg
    16. * @throws Exception
    17. */
    18. @Override
    19. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    20. ByteBuf buf = (ByteBuf)msg;
    21. System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
    22. System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
    23. }
    24. @Override
    25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    26. cause.printStackTrace();
    27. ctx.close();
    28. }
    29. }

    任务队列
    **
    任务队列由NioEventLoop维护并不断执行。当我们就收到请求之后,在当前channel对应的pipeline中的各个Handler里面进行业务处理和请求过滤。当某些业务需要耗费大量时间的时候,我们可以将任务提交到由NioEventLoop维护的taskQueue或scheduleTaskQueue中,让当前的NioEventLoop线程在空闲时间去执行这些任务。

    提交任务有3种方式:

    • 用户自定义的普通任务:该方式会将任务提交到taskQueue队列中。提交到该队列中的任务会按照提交顺序依次执行

      1. channelHandlerContext.channel().eventLoop().execute(new Runnable(){
      2. @Override
      3. public void run() {
      4. //...
      5. }
      6. });
    • 用于自定义的定时任务:该方式会将任务提交到scheduleTaskQueue定时任务队列中。该队列是底层是优先队列PriorityQueue实现的,固该队列中的任务会按照时间的先后顺序定时执行

      1. channelHandlerContext.channel().eventLoop().schedule(new Runnable() {
      2. @Override
      3. public void run() {
      4. }
      5. }, 60, TimeUnit.SECONDS);
    • 为其他EventLoop线程对应的Channel添加任务:可以在ChannelInitializer中,将刚创建的各个Channel以及对应的标识加入到统一的集合中去,然后可以根据标识获取Channel以及其对应的NioEventLoop,然后就课程调用execute()或者schedule()方法