前言:

上篇文件了解了netty的演变过程,然后结合了两个demo感受了下,本篇文章重点介绍他的一些核心组件。

一 Bootstrap ,ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
常见方法:

方法名称 方法介绍
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 该方法用于服务器端,用来设置两个EventLoop
public B group(EventLoopGroup group) 该方法用于客户端,用来设置一个EventLoop
public B channel(Class<? extends C> channelClass) 该方法用来设置一个服务器端的通道实现
public B option(ChannelOption option, T value) 用来给 ServerChannel 添加配置
public ServerBootstrap childOption(ChannelOption childOption, T value) 用来给接收到的通道添加配置
public ServerBootstrap childHandler(ChannelHandler childHandler) 该方法用来设置业务处理类(自定义的 handler)
public ChannelFuture bind(int inetPort) 该方法用于服务器端,用来设置占用的端口号
public ChannelFuture connect(String inetHost, int inetPort) 该方法用于客户端,用来连接服务器

二 Future、ChannelFuture

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
常见的方法有

方法名 方法介绍
Channel channel() 返回当前正在进行 IO 操作的通道
ChannelFuture sync() 等待异步操作执行完毕,相当于将阻塞在当前。

三 Channel

  1. Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  2. 通过Channel 可获得当前网络连接的通道的状态
  3. 通过Channel 可获得 网络连接的配置参数 (例如接收缓冲区大小)
  4. Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成
  5. 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方
  6. 支持关联 I/O 操作与对应的处理程序
  7. 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应

常用的 Channel 类型

名称 介绍
NioSocketChannel 异步的客户端 TCP Socket 连接。
NioServerSocketChannel 异步的服务器端 TCP Socket 连接
NioDatagramChannel 异步的 UDP 连接。
NioSctpChannel 异步的客户端 Sctp 连接。
NioSctpServerChannel 异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

