本实例可以直接从Netty Example出找到源代码. 代码地址 TelnetServer.java
Telnet协议实现
public class NettyServerExample {private static final StringDecoder DECODER = new StringDecoder();//解码器private static final StringEncoder ENCODER = new StringEncoder();//编码器//biz handlerprivate static final TelnetServerHandler SERVER_HANDLER = new TelnetServerHandler();public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);//boss线程EventLoopGroup workerGroup = new NioEventLoopGroup();//worker线程try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) {//todo 这里是重点 Channel初始化的时候添加handler,每一个channel都会注册这些ChannelPipeline pipeline = socketChannel.pipeline();// Add the text line codec combination first,pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));// the encoder and decoder are static as these are sharablepipeline.addLast(DECODER);pipeline.addLast(ENCODER);// and then business logic.pipeline.addLast(SERVER_HANDLER);}});b.bind(8023).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
biz handler
@ChannelHandler.Sharablepublic class TelnetServerHandler extends SimpleChannelInboundHandler<String> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// Send greeting for a new connection.ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");ctx.write("It is " + new Date() + " now.\r\n");ctx.flush();}@Overridepublic void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {// Generate and write a response.String response;boolean close = false;System.out.println("request:"+request);if (request == null) {response = "Please type something.\r\n";} else if ("bye".equals(request.toLowerCase())) {response = "Have a good day!\r\n";close = true;} else {response = "Did you say '" + request + "'?\r\n";}// We do not need to write a ChannelBuffer here.// We know the encoder inserted at TelnetPipelineFactory will do the conversion.ChannelFuture future = ctx.write(response);// Close the connection after sending 'Have a good day!'// if the client has sent 'bye'.if (close) {future.addListener(ChannelFutureListener.CLOSE);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}}
看着 Netty 一个简单的官方实例,对于 Netty 的使用会有一个大概的了解。基本结构就是使用Decoder和Encoder对TCP的字节流做反序列化,然后传递到后边的handler事件(双向链表) 触发完handler之后再返回(理论上是可有可无)对应数据.
这里暂不讨论 EventLoopGroup 和 EventLoop
常用Class
总结如下:
- 每一个
ChannelHandler会和一个AbstractChannelHandlerContext进行绑定,他们是1对1的关系. - 每一个
Channel都会有多个ChannelHandler,这些ChannelHandler在Channel初始化的时候设置上去 ,详情请看 TelnetServerInitializer.java - 每一个
ChannelPipeline都会将AbstractChannelHandlerContext串联成链表,head和tail由netty分配, 通过AbstractChannelHandlerContext就可以将ChannelHandler串联起来,详情看下图 - 每一个
ChannelPipeline都会分配到一个NioEventLoop上,这个EventLoop可以自己传递进去,也可以是worker线程 (每一个NioEventloop都是单线程的),一般都是自己传递进去.
Note:
channel不和NioEventLoop进行绑定(但是实际上是有这个关系的),这个关系由pipeline维系
EventLoop
EventLoopGroup中的一个子线程(worker)会去处理绑定在自身的所有Channel发生的所有IO事件,避免了线程切换. 说起来比较绕,看下英文.
Will handle all the I/O operations for a Channel once registered,One EventLoop instance will usually handle more than one Channel but this may depend onimplementation details and internals
理解起来就是Channel(A)初始化的时候,会把这个A分配给一个EventLoop(B)进行绑定,后续这个A发生的所有IO事件都会交由B去完成(异步)
ChannelPipeline
使用 Pipeline 将 Handler 串连起来,形成一组处理器,将Netty串联是一个整体,非常核心的一个Class了
I/O Requestvia {@link Channel} or{@link ChannelHandlerContext}|+---------------------------------------------------+---------------+| ChannelPipeline | || \|/ || +---------------------+ +-----------+----------+ || | Inbound Handler N | | Outbound Handler 1 | || +----------+----------+ +-----------+----------+ || /|\ | || | \|/ || +----------+----------+ +-----------+----------+ || | Inbound Handler N-1 | | Outbound Handler 2 | || +----------+----------+ +-----------+----------+ || /|\ . || . . || ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|| [ method call] [method call] || . . || . \|/ || +----------+----------+ +-----------+----------+ || | Inbound Handler 2 | | Outbound Handler M-1 | || +----------+----------+ +-----------+----------+ || /|\ | || | \|/ || +----------+----------+ +-----------+----------+ || | Inbound Handler 1 | | Outbound Handler M | || +----------+----------+ +-----------+----------+ || /|\ | |+---------------+-----------------------------------+---------------+| \|/+---------------+-----------------------------------+---------------+| | | || [ Socket.read() ] [ Socket.write() ] || || Netty Internal I/O Threads (Transport Implementation) |+-------------------------------------------------------------------+
Channel、ChannelHandlerContext、Handler
Channel 类比于一个TCP连接,所有的IO操作都是异步的,结果 ChannelFuture 会立即返回,但是不保证成功. Handler 是所有 Inbound Handler 和 OutBound Handler 的一个统称,使用 ChannelPipeline 将其串联成 双向链表 的数据结构ChannelHandlerContext 是交互的上下文,它的核心就是鉴定当前 ChannelHandler 是 Inbound 还是 Outbound ,已经将 prev 和 next 链接成串. 看下参数即明白
volatile AbstractChannelHandlerContext next; 上一个AbstractChannelHandlerContextvolatile AbstractChannelHandlerContext prev; 下一个AbstractChannelHandlerContextprivate final boolean inbound; 进站private final boolean outbound; 出战private final DefaultChannelPipeline pipeline; 流水线pipelineprivate final String name; handler 名字
总结
理解Filter有助于理解pipeline,理解了pipeline基本上就了解了它的整个流程。Intercepting Filter
