包含自定义任务、定时任务等方法,当异步任务较多时需要添加将任务到线程池,通常有添加到Handler和Context两种方式

自定义任务:

当客户端的Channel与EventLoopGroup绑定后,在通道没有关闭之前,对应通道都会一直使用当初绑定的线程进行业务操作
image.png
Channel和EventLoop线程绑定关系

自定义普通任务:

在服务端读取客户端通道数据时往往需要做业务操作,当操作较为耗时时需要使用异步方式进行处理(异步任务执行的线程依旧是使用服务端ServerBootstrap定义的workGroup,只是操作是异步的),在添加多个异步任务时需要等待
对服务端自定义Handler的channelRead方法进行修改即可

代码:

  1. package simple;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.concurrent.TimeUnit;
  8. /** 自定义管道 Handler
  9. * 1、自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAddapter
  10. * 2、这时我们自定义一个 Handler,才能称为一个 Handler
  11. */
  12. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  13. /*当有读取事件时该方法将被触发
  14. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  15. * 参数二: Object 客户端发送的数据,默认是Object需要转换
  16. */
  17. @Override
  18. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  19. /* 用户自定义普通任务 : 使用异步处理 > 找到客户端 Channel 对应的 NioEventLoop 的 TaskQueue 中
  20. * 用于处理耗时非常长的业务处理方式
  21. */
  22. ctx.channel().eventLoop().execute(()->{
  23. try {
  24. System.out.println("ExecuteThread1 is: "+Thread.currentThread().getName());
  25. TimeUnit.SECONDS.sleep(3);
  26. ctx.writeAndFlush(Unpooled.copiedBuffer("3秒的异步任务已完成!",StandardCharsets.UTF_8));
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. });
  31. ctx.channel().eventLoop().execute(()->{
  32. try {
  33. System.out.println("ExecuteThread2 is: "+Thread.currentThread().getName());
  34. TimeUnit.SECONDS.sleep(10);
  35. ctx.writeAndFlush(Unpooled.copiedBuffer("10秒的异步任务已完成!",StandardCharsets.UTF_8));
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. });
  40. System.out.println("异步任务开始处理!"+LocalDateTime.now()+" 当前线程是: "+Thread.currentThread().getName());
  41. }
  42. /* 数据读取完毕后触发
  43. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  44. */
  45. @Override
  46. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  47. //对发送数据进行编码后写入到缓存并刷新
  48. ctx.writeAndFlush(Unpooled.copiedBuffer("服务端连接成功,欢迎!",StandardCharsets.UTF_8));
  49. }
  50. /* 处理异常,一般为关闭通道 */
  51. @Override
  52. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  53. ctx.close(); //和 ctx.channel().close() 是一个意思
  54. }
  55. }

测试:

image.png
服务端使用异步方法轮流处理任务

自定义定时任务:

指定延时时间运行任务,定时任务被添加到 ScheduledTaskQueue 中,优先级比普通任务的 TaskQueue 低(异步任务执行的线程依旧是使用服务端ServerBootstrap定义的workGroup,只是操作是异步的
定时时间以添加到队列中为开始,当之前存在普通任务时,存在普通任务执行结束后直接执行定时任务的情况,因此需要设置好定时时间
对服务端自定义Handler的channelRead方法进行修改即可

代码:

  1. package simple;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import java.nio.charset.StandardCharsets;
  7. import java.time.LocalDateTime;
  8. import java.util.concurrent.TimeUnit;
  9. /** 自定义管道 Handler
  10. * 1、自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAddapter
  11. * 2、这时我们自定义一个 Handler,才能称为一个 Handler
  12. */
  13. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  14. /*当有读取事件时该方法将被触发
  15. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  16. * 参数二: Object 客户端发送的数据,默认是Object需要转换
  17. */
  18. @Override
  19. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  20. ctx.channel().eventLoop().execute(()->{
  21. try {
  22. System.out.println("ExecuteThread2 is: "+Thread.currentThread().getName());
  23. TimeUnit.SECONDS.sleep(10);
  24. ctx.writeAndFlush(Unpooled.copiedBuffer("10秒的异步任务已完成!"+LocalDateTime.now(),StandardCharsets.UTF_8));
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. });
  29. System.out.println("异步任务开始处理!"+LocalDateTime.now()+" 当前线程是: "+Thread.currentThread().getName());
  30. /* 用户自定义定时任务,任务被添加到 NioEventLoop 的 ScheduledTaskQueue 中 */
  31. ctx.channel().eventLoop().schedule(()->{
  32. System.out.println("ExecuteThread3 is: "+Thread.currentThread().getName());
  33. ctx.writeAndFlush(Unpooled.copiedBuffer("自定义定时任务已完成!"+LocalDateTime.now(),StandardCharsets.UTF_8));
  34. },5,TimeUnit.SECONDS);
  35. }
  36. /* 数据读取完毕后触发
  37. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  38. */
  39. @Override
  40. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  41. //对发送数据进行编码后写入到缓存并刷新
  42. ctx.writeAndFlush(Unpooled.copiedBuffer("服务端连接成功,欢迎!"+LocalDateTime.now(),StandardCharsets.UTF_8));
  43. }
  44. /* 处理异常,一般为关闭通道 */
  45. @Override
  46. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  47. ctx.close(); //和 ctx.channel().close() 是一个意思
  48. }
  49. }

