概述

入门程序

  1. /*
  2. Netty实现一个服务器
  3. */
  4. public class HelloServer {
  5. public static void main(String[] args) {
  6. // 1.启动器,组装netty组件,启动服务器
  7. ServerBootstrap bootstrap = new ServerBootstrap()
  8. // 2.循环 监听事件
  9. .group(new NioEventLoopGroup()) // 可监听多种事件,accpet read等
  10. // 3.选择服务器的 ServerSocketChannel
  11. .channel(NioServerSocketChannel.class)
  12. // 4.添加处理器handler
  13. .childHandler(
  14. // 5.通道初始化器,用来添加额外的handler
  15. new ChannelInitializer<NioSocketChannel>() {
  16. @Override
  17. protected void initChannel(NioSocketChannel ch) throws Exception {
  18. ch.pipeline().addLast(new StringDecoder());// 内置Handler,将ByteBuf 装换为字符串
  19. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义handler,Inbound为入站handler
  20. // 读事件
  21. @Override
  22. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  23. super.channelRead(ctx, msg);
  24. // 将上一步中的转换好的字符串输出
  25. System.out.println(msg);
  26. }
  27. });
  28. }
  29. });
  30. // 7.绑定端口
  31. bootstrap.bind(8080);
  32. }
  33. }

Netty组件

EventLoop

EventLoop

事件循环对象
本质是一个单线程执行器,维护了一个Selector,有一个run()方法处理Channel上源源不断的io事件。
采用了两个基本的API:并发网络编程
一条线继承j.u.c.ScheduledExecutorService(此接口指定以一个parent()方法,用于放回当前EventLoop实现的实例所属的EventLoopGroup的引用)
另一条线继承自netty的OrderedEventExecutor
image.pngimage.png

EventLoop的任务调度

Netty线程模型的卓越性能取决于对于当前执行的 Thread 的身份的确定

通过调用 EventLoop 的 inEventLoop(Thread)方法实现(接口EventExecutor中定义了
boolean inEventLoop(_Thread thread)_方法)

  • 如果当前线程是支撑EventLoop的线程,那么代码会直接执行;
  • 如果不是,EventLoop将调度任务放入内部队列稍后执行

    Netty任务调度有两种方式

  • 用于异步传输

  • 用于阻塞传输

EventLoopGroup

时间循环对象组
维护了一组EventLoop。
new NioEventLoopGroup():处理IO任务、普通任务、定时任务
new DefaultEventLoopGroup():普通任务、定时任务

方法与属性

NioEventLoopGroup group = new NioEventLoopGroup() 构造方法;

创建默认个数的EventLoop 默认个数如何确定?默认个数为 主动设置的eventLoopThreads系统参数,或物理内核数的两倍 nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2));

NioEventLoopGroup eventExecutors = new NioEventLoopGroup(2);

创建指定个数的EventLoop

group.next()

获取下一个事件循环对象

  1. // 循环遍历group中的EventLoop
  2. DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
  3. System.out.println(group.next());
  4. System.out.println(group.next());
  5. System.out.println(group.next());

执行任务:IO任务、普通任务、定时任务

执行普通任务

  1. 可以异步执行任务 ```java NioEventLoopGroup group = new NioEventLoopGroup();
    1. log.debug("main1");
    2. group.next().submit(() -> {
    3. Thread.sleep(1000);
    4. log.debug("submit finished");
    }); log.debug(“main2”);

// 结果: 14:42:15.787 [main] DEBUG com.sky.netty.ch03.TestEventLoop - main1 14:42:15.840 [main] DEBUG com.sky.netty.ch03.TestEventLoop - main2 14:42:16.847 [nioEventLoopGroup-2-1] DEBUG com.sky.netty.ch03.TestEventLoop - submit finished

<a name="WCrYW"></a>
#### 执行定时任务
```java
// 每秒钟打印一次 ok
group.next().scheduleAtFixedRate(()->{
    log.debug("ok");
},0,1, TimeUnit.SECONDS);

执行IO任务

演示代码:
// Server端
public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                String res = buf.toString(Charset.defaultCharset());
                                log.debug("服务端收到:"+res);
                            }
                        });
                    }
                })
        .bind(8080);
    }
}

// 客户端
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                }).connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
    }
}

演示步骤:
  1. 启动服务器
  2. 设置客户端应用 Allow Parallel Run,这样可以启多个同样的客户端

image.png

  1. 客户端拦截断点后,执行代码 channel.writeAndFlush("1"),可以向服务端发送数据

    客户端断点类型需要设置Thread: 这样断点只拦截main线程,不拦截netty维护的nioEventLoopGroup线程(此线程用来进行网络IO通信)