四 Selector

  1. Netty 基于Selector对象实现 I/O 多路复用,通过Selector 一个线程可以监听多个连接的 Channel 事件。
  2. 当向一个Selector 中注册Channel 后,Selector内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel
  3. 同时,Netty中对selector中的selectedKey集合进行了替换,它替换成了一个它自己实现的一个set集合,这样效率更高。

    五 ChannelHandler及其实现类

  4. ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。

  5. ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
  6. 我们经常需要自定义一个 Handler类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法 ```java public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    //通道注册事件 @Skip @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

    1. ctx.fireChannelRegistered();

    }

    //通道取消注册事件 @Skip @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    1. ctx.fireChannelUnregistered();

    }

    //通道就绪事件 @Skip @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {

    1. ctx.fireChannelActive();

    }

    /**

    • Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
    • to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. *
    • Sub-classes may override this method to change behavior. */ @Skip @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); }

      //通道读取数据事件 @Skip @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }

      //通道数据读取完毕事件 @Skip @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); }

      /**

    • Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
    • to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. *
    • Sub-classes may override this method to change behavior. */ @Skip @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); }

      /**

    • Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
    • to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. *
    • Sub-classes may override this method to change behavior. */ @Skip @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); }

      //通道发生异常事件 @Skip @Override @SuppressWarnings(“deprecation”) public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

      1. throws Exception {

      ctx.fireExceptionCaught(cause); } }

  1. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601075440273-97fa646f-5950-4e9a-8504-d2764f31be88.png#height=370&id=Qqelm&margin=%5Bobject%20Object%5D&name=image.png&originHeight=740&originWidth=1234&originalType=binary&ratio=1&size=190169&status=done&style=none&width=617)
  2. - ChannelInboundHandler 用于处理入站 I/O 事件。
  3. - ChannelOutboundHandler 用于处理出站 I/O 操作。
  4. 适配器
  5. - ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
  6. - ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
  7. - ChannelDuplexHandler 用于处理入站和出站事件。
  8. <a name="0Th5h"></a>
  9. # 六 Pipeline和ChannelPipeline
  10. 1. ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解:ChannelPipeline 是保存ChannelHandlerList,用于处理或拦截 Channel 的入站事件和出站操作)
  11. 1. ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互
  12. 1. Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下
  13. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601075692411-193de8b1-3a8c-4219-8a6d-705f0eeb724f.png#height=221&id=XZIwW&margin=%5Bobject%20Object%5D&name=image.png&originHeight=442&originWidth=1774&originalType=binary&ratio=1&size=740244&status=done&style=none&width=887)
  14. - 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler
  15. - 入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰
  16. **常用方法:**
  17. | 方法名 | 介绍 |
  18. | --- | --- |
  19. | ChannelPipeline addFirst(ChannelHandler... handlers) | 把一个业务处理类(handler)添加到链中的第一个位置 |
  20. | ChannelPipeline addLast(ChannelHandler handlers) | 把一个业务处理类(handler)添加到链中的最后一个位置 |
  21. <a name="Xzy0v"></a>
  22. # 七 ChannelHandlerContext
  23. 1. 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
  24. 1. ChannelHandlerContext ChannelHandler ChannelHandlerContext 中也绑定了对应的 pipeline Channel 的信息,方便对 ChannelHandler进行调用.
  25. **常用方法:**
  26. | 方法名 | 介绍 |
  27. | --- | --- |
  28. | ChannelFuture close() | 关闭通道 |
  29. | ChannelOutboundInvoker flush() | 刷新 |
  30. | ChannelFuture writeAndFlush(Object msg) | 将数据写到ChannelPipeline中当前ChannelHandler的下一个ChannelHandler开始处理(出站) |
  31. <a name="Cnhqu"></a>
  32. # 八 ChannelOption
  33. Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。<br />**ChannelOption 参数如下:**
  34. 2. ChannelOption.SO_BACKLOG
  35. 1. 对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。
  36. 1. 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定了队列的大小。
  37. 4. ChannelOption.SO_KEEPALIVE
  38. 1. 一直保持连接活动状态
  39. <a name="6uJJv"></a>
  40. # 九 EventLoopGroup和其实现类NioEventLoopGroup
  41. 1. EventLoopGroup是一组EventLoop的抽象,Netty为了更好的利用多核 CPU资源,一般会有多 EventLoop同时工作,每个EventLoop维护着一个 Selector实例。
  42. 1. EventLoopGroup 提供next 接口,可以从组里面按照一定规则获取其中一个 EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup WorkerEventLoopGroup
  43. 1. 通常一个服务端口即一个ServerSocketChannel对应一个Selector 和一个EventLoop线程。BossEventLoop负责接收客户端的连接,并将 SocketChannel交给 WorkerEventLoopGroup 来进行 IO 处理,如下图所示
  44. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601076468894-f3c62d68-f043-462a-979b-1cfa7b688c01.png#height=323&id=oPUqP&margin=%5Bobject%20Object%5D&name=image.png&originHeight=646&originWidth=1382&originalType=binary&ratio=1&size=160992&status=done&style=none&width=691)
  45. - BossEventLoopGroup 通常是一个单线程的 EventLoopEventLoop 维护着一个注册了ServerSocketChannel Selector 实例BossEventLoop 不断轮询 Selector 将连接事件分离出来
  46. - 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup
  47. - WorkerEventLoopGroup 会由 next 选择其中一个 EventLoop来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理
  48. **常用方法:**
  49. | 方法名 | 介绍 |
  50. | --- | --- |
  51. | public NioEventLoopGroup() | 构造方法 |
  52. | public Future<?> shutdownGracefully() | 断开连接,关闭线程 |
  53. <a name="S7R9W"></a>
  54. # 十 Unpooled
  55. Netty 提供一个专门用来操作缓冲区(即Netty的数据容器)的工具类<br />**常用方法**
  56. | 方法名 | 介绍 |
  57. | --- | --- |
  58. | public static ByteBuf copiedBuffer(CharSequence string, Charset charset) | 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 但有区别) |
  59. **代码示例**
  60. ```java
  61. //创建一个ByteBuf
  62. //1、创建对象,该对象包含一个数组,是一个byte[10]
  63. //2、在netty的buffer中,写入数据后再读取数据不需要使用 flip 进行反转
  64. // 底层维护了 readerIndex 和 writeIndex
  65. //往buffer中写的范围为 [writeIndex, capacity)
  66. //往buffer中可读的范围为 [readerIndex, writeIndex)。使用 buf.readByte() 会往后移动 readerIndex 指针,使用 buf.getByte(i) 通过索引获取就不会移动该指针
  67. ByteBuf byteBuf = Unpooled.buffer(10);
  68. for (int i = 0; i < 10; i++) {
  69. byteBuf.writeByte(i);
  70. }
  71. //获取该buf的大小
  72. int capacity = byteBuf.capacity();
  73. //输出
  74. for (int i = 0; i < byteBuf.capacity(); i++) {
  75. System.out.println(byteBuf.getByte(i));
  76. System.out.println(byteBuf.readByte());
  77. }
  78. byte[] content = byteBuf.array();
  79. //将content转成字符串
  80. String c = new String(content, StandardCharsets.UTF_8);
  81. //数组偏移量
  82. int offset = byteBuf.arrayOffset();
  83. //获取读取偏移量
  84. int readerIndex = byteBuf.readerIndex();
  85. //获取写偏移量
  86. int writerIndex = byteBuf.writerIndex();
  87. //获取容量
  88. int capacity = byteBuf.capacity();
  89. //获取可读取的字节数
  90. int readableBytes = byteBuf.readableBytes();
  91. //通过索引获取某个位置的字节
  92. byte aByte = byteBuf.getByte(0);
  93. //获取Buf中某个范围的字符序列
  94. CharSequence charSequence = byteBuf.getCharSequence(0, 4, StandardCharsets.UTF_8);

