netty资料
简介
Netty是一个java NIO框架,封装了nio复杂底层的api,提供了便捷易用的高级api。
组件介绍
Channel、EventLoop 和ChannelFuture
- Channel—Socket;
- EventLoop—控制流、多线程处理、并发;
EventLoop线程模型,一个EventLoop在一个线程里调度。一个channel可以注册到多个EventLoop。 -
ChannelHandler 和ChannelPipeline
ChannelHandler:处理IO事件,转发到下一个Handler。或者终止一个IO事件、
ChannelPipeline:handler是一个连式结构,ChannelPipeline提供了addLast(io.netty.channel.ChannelHandler…)和addFirst(io.netty.channel.ChannelHandler…)等各种方法往pipeline中添加handler。Bootstrap和EventLoopGroup
Bootstrap
用来启动配置socket,封装了socket初始化连接的过程。
server端ServerBootstrap,client端Bootstrap。服务器致力于使用一个父Channel 来接受来自客户端的连接,并创建子Channel 以用于它们之间的通信;
客户端将最可能只需要一个单独的、没有父Channel 的Channel 来用于所有的网络交互EventLoopGroup,相当于java的线程池,EventLoop相当于线程。
Example
code是来源于Introduction to Netty这篇文章。 ```java public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
int port = args.length > 0
? Integer.parseInt(args[0])
: 8080;
new NettyServer(port).run();
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RequestData.RequestDataDecoder(),
new ResponseData.ResponseDataEncoder(),
new ProcessingHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
static class ProcessingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RequestData requestData = (RequestData) msg;
ResponseData responseData = new ResponseData();
responseData.setIntValue(requestData.getIntValue() * 2);
ChannelFuture future = ctx.writeAndFlush(responseData);
future.addListener(ChannelFutureListener.CLOSE);
System.out.println(requestData);
}
} }
public class NettyClient { static class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
RequestData msg = new RequestData();
msg.setIntValue(123);
msg.setStringValue(
"all work and no play makes jack a dull boy");
ChannelFuture future = ctx.writeAndFlush(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println((ResponseData)msg);
ctx.close();
}
}
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RequestData.RequestDataEncoder(),
new ResponseData.ResponseDataDecoder(), new ClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
public class RequestData { private int intValue; private String stringValue;
public int getIntValue() {
return intValue;
}
public void setIntValue(int intValue) {
this.intValue = intValue;
}
public String getStringValue() {
return stringValue;
}
public void setStringValue(String stringValue) {
this.stringValue = stringValue;
}
static class RequestDataEncoder
extends MessageToByteEncoder<RequestData> {
private final Charset charset = Charset.forName("UTF-8");
@Override
protected void encode(ChannelHandlerContext ctx,
RequestData msg, ByteBuf out) throws Exception {
out.writeInt(msg.getIntValue());
out.writeInt(msg.getStringValue().length());
out.writeCharSequence(msg.getStringValue(), charset);
}
}
static class RequestDataDecoder extends ReplayingDecoder<RequestData> {
private final Charset charset = Charset.forName("UTF-8");
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in, List<Object> out) throws Exception {
RequestData data = new RequestData();
data.setIntValue(in.readInt());
int strLen = in.readInt();
data.setStringValue(in.readCharSequence(strLen, charset).toString());
out.add(data);
}
}
}
public class ResponseData { private int intValue;
public int getIntValue() {
return intValue;
}
public void setIntValue(int intValue) {
this.intValue = intValue;
}
@Override
public String toString() {
return "ResponseData{" +
"intValue=" + intValue +
'}';
}
static class ResponseDataDecoder
extends ReplayingDecoder<ResponseData> {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in, List<Object> out) throws Exception {
ResponseData data = new ResponseData();
data.setIntValue(in.readInt());
out.add(data);
}
}
static class ResponseDataEncoder
extends MessageToByteEncoder<ResponseData> {
@Override
protected void encode(ChannelHandlerContext ctx,
ResponseData msg, ByteBuf out) throws Exception {
out.writeInt(msg.getIntValue());
}
}
} ```