本实例可以直接从Netty Example出找到源代码. 代码地址 TelnetServer.java
Telnet协议实现
public class NettyServerExample {
private static final StringDecoder DECODER = new StringDecoder();//解码器
private static final StringEncoder ENCODER = new StringEncoder();//编码器
//biz handler
private static final TelnetServerHandler SERVER_HANDLER = new TelnetServerHandler();
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);//boss线程
EventLoopGroup workerGroup = new NioEventLoopGroup();//worker线程
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
//todo 这里是重点 Channel初始化的时候添加handler,每一个channel都会注册这些
ChannelPipeline pipeline = socketChannel.pipeline();
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// the encoder and decoder are static as these are sharable
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(SERVER_HANDLER);
}
});
b.bind(8023).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
biz handler
@ChannelHandler.Sharable
public class TelnetServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send greeting for a new connection.
ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
ctx.write("It is " + new Date() + " now.\r\n");
ctx.flush();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
// Generate and write a response.
String response;
boolean close = false;
System.out.println("request:"+request);
if (request == null) {
response = "Please type something.\r\n";
} else if ("bye".equals(request.toLowerCase())) {
response = "Have a good day!\r\n";
close = true;
} else {
response = "Did you say '" + request + "'?\r\n";
}
// We do not need to write a ChannelBuffer here.
// We know the encoder inserted at TelnetPipelineFactory will do the conversion.
ChannelFuture future = ctx.write(response);
// Close the connection after sending 'Have a good day!'
// if the client has sent 'bye'.
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}
看着 Netty 一个简单的官方实例,对于 Netty 的使用会有一个大概的了解。基本结构就是使用Decoder和Encoder对TCP的字节流做反序列化,然后传递到后边的handler事件(双向链表) 触发完handler之后再返回(理论上是可有可无)对应数据.
这里暂不讨论 EventLoopGroup 和 EventLoop
常用Class
总结如下:
- 每一个
ChannelHandler
会和一个AbstractChannelHandlerContext
进行绑定,他们是1对1
的关系. - 每一个
Channel
都会有多个ChannelHandler
,这些ChannelHandler
在Channel
初始化的时候设置上去 ,详情请看 TelnetServerInitializer.java - 每一个
ChannelPipeline
都会将AbstractChannelHandlerContext
串联成链表,head和tail由netty分配, 通过AbstractChannelHandlerContext
就可以将ChannelHandler
串联起来,详情看下图 - 每一个
ChannelPipeline
都会分配到一个NioEventLoop
上,这个EventLoop
可以自己传递进去,也可以是worker
线程 (每一个NioEventloop都是单线程的),一般都是自己传递进去.
Note:
channel
不和NioEventLoop
进行绑定(但是实际上是有这个关系的),这个关系由pipeline
维系
EventLoop
EventLoopGroup中的一个子线程(worker)会去处理绑定在自身的所有Channel发生的所有IO事件,避免了线程切换. 说起来比较绕,看下英文.
Will handle all the I/O operations for a Channel once registered,One EventLoop instance will usually handle more than one Channel but this may depend onimplementation details and internals
理解起来就是Channel(A)初始化的时候,会把这个A分配给一个EventLoop(B)进行绑定,后续这个A发生的所有IO事件都会交由B去完成(异步)
ChannelPipeline
使用 Pipeline
将 Handler
串连起来,形成一组处理器,将Netty串联是一个整体,非常核心的一个Class了
I/O Request
via {@link Channel} or
{@link ChannelHandlerContext}
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
Channel、ChannelHandlerContext、Handler
Channel
类比于一个TCP连接,所有的IO操作都是异步的,结果 ChannelFuture
会立即返回,但是不保证成功. Handler
是所有 Inbound Handler
和 OutBound Handler
的一个统称,使用 ChannelPipeline
将其串联成 双向链表
的数据结构ChannelHandlerContext
是交互的上下文,它的核心就是鉴定当前 ChannelHandler
是 Inbound
还是 Outbound
,已经将 prev
和 next
链接成串. 看下参数即明白
volatile AbstractChannelHandlerContext next; 上一个AbstractChannelHandlerContext
volatile AbstractChannelHandlerContext prev; 下一个AbstractChannelHandlerContext
private final boolean inbound; 进站
private final boolean outbound; 出战
private final DefaultChannelPipeline pipeline; 流水线pipeline
private final String name; handler 名字
总结
理解Filter有助于理解pipeline,理解了pipeline基本上就了解了它的整个流程。Intercepting Filter