十一 Netty群聊系统

实例要求:

  • 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  • 实现多人群聊
  • 服务器端:可以监测用户上线,离线,并实现消息转发功能
  • 客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
  • 目的:进一步理解Netty非阻塞网络编程机制

    11.1 服务端代码

    ```java /*启动类**/ public class GroupChatServer { private int port; //监听端口 public GroupChatServer(int port){

    1. this.port = port;

    } //编写run 方法,处理客户端的请求 public void run() throws InterruptedException {

    1. //创建两个线程组
    2. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    3. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    4. try {
    5. ServerBootstrap serverBootstrap = new ServerBootstrap();
    6. serverBootstrap.group(bossGroup, workerGroup)
    7. .channel(NioServerSocketChannel.class)
    8. .option(ChannelOption.SO_BACKLOG, 128)
    9. .childOption(ChannelOption.SO_KEEPALIVE, true)
    10. .childHandler(new ChannelInitializer<SocketChannel>() {
    11. @Override
    12. protected void initChannel(SocketChannel socketChannel) throws Exception {
    13. ChannelPipeline pipeline = socketChannel.pipeline();
    14. pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入解码器
    15. pipeline.addLast("encoder", new StringEncoder()); //加入编码器
    16. pipeline.addLast(new GroupChatServerHandler());
    17. }
    18. });
    19. System.out.println("netty服务器启动");
    20. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
    21. //监听关闭事件
    22. channelFuture.channel().closeFuture().sync();
    23. } finally {
    24. bossGroup.shutdownGracefully();
    25. workerGroup.shutdownGracefully();
    26. }

    } public static void main(String[] args) throws InterruptedException {

    1. GroupChatServer groupChatServer = new GroupChatServer(7000);
    2. groupChatServer.run();

    } }

/*Handler**/ public class GroupChatServerHandler extends SimpleChannelInboundHandler {

  1. //定义一个Channel组,管理所有的channel
  2. //GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
  3. private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  4. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  5. //此方法表示连接建立,一旦建立连接,就第一个被执行
  6. @Override
  7. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  8. Channel channel = ctx.channel();
  9. //该方法会将 channelGroup 中所有 channel 遍历,并发送消息,而不需要我们自己去遍历
  10. channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + sdf.format(new Date()) + "加入聊天\n");
  11. //将当前的Channel加入到 ChannelGroup
  12. channelGroup.add(channel);
  13. }
  14. //表示 channel 处于活动状态,提示 xxx 上线
  15. @Override
  16. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  17. System.out.println(ctx.channel().remoteAddress() + " " + sdf.format(new Date()) + "上线了~");
  18. }
  19. //表示 channel 处于不活动状态,提示 xxx 离线
  20. @Override
  21. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  22. System.out.println(ctx.channel().remoteAddress() + " " + sdf.format(new Date()) + "离线了~");
  23. }
  24. //表示 channel 断开连接,将xx客户离开信息推送给当前在线客户
  25. @Override
  26. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  27. Channel channel = ctx.channel();
  28. channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() +" "+ sdf.format(new Date()) + "离开了\n");
  29. System.out.println("当前channelGroup大小 :" + channelGroup.size());
  30. }
  31. //读取数据,并进行消息转发
  32. @Override
  33. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  34. //获取当前channel
  35. Channel channel = ctx.channel();
  36. //这时,遍历channelGroup,根据不同的情况,回送不同的消息
  37. channelGroup.forEach(item -> {
  38. if (item != channel) {
  39. item.writeAndFlush("[客户]" + channel.remoteAddress() + "发送了消息:" + msg + "\n");
  40. } else { //把自己发送的消息发送给自己
  41. item.writeAndFlush("[自己]发送了消息:" + msg + "\n");
  42. }
  43. });
  44. }
  45. @Override
  46. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  47. ctx.close();
  48. }

}

  1. <a name="Z77Ue"></a>
  2. ## 11.2 客户端代码:
  3. ```java
  4. /*********************启动类******************/
  5. public class GroupChatClient {
  6. //属性
  7. private final String host;
  8. private final int port;
  9. public GroupChatClient(String host, int port) {
  10. this.port = port;
  11. this.host = host;
  12. }
  13. public void run() throws InterruptedException {
  14. NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
  15. try {
  16. Bootstrap bootstrap = new Bootstrap()
  17. .group(eventExecutors)
  18. .channel(NioSocketChannel.class)
  19. .handler(new ChannelInitializer<SocketChannel>() {
  20. @Override
  21. protected void initChannel(SocketChannel ch) throws Exception {
  22. ChannelPipeline pipeline = ch.pipeline();
  23. //加入Handler
  24. pipeline.addLast("decoder", new StringDecoder());
  25. pipeline.addLast("encoder", new StringEncoder());
  26. pipeline.addLast(new GroupChatClientHandler());
  27. }
  28. });
  29. ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
  30. //得到channel
  31. Channel channel = channelFuture.channel();
  32. System.out.println("--------" + channel.localAddress() + "---------");
  33. //客户端需要输入信息,创建一个扫描器
  34. Scanner scanner = new Scanner(System.in);
  35. while (scanner.hasNextLine()){
  36. String msg = scanner.nextLine();
  37. //通过channel发送到服务器端
  38. channel.writeAndFlush(msg + "\r\n");
  39. }
  40. } finally {
  41. eventExecutors.shutdownGracefully();
  42. }
  43. }
  44. public static void main(String[] args) throws InterruptedException {
  45. new GroupChatClient("127.0.0.1", 7000).run();
  46. }
  47. }
  48. /*****************Handler*****************/
  49. public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
  50. @Override
  51. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  52. System.out.println(msg.trim());
  53. }
  54. }