image.png
image.png

分工细化

上面演示的服务端只有一个EventLoopGroup

Channel

常用方法:

  • close() 关闭channel
  • closeFuture() 用来处理channel的关闭
  • write() 将数据写入缓冲区
  • writerAndFlush() 将数据写入缓冲区并发送

channelFuture

处理结果

connect()方法返回ChannelFuture对象。
有同步、异步两种方式处理结果

    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 1.连接到服务器
                // 异步非阻塞
                // main线程发起连接调用,真正执行connect的是nio线程
                .connect(new InetSocketAddress("localhost", 8080));
        // 方式一:使用sync同步处理结果
        // sync()的作用: 阻塞住当前线程,等nio线程 connect()方法异步执行完后再执行后面的方法
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        // 2. 向服务器发送数据
        channel.writeAndFlush("hello");

        /*
         方式二:
         使用addListener 方法异步处理结果
         */
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // nio线程连接建立好后,会调用 operationComplete
                Channel channel1 = future.channel();
                log.debug("{}",channel1);
                channel1.writeAndFlush("from channel1");
            }
        });

    }

处理关闭

public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        log.debug("{}",channel);
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true){
                String nextLine = scanner.nextLine();
                if ("q".equals(nextLine)){
                    channel.close();
                    break;
                }else {
                    channel.writeAndFlush(nextLine);
                }
            }
        }).start();


        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("等待关闭");
        closeFuture.sync();
        log.debug("处理关闭后的操作");

        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("关闭后的操作");
            }
        });

    }

同步方式关闭channel

ChannelFuture closeFuture = channel.closeFuture();
// sync 方法会 阻塞住当前线程,直到channel.close()结束
closeFuture.sync();
log.debug("处理关闭后的操作");


// 日志
14:07:39.076 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x21db7584, L:/127.0.0.1:52195 - R:localhost/127.0.0.1:8080] CLOSE
14:07:39.077 [main] DEBUG com.sky.netty.ch04_channel.TestChannelClose - 处理关闭后的操作
可以看到,最后这一行,打印日志从操作是main线程执行的

异步方式关闭channel
添加一个异步的监听器,channel.close()执行后会回调此监听

        ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("关闭后的操作");
            }
        });

//
14:13:39.864 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xcce04947, L:/127.0.0.1:53484 - R:localhost/127.0.0.1:8080] CLOSE
14:13:39.866 [nioEventLoopGroup-2-1] DEBUG com.sky.netty.ch04_channel.TestChannelClose - 关闭后的操作
14:13:39.867 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xcce04947, L:/127.0.0.1:53484 ! R:localhost/127.0.0.1:8080] INACTIVE
14:13:39.867 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xcce04947, L:/127.0.0.1:53484 ! R:localhost/127.0.0.1:8080] UNREGISTERED
第二行可以看到,打印日志操作时在nioEventLoopGroup-2-1线程执行的

问题:以上示例,在channel.close()后,java程序仍然没有停止?

NioEventLoopGroup的线程仍然没有结束,需要手动调用group.shutdownGracefully()

ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("关闭后的操作");
                // 关闭nio线程
                group.shutdownGracefully();
            }
        });


14:21:06.351 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x4d9bf82a, L:/127.0.0.1:54970 - R:localhost/127.0.0.1:8080] CLOSE
14:21:06.353 [nioEventLoopGroup-2-1] DEBUG com.sky.netty.ch04_channel.TestChannelClose - 关闭后的操作
14:21:06.364 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x4d9bf82a, L:/127.0.0.1:54970 ! R:localhost/127.0.0.1:8080] INACTIVE
14:21:06.365 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x4d9bf82a, L:/127.0.0.1:54970 ! R:localhost/127.0.0.1:8080] UNREGISTERED
14:21:08.459 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.PoolThreadCache - Freed 1 thread-local buffer(s) from thread: nioEventLoopGroup-2-1
Disconnected from the target VM, address: '127.0.0.1:54961', transport: 'socket'

Future & Promise

Handler & Pipline

ChannelHandler 用来处理Channel上的各种事件,分为入站、出站。所有ChannelHandler连成一串就是Pipline。
ChannelInboundHandlerAdapter: 入站handler,所有handler按顺序执行
ChannelOutboundHandlerAdapter: 出站handler,服务器对外写数据时,按逆序执行

ByteBuf

netty对NIO 中ByteBuffer的进一步封装

创建

  1. 创建池化基于堆的ByteBuf

