任务队列中的Task有三种典型使用场景

1.用户自定义的普通任务
  1. //比如这里我们有一个很耗时间的业务 --》异步执行,提交该channel对应的NIOEvent的taskQueue中
  2. //方案1 自定义普通任务
  3. ctx.channel().eventLoop().execute(() -> {
  4. try {
  5. Thread.sleep(10 * 1000);
  6. ctx.writeAndFlush(Unpooled.copiedBuffer("任务队列", StandardCharsets.UTF_8));
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. });

2.用户自定义定时任务
  1. //方案二 自定义定时任务 --》该任务提交到了scheduleTaskQueue
  2. ctx.channel().eventLoop().schedule(() -> {
  3. try {
  4. Thread.sleep(10 * 1000);
  5. ctx.writeAndFlush(Unpooled.copiedBuffer("自定义延时", StandardCharsets.UTF_8));
  6. } catch (Exception e) {
  7. e.printStackTrace();
  8. }
  9. }, 5, TimeUnit.SECONDS);

主要多个任务之间要按顺序执行,因为是同一个线程

3.非当前Reactor线程调用Channel的各种方法

例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入这种场景,最终的Write会提交到任务队列中被异步消费

在创建服务器的启动对象并设置参数的时候会调用下面的方法创建一个channel初始化对象,可以在此处得到其他Reactor对应的Channel

  1. .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
  2. //给pipeline设置处理器
  3. @Override
  4. protected void initChannel(SocketChannel ch) throws Exception {
  5. ch.pipeline().addLast(new NettyServerHandler());
  6. }
  7. }); //给我们的workerGroup的某一个 EventLoop设置对应的处理器

Netty方案的再说明:

1.Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络读写操作。
2.NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定在其上的socket网络通道
3.NioEventLoop内部采用串行化设计,从消息的读取->解码->编码->发送,使用由IO线程NioEventLoop负责


NioEventLoopGroup 下包含多个NioEventLoop
每个 NioEventLoop中包含一个Selector,一个taskQueue
每个NioEventLoop的Selector上可以注册监听多个NioChannel
每个NioChannel只会绑定在唯一的NioEventLoop上
每个NioChannel都绑定有一个自己的ChannelPipeline