十二 Netty心跳检测机制案例

实例要求:

  • 编写一个 Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲
  • 当服务器超过5秒没有写操作时,就提示写空闲
  • 实现当服务器超过7秒没有读或者写操作时,就提示读写空闲 ```java public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup();

    try {

    1. ServerBootstrap serverBootstrap = new ServerBootstrap();
    2. serverBootstrap.group(bossGroup, workerGroup)
    3. .channel(NioServerSocketChannel.class)
    4. .handler(new LoggingHandler(LogLevel.INFO)) //为BossGroup中的请求添加日志处理Handler
    5. .childHandler(new ChannelInitializer<SocketChannel>() {
    6. @Override
    7. protected void initChannel(SocketChannel ch) throws Exception {
    8. ChannelPipeline pipeline = ch.pipeline();
    9. //加入一个 netty 提供的 IdleStateHandler
    10. /**
    11. * 1、IdleStateHandler 是 netty 提供的检测空闲状态的处理器
    12. * 2、long readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包检测是否还是连接的状态
    13. * 3、long writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包检测是否还是连接的状态
    14. * 4、long allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包检测是否还是连接的状态
    15. * 5、当 IdleStateEvent 触发后,就会传递给管道的下一个 Handler,通过调用(触发)下一个Handler的 userEventTriggered,在该方法区处理这个事件。
    16. */
    17. pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
    18. //加入一个对空闲检测进一步处理的Handler(自定义)
    19. pipeline.addLast(new MyServerHandler());
    20. }
    21. });
    22. //启动服务器,设置为同步模式。
    23. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
    24. channelFuture.channel().closeFuture().sync();

    } finally {

    1. bossGroup.shutdownGracefully();
    2. workerGroup.shutdownGracefully();

    } }

  1. Handler
  2. ```java
  3. public class MyServerHandler extends ChannelInboundHandlerAdapter {
  4. /**
  5. * @param ctx 上下文
  6. * @param evt 事件
  7. * @throws Exception
  8. */
  9. @Override
  10. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  11. if (evt instanceof IdleStateEvent) {
  12. //将 evt 向下转型 IdleStateEvent
  13. IdleStateEvent event = (IdleStateEvent)evt;
  14. String eventTye = null;
  15. switch (event.state()) {
  16. case READER_IDLE:
  17. eventTye = "读空闲";
  18. break;
  19. case WRITER_IDLE:
  20. eventTye = "写空闲";
  21. break;
  22. case ALL_IDLE:
  23. eventTye = "读写空闲";
  24. break;
  25. }
  26. System.out.println(ctx.channel().remoteAddress() +"---超时时间--" + eventTye);
  27. System.out.println("服务器做相应处理。。");
  28. }
  29. }
  30. }

