netty资料
简介
Netty是一个java NIO框架,封装了nio复杂底层的api,提供了便捷易用的高级api。
组件介绍
Channel、EventLoop 和ChannelFuture
- Channel—Socket;
- EventLoop—控制流、多线程处理、并发;
EventLoop线程模型,一个EventLoop在一个线程里调度。一个channel可以注册到多个EventLoop。 -
ChannelHandler 和ChannelPipeline
ChannelHandler:处理IO事件,转发到下一个Handler。或者终止一个IO事件、
ChannelPipeline:handler是一个连式结构,ChannelPipeline提供了addLast(io.netty.channel.ChannelHandler…)和addFirst(io.netty.channel.ChannelHandler…)等各种方法往pipeline中添加handler。Bootstrap和EventLoopGroup
Bootstrap
用来启动配置socket,封装了socket初始化连接的过程。
server端ServerBootstrap,client端Bootstrap。服务器致力于使用一个父Channel 来接受来自客户端的连接,并创建子Channel 以用于它们之间的通信;
客户端将最可能只需要一个单独的、没有父Channel 的Channel 来用于所有的网络交互EventLoopGroup,相当于java的线程池,EventLoop相当于线程。
Example
code是来源于Introduction to Netty这篇文章。 ```java public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
int port = args.length > 0? Integer.parseInt(args[0]): 8080;new NettyServer(port).run();
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ch.pipeline().addLast(new RequestData.RequestDataDecoder(),new ResponseData.ResponseDataEncoder(),new ProcessingHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}
}
static class ProcessingHandler extends ChannelInboundHandlerAdapter {
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {RequestData requestData = (RequestData) msg;ResponseData responseData = new ResponseData();responseData.setIntValue(requestData.getIntValue() * 2);ChannelFuture future = ctx.writeAndFlush(responseData);future.addListener(ChannelFutureListener.CLOSE);System.out.println(requestData);}
} }
public class NettyClient { static class ClientHandler extends ChannelInboundHandlerAdapter {
@Overridepublic void channelActive(ChannelHandlerContext ctx)throws Exception {RequestData msg = new RequestData();msg.setIntValue(123);msg.setStringValue("all work and no play makes jack a dull boy");ChannelFuture future = ctx.writeAndFlush(msg);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {System.out.println((ResponseData)msg);ctx.close();}}public static void main(String[] args) throws Exception {String host = "localhost";int port = 8080;EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.SO_KEEPALIVE, true);b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ch.pipeline().addLast(new RequestData.RequestDataEncoder(),new ResponseData.ResponseDataDecoder(), new ClientHandler());}});ChannelFuture f = b.connect(host, port).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}}
}
public class RequestData { private int intValue; private String stringValue;
public int getIntValue() {return intValue;}public void setIntValue(int intValue) {this.intValue = intValue;}public String getStringValue() {return stringValue;}public void setStringValue(String stringValue) {this.stringValue = stringValue;}static class RequestDataEncoderextends MessageToByteEncoder<RequestData> {private final Charset charset = Charset.forName("UTF-8");@Overrideprotected void encode(ChannelHandlerContext ctx,RequestData msg, ByteBuf out) throws Exception {out.writeInt(msg.getIntValue());out.writeInt(msg.getStringValue().length());out.writeCharSequence(msg.getStringValue(), charset);}}static class RequestDataDecoder extends ReplayingDecoder<RequestData> {private final Charset charset = Charset.forName("UTF-8");@Overrideprotected void decode(ChannelHandlerContext ctx,ByteBuf in, List<Object> out) throws Exception {RequestData data = new RequestData();data.setIntValue(in.readInt());int strLen = in.readInt();data.setStringValue(in.readCharSequence(strLen, charset).toString());out.add(data);}}
}
public class ResponseData { private int intValue;
public int getIntValue() {return intValue;}public void setIntValue(int intValue) {this.intValue = intValue;}@Overridepublic String toString() {return "ResponseData{" +"intValue=" + intValue +'}';}static class ResponseDataDecoderextends ReplayingDecoder<ResponseData> {@Overrideprotected void decode(ChannelHandlerContext ctx,ByteBuf in, List<Object> out) throws Exception {ResponseData data = new ResponseData();data.setIntValue(in.readInt());out.add(data);}}static class ResponseDataEncoderextends MessageToByteEncoder<ResponseData> {@Overrideprotected void encode(ChannelHandlerContext ctx,ResponseData msg, ByteBuf out) throws Exception {out.writeInt(msg.getIntValue());}}
} ```
