1、Channel主要成员和方法
几乎所有的Netty通道都继承了AbstractChannel
抽象类,都拥有parent
和pipeline
两个成员
主要成员:
- parent
- 对于连接监听通道(如NioServerSocketChannel),其parent属性为null
- 对于传输通道(如NioSocketChannel),其parent属性为接收到该连接的通道
- pipeline
- 表示该通道的处理器流水线,该属性被初始化为
DefaultChannelPipeline
的实例 ```json //父通道 private final Channel parent; private final ChannelId id; private final Unsafe unsafe; private final DefaultChannelPipeline pipeline;
- 表示该通道的处理器流水线,该属性被初始化为
protected AbstractChannel(Channel parent) { this.parent = parent; this.id = this.newId(); //新建一个底层的NIO通道,完成实际的IO操作 this.unsafe = this.newUnsafe(); //新建一条通道流水线 this.pipeline = this.newChannelPipeline(); }
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
this.unsafe = this.newUnsafe();
this.pipeline = this.newChannelPipeline();
}
**接口中定义的重要方法:**
- `ChannelFuture connect(SocketAddress remoteAddress)`
- 连接远程服务器,调用后立即返回,返回值为执行连接操作的异步任务
- `ChannelFuture bind(SocketAddress localAddress)`
- 绑定监听地址<br />
- `ChannelFuture close()`
- 关闭通道连接,返回连接关闭的异步任务
- `ChannelFuture closeFuture() `
- 用来处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭;而 addListener 方法是异步等待 channel 关闭
- `Channel read()`
- 读取通道数据,并启动入站处理<br />
- `ChannelFuture write(Object msg)`
- 启动出站流水处理,将最终处理数据结果写到底层通道<br />
- `Channel flush()`
- 将缓冲区的数据立即写出到对端<br />
<a name="NQIDj"></a>
# 2、EmbeddedChannel
开发人员可以通过EmbeddedChannel方便快速的进行ChannelHandler业务处理器的单元测试,避免每开发一个业务处理器都进行服务器和客户端的重复启动。
![](https://cdn.nlark.com/yuque/0/2021/png/1911003/1622950440129-4f8bc5e5-cd37-445c-8cd3-407a289c4186.png#align=left&display=inline&height=436&margin=%5Bobject%20Object%5D&originHeight=436&originWidth=920&size=0&status=done&style=none&width=920)
```json
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("入站处理器h1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("入站处理器h2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("出站处理器h3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("出站处理器h4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1,h2,h3,h4);
embeddedChannel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
embeddedChannel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
}
}