十三 Netty建立Websocket连接

实例要求:

  • Http协议是无状态的, 浏览器和服务器间的请求响应一次,下一次会重新创建连接.
  • 要求:实现基于webSocket的长连接的全双工的交互
  • 改变Http协议多次请求的约束,实现长连接了, 服务器可以发送消息给浏览器
  • 客户端浏览器和服务器端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知 ```java public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try{

    1. ServerBootstrap serverBootstrap = new ServerBootstrap();
    2. serverBootstrap.group(bossGroup, workerGroup)
    3. .channel(NioServerSocketChannel.class)
    4. .handler(new LoggingHandler(LogLevel.INFO))
    5. .childHandler(new ChannelInitializer<SocketChannel>() {
    6. @Override
    7. protected void initChannel(SocketChannel ch) throws Exception {
    8. ChannelPipeline pipeline = ch.pipeline();
    9. //因为是基于Http协议,所以要使用Http的编码和解码器
    10. pipeline.addLast(new HttpServerCodec());
    11. //是以块方式写,添加ChunkedWriter处理器
    12. pipeline.addLast(new ChunkedWriteHandler());
    13. /**
    14. * 1、http数据在传输过程中是分裂的,HttpObjectAggregator就可以将多个段聚合
    15. * 2、这就是为什么,当浏览器发送大量数据时,就会发出多次http请求
    16. */
    17. pipeline.addLast(new HttpObjectAggregator(8192));
    18. /**
    19. * 1、对于websocket,它的数据是以帧的形式传递的
    20. * 2、可以看到 WebsocketFrame 下面有六个子类
    21. * 3、浏览器请求时:ws://localhost:7000/hello 表示请求的uri
    22. * 4、WebSocketServerProtocolHandler 核心功能是将 http 协议升级为 ws 协议,保持长连接
    23. * 5、从Http协议升级到Websocket协议,是通过StatusCode 101(Switching Protocols)来切换的。
    24. */
    25. pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
    26. //自定义Handler,处理业务逻辑
    27. pipeline.addLast(new MyTextWebSocketFrameHandler());
    28. }
    29. });
    30. ChannelFuture sync = serverBootstrap.bind(7000).sync();
    31. sync.channel().closeFuture().sync();

    } finally {

    1. bossGroup.shutdownGracefully();
    2. workerGroup.shutdownGracefully();

    } }

```