基于堆的可以受GC管控,自动进行垃圾回收
ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer(10);

  1. 创建池化基于直接内存的ByteBuf

直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
ByteBuf buf2 = ByteBufAllocator.DEFAULT.directBuffer(10);

池化 VS 非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置 -Dio.netty.allocator.type={unpooled|pooled}

结构

image.png`

  • 分别设置读写指针:随意读写,不需要向NIO 的Buffer那样调用flip方法切换读写模式
  • 还要不超过最大容量,可以自动扩容

    写入

扩容

扩容规则

  • 如何写入后数据大小未超过 512字节,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
  • 如果写入后数据大小超过 512字节,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是

210 =1024,(29 = 512 已经不够了)

  • 扩容不能超过 max capacity 会报错

    读取

retain & release

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。不同的实现需要不同方法回收

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty采用引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口。
此接口提供了引用计数相关的方法

  • 每个ByteBuf对象的初始计数为1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

    ByteBuf由谁释放?

    ByteBuf在Pipline中各个处理器之间传递,由最后使用的处理器释放。Pipline中有Head、Tail两个默认处理器,由他们俩个释放 ```java public class DefaultChannelPipeline implements ChannelPipeline {

    final class HeadContext extends AbstractChannelHandlerContext

      implements ChannelOutboundHandler, ChannelInboundHandler {
      ......
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
          // 此方法中判断是否释放内存
          unsafe.write(msg, promise);
      }        
      ......    
    

    }

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    ......
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 此方法中判断是否释放内存
        onUnhandledInboundMessage(ctx, msg);
    }
    ......

}

}

<a name="rd1k4"></a>
### slice 切割
![image.png](https://cdn.nlark.com/yuque/0/2022/png/111742/1642838349705-ca3aee3e-3f7f-45df-b3de-ece6c31f0842.png#clientId=u8aa03da1-a962-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=129&id=u4faba4fb&margin=%5Bobject%20Object%5D&name=image.png&originHeight=257&originWidth=434&originalType=binary&ratio=1&rotation=0&showTitle=false&size=19671&status=done&style=none&taskId=ufc22bd51-e61b-4ab2-8e5c-fd11327f5d4&title=&width=217)
<a name="eZIgC"></a>
### duplicate 复制
![image.png](https://cdn.nlark.com/yuque/0/2022/png/111742/1642838357409-52862a8e-4ec8-421f-9798-eaffe0105318.png#clientId=u8aa03da1-a962-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=128&id=u5b5547f7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=256&originWidth=354&originalType=binary&ratio=1&rotation=0&showTitle=false&size=12760&status=done&style=none&taskId=u68c63696-6b6a-4ae2-9d82-51e991f8e50&title=&width=177)
<a name="ozZBE"></a>
### copy 拷贝
开辟一块新的内存,将数据进行深拷贝,读写操作都与原数据无关。

<a name="pKxTy"></a>
### CompositeByteBuf 合并
将多个ByteBuf **逻辑**上合并为一个,实际上并没有发生内存复制,数据还是存储在原内存中
```java
CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(buf1,buf2);

ByteBuf 优势总结

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

BootStrap 引导类

Netty通过引导将各个组件结合在一个并启用。Netty的处理方式可以使网络层和应用程序相隔离。

Bootstrap 类

AbstractBootstrap
具体的引导类分别看作用于服务器和客户端的引导;

  • 两种应用程序类型之间通用的引导步骤由 AbstractBootstrap 处理
  • 服务器通过ServerBootStrap引导。服务器程序创建一个父 Channel 来接受来自客户端的连接,并创建子 Channel 以用于它们之间的通信
  • 客户端通过BootStrap引导。只需要一个 单独的、没有父 Channel 的 Channel 来用于所有的网络交互。

    为什么引导类是 Cloneable 的? 有时需要创建多个类似配置的Channel。在一个已经配置完成的引导类实例上调用 clone() 方法将返回另一个可以立即使用的引 导类实例。这种方式只会创建引导类实例的 EventLoopGroup 的一个浅拷贝,所以,这个EventLoopGroup将在所有克 隆的 Channel 实例之间共享。这是可以接受的,因为通常这些克隆的 Channel 的生命周期都很短暂,一 个典型的场景是——创建一个 Channel 以进行一次HTTP请求。

引导客户端

引导的过程中,调用bind()或者connect()前,必须配置以下组件:

  • group(),配置EventLoopGroup;
  • channel()或者channelFactory(),配置channle实现类
  • handler(),配置用于处理Channel I/O事件的处理器

引导服务器

netty程序:
启动器+初始化器+处理器