测试:

image.png
定时任务队列排在普通任务后


线程池处理异步任务:

当客户端的Channel与EventLoopGroup和执行异步任务的DefaultEventLoopGroup绑定后,在通道没有关闭之前都会一直使用当初绑定的线程进行业务操作
image.png
Channel与EventLoop中线程绑定关系

Handler方式:

在handler创建线程组进行异步提交,异步方法使用的是单独创建的EventExecutorGroup内的线程而不是之前服务端中ServerBootstrap定义的workGroup,具体可参考源码中的 execute1
直接执行的方法使用的是workGroup线程,异步提交使用的是线程组线程
源码为 AbstractChannelHandlerContext 中的 write 方法

服务端Handler:

  1. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  2. static final EventExecutorGroup group = new DefaultEventExecutorGroup(16); //充当业务线程池
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. group.submit(()->{
  6. try {
  7. System.out.println("ExecuteThread1 is: "+Thread.currentThread().getName());
  8. TimeUnit.SECONDS.sleep(3);
  9. /* 堆内存方式创建ByteBuf*/
  10. //ctx.writeAndFlush(Unpooled.copiedBuffer("3秒的异步任务已完成!"+ LocalDateTime.now(),StandardCharsets.UTF_8));
  11. /* 直接内存方式创建ButeBuf*/
  12. ByteBuf buffer = ctx.alloc().buffer();
  13. buffer.writeBytes( ("3秒的异步任务已完成!"+LocalDateTime.now()).getBytes(StandardCharsets.UTF_8) );
  14. ctx.writeAndFlush(buffer);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. });
  19. group.submit(()->{
  20. try {
  21. System.out.println("ExecuteThread2 is: "+Thread.currentThread().getName());
  22. TimeUnit.SECONDS.sleep(10);
  23. ctx.writeAndFlush(Unpooled.copiedBuffer("10秒的异步任务已完成!"+ LocalDateTime.now(),StandardCharsets.UTF_8));
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. });
  28. group.submit(()->{
  29. try {
  30. System.out.println("ExecuteThread3 is: "+Thread.currentThread().getName());
  31. TimeUnit.SECONDS.sleep(12);
  32. ctx.writeAndFlush(Unpooled.copiedBuffer("12秒的异步任务已完成!"+ LocalDateTime.now(),StandardCharsets.UTF_8));
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. });
  37. System.out.println("异步任务开始处理!"+LocalDateTime.now()+" 当前线程是: "+Thread.currentThread().getName());
  38. ctx.writeAndFlush(Unpooled.copiedBuffer("服务端开始异步任务开始处理~~~", CharsetUtil.UTF_8));
  39. }
  40. /* 数据读取完毕后触发
  41. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  42. */
  43. @Override
  44. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  45. //对发送数据进行编码后写入到缓存并刷新
  46. ctx.writeAndFlush(Unpooled.copiedBuffer("服务端连接成功,欢迎!"+LocalDateTime.now(),StandardCharsets.UTF_8));
  47. }
  48. /* 处理异常,一般为关闭通道 */
  49. @Override
  50. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  51. ctx.close();
  52. }
  53. }

测试:
异步任务使用内部线程组调用
image.png

Context方式:

将handler添加到pipeline的时候指定线程组即可,具体可参考源码中的 execute2
使用提交方法用的是workGroup线程执行任务,直接执行的是异步任务

