1、Channel主要成员和方法

几乎所有的Netty通道都继承了AbstractChannel抽象类,都拥有parentpipeline两个成员
主要成员:

  • parent
    • 对于连接监听通道(如NioServerSocketChannel),其parent属性为null
    • 对于传输通道(如NioSocketChannel),其parent属性为接收到该连接的通道
  • pipeline
    • 表示该通道的处理器流水线,该属性被初始化为DefaultChannelPipeline的实例 ```json //父通道 private final Channel parent; private final ChannelId id; private final Unsafe unsafe; private final DefaultChannelPipeline pipeline;

protected AbstractChannel(Channel parent) { this.parent = parent; this.id = this.newId(); //新建一个底层的NIO通道,完成实际的IO操作 this.unsafe = this.newUnsafe(); //新建一条通道流水线 this.pipeline = this.newChannelPipeline(); }

  1. protected AbstractChannel(Channel parent, ChannelId id) {
  2. this.parent = parent;
  3. this.id = id;
  4. this.unsafe = this.newUnsafe();
  5. this.pipeline = this.newChannelPipeline();
  6. }
  1. **接口中定义的重要方法:**
  2. - `ChannelFuture connect(SocketAddress remoteAddress)`
  3. - 连接远程服务器,调用后立即返回,返回值为执行连接操作的异步任务
  4. - `ChannelFuture bind(SocketAddress localAddress)`
  5. - 绑定监听地址<br />
  6. - `ChannelFuture close()`
  7. - 关闭通道连接,返回连接关闭的异步任务
  8. - `ChannelFuture closeFuture() `
  9. - 用来处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭;而 addListener 方法是异步等待 channel 关闭
  10. - `Channel read()`
  11. - 读取通道数据,并启动入站处理<br />
  12. - `ChannelFuture write(Object msg)`
  13. - 启动出站流水处理,将最终处理数据结果写到底层通道<br />
  14. - `Channel flush()`
  15. - 将缓冲区的数据立即写出到对端<br />
  16. <a name="NQIDj"></a>
  17. # 2、EmbeddedChannel
  18. 开发人员可以通过EmbeddedChannel方便快速的进行ChannelHandler业务处理器的单元测试,避免每开发一个业务处理器都进行服务器和客户端的重复启动。
  19. ![](https://cdn.nlark.com/yuque/0/2021/png/1911003/1622950440129-4f8bc5e5-cd37-445c-8cd3-407a289c4186.png#align=left&display=inline&height=436&margin=%5Bobject%20Object%5D&originHeight=436&originWidth=920&size=0&status=done&style=none&width=920)
  20. ```json
  21. @Slf4j
  22. public class TestEmbeddedChannel {
  23. public static void main(String[] args) {
  24. ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
  25. @Override
  26. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  27. log.info("入站处理器h1");
  28. super.channelRead(ctx, msg);
  29. }
  30. };
  31. ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
  32. @Override
  33. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  34. log.info("入站处理器h2");
  35. super.channelRead(ctx, msg);
  36. }
  37. };
  38. ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
  39. @Override
  40. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  41. log.info("出站处理器h3");
  42. super.write(ctx, msg, promise);
  43. }
  44. };
  45. ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
  46. @Override
  47. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  48. log.info("出站处理器h4");
  49. super.write(ctx, msg, promise);
  50. }
  51. };
  52. EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1,h2,h3,h4);
  53. embeddedChannel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
  54. embeddedChannel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
  55. }
  56. }