十四 套路总结

  1. 创建启动类:
    1. 首先初始化两个NioEventLoopGroup。其中BossGroup一般设置线程为1
    2. 初始化一个ServerBootStrap类。并调用它设置很多参数。
      1. group():服务端设置两个Group,客户端设置一个Group
      2. chnnel():服务端传入NioServerSocketChannel,客户端传入NioSocketChannel
      3. option():服务端给BossGroup设置SO_BACKLOG任务队列大小
      4. childOption():服务端给WorkerGroup设置连接SO_KEEPALIVE保持连接状态
      5. handler():服务端给BossGroup设置Handler,客户端设置Handler
      6. childHandler():服务端给WorkerGroup设置Handler。
    3. 通过BootStrap去绑定端口,监听关闭事件。设置为同步
  2. Handler:
    1. SimpleChannelInboundHandler
      1. 可以继承它来处理很多通信。经过上面几个案例推敲,一般写自己的Handler继承它就可以了
    2. ChannelInboundHandlerAdapter
      1. 这个是上一个的父类,我们在心跳检测的时候通过继承它的userEventTriggered去判断连接状态
      2. 其实通过上面那个simple也可以继承这个trigger
    3. IdleStateHandler
      1. 在心跳检测时我们要通过这个Handler去触发上面的trigger
    4. HttpServerCodec
      1. 提供好的用于Http编码解码,一般用于Http请求
    5. ChunkedWriteHandler
      1. 提供好的Handler,以块方式写,添加ChunkedWriter处理器
      2. 我搜了一下,它一般用于发送大文件。这个东西使我们在Websocket的时候用的。
    6. HttpObjectAggregator
      1. 它会将http数据聚合在一起发送
    7. WebSocketServerProtocolHandler
      1. 传入ws路径,将Http协议升级成为ws协议
  3. Netty中通信数据实体:
    1. TextWebSocketFrame
      1. 这是我们在websocket连接的时候用的,它表示一个文本帧,是websocket进行通信的数据形式
    2. HttpObject
      1. 这是我们在建立Http连接的时候用到的,可以将它转换成一个HttpRequest

Hander常用方法:

方法名 介绍
channelRead0(ChannelHandlerContext channelHandlerContext, T t) 读取数据,并进行消息转发
handlerAdded(ChannelHandlerContext ctx) 连接建立,一旦建立连接,就第一个被执行
channelActive(ChannelHandlerContext ctx) 表示 channel 处于活动状态,提示 xxx 上线
channelInactive(ChannelHandlerContext ctx) 表示 channel 处于不活动状态,提示 xxx 离线
handlerRemoved(ChannelHandlerContext ctx) 表示 channel 断开连接,将xx客户离开信息推送给当前在线客户
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 出现错误如何进行处理
userEventTriggered(ChannelHandlerContext ctx, Object evt) 事件触发器,通过判断evt的类型去判断发生了什么事件,再通过里面的属性判断事件发生的类型。我们在IdleStateHandler后面加上一个触发器,可以检测心跳。