服务端:

  1. package simple.execute2;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.util.concurrent.DefaultEventExecutorGroup;
  9. import io.netty.util.concurrent.EventExecutorGroup;
  10. import java.nio.charset.StandardCharsets;
  11. import java.time.LocalDateTime;
  12. import java.util.concurrent.TimeUnit;
  13. /* 异步任务添加到Handler版服务端代码 */
  14. public class NettyServer {
  15. static final EventExecutorGroup group1 = new DefaultEventExecutorGroup(2); //创建业务1线程池
  16. static final EventExecutorGroup group2 = new DefaultEventExecutorGroup(2); //创建业务2线程池
  17. public static void main(String[] args) throws InterruptedException {
  18. /* 创建 BossGroup 和 WorkerGroup 线程组
  19. * BossGroup 只处理连接请求,真正的和客户端业务处理会交给 WorkGrouop 完成
  20. * bossGroup 和 workerGroup 含有的子线程的个数默认为 CPU最大线程数 X 2,可以手动指定
  21. */
  22. EventLoopGroup bossGroup = new NioEventLoopGroup(1); //创建子线程的个数为1的 bossGroup
  23. EventLoopGroup workerGroup = new NioEventLoopGroup(8); //创建子线程的个数为8的 workerGroup
  24. try {
  25. //创建服务器端的启动对象,配置参数
  26. ServerBootstrap bootstrap = new ServerBootstrap();
  27. //使用链式参数配置启动参数
  28. bootstrap.group(bossGroup,workerGroup) //设置两个线程组
  29. .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器通道实现
  30. .option(ChannelOption.SO_BACKLOG,120) //设置线程队列等待连接的个数
  31. .childOption(ChannelOption.SO_KEEPALIVE,true) //保持活动连接状态
  32. .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
  33. //给PipeLine设置处理器
  34. @Override
  35. protected void initChannel(SocketChannel ch) throws Exception {
  36. ChannelPipeline pipeline = ch.pipeline();
  37. pipeline.addLast(group1,new ChannelInboundHandlerAdapter(){
  38. @Override
  39. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  40. /* 使用的是workerGroup的线程 */
  41. ctx.channel().eventLoop().execute(()->{
  42. try {
  43. System.out.println("ExecuteThread1 is: "+Thread.currentThread().getName());
  44. TimeUnit.SECONDS.sleep(3);
  45. ctx.writeAndFlush(Unpooled.copiedBuffer("耗时3秒的主要任务已完成!"+ LocalDateTime.now(),StandardCharsets.UTF_8));
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. });
  50. /* 非execute执行的使用的是自定义的group线程组线程 */
  51. System.out.println("异步任务开始处理!"+LocalDateTime.now()+" 当前线程是: "+Thread.currentThread().getName());
  52. TimeUnit.SECONDS.sleep(10);
  53. ctx.writeAndFlush(Unpooled.copiedBuffer("10秒的异步任务已完成!"+ LocalDateTime.now(),StandardCharsets.UTF_8));
  54. }
  55. });
  56. pipeline.addLast(group2,new ChannelInboundHandlerAdapter(){
  57. @Override
  58. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  59. //对发送数据进行编码后写入到缓存并刷新
  60. ctx.writeAndFlush(Unpooled.copiedBuffer("服务端连接成功,欢迎!"+ LocalDateTime.now()+"当前线程是: "+Thread.currentThread().getName(), StandardCharsets.UTF_8));
  61. }
  62. });
  63. }
  64. });
  65. System.out.println("服务器 is ready .....");
  66. //启动服务器绑定一个端口并且同步,生成一个 ChannelFuture 对象
  67. ChannelFuture cf = bootstrap.bind(8889).sync();
  68. //给CF注册监听器,监控关心的事件
  69. cf.addListener(new ChannelFutureListener() {
  70. @Override
  71. public void operationComplete(ChannelFuture channelFuture) throws Exception {
  72. if(cf.isSuccess()){
  73. System.out.println("监听端口成功");
  74. }else{
  75. System.out.println("监听端口失败");
  76. }
  77. }
  78. });
  79. //对关闭通道进行监听(当有关闭通道的消息时才进行监听)
  80. cf.channel().closeFuture().sync();
  81. } finally {
  82. bossGroup.shutdownGracefully(); //关闭资源
  83. workerGroup.shutdownGracefully(); //关闭
  84. }
  85. }
  86. }

测试:
异步任务执行成功
image.png

优缺点对比:

第一种方式在handler中添加异步,可能更加的自由,比如如果需要访问数据库,那就异步,如果不需要,就不异步,异步会拖长接口啊应时间。因为需要将任务放进mpscTask 中。如果I0时间很短,Task 很多,可能一个循环下来,都没时间执行整个task,导致响应时间达不到指标
第二种方式是Netty标准方式(即加入到队列),不同的事件(业务)操作可以指定不同的线程池,但是会将整个handler都交给业务线程池,不论耗时不耗时,都加入到队列里,不够灵活

源码实现:

由 AbstractChannelHandlerContext 的 invokeChannelRead 对具体调用的线程进行判断
image.png