netty资料

  1. Introduction to Netty
  2. Netty in Action中文版

通过以上资料可以快速了解netty的概念和基本用法。

简介

Netty是一个Java NIO框架,封装了NIO复杂的底层API,提供了便捷易用的高级API。

组件介绍

Channel、EventLoop 和ChannelFuture

  1. Channel—Socket;
  2. EventLoop—控制流、多线程处理、并发;
    EventLoop线程模型:一个EventLoop在一个线程里调度。一个Channel在其生命周期内只注册到一个EventLoop,而一个EventLoop可以被分配给多个Channel。
  3. ChannelFuture—异步通知。

    ChannelHandler 和ChannelPipeline

    ChannelHandler:处理IO事件,并将其转发到下一个Handler,或者终止该IO事件的传播。
    ChannelPipeline:handler组成一个链式结构,ChannelPipeline提供了addLast(io.netty.channel.ChannelHandler…)和addFirst(io.netty.channel.ChannelHandler…)等各种方法往pipeline中添加handler。

    Bootstrap和EventLoopGroup

  4. Bootstrap
    用来启动配置socket,封装了socket初始化连接的过程。
    server端ServerBootstrap,client端Bootstrap。

    服务器致力于使用一个父Channel 来接受来自客户端的连接,并创建子Channel 以用于它们之间的通信;
    客户端将最可能只需要一个单独的、没有父Channel 的Channel 来用于所有的网络交互

  5. EventLoopGroup,相当于java的线程池,EventLoop相当于线程。

    Example

    code是来源于Introduction to Netty这篇文章。 ```java public class NettyServer {

    private int port;

    public NettyServer(int port) {

    1. this.port = port;

    }

    public static void main(String[] args) throws Exception {

    1. int port = args.length > 0
    2. ? Integer.parseInt(args[0])
    3. : 8080;
    4. new NettyServer(port).run();

    }

    public void run() throws Exception {

    1. EventLoopGroup bossGroup = new NioEventLoopGroup();
    2. EventLoopGroup workerGroup = new NioEventLoopGroup();
    3. try {
    4. ServerBootstrap b = new ServerBootstrap();
    5. b.group(bossGroup, workerGroup)
    6. .channel(NioServerSocketChannel.class)
    7. .childHandler(new ChannelInitializer<SocketChannel>() {
    8. @Override
    9. public void initChannel(SocketChannel ch)
    10. throws Exception {
    11. ch.pipeline().addLast(new RequestData.RequestDataDecoder(),
    12. new ResponseData.ResponseDataEncoder(),
    13. new ProcessingHandler());
    14. }
    15. }).option(ChannelOption.SO_BACKLOG, 128)
    16. .childOption(ChannelOption.SO_KEEPALIVE, true);
    17. ChannelFuture f = b.bind(port).sync();
    18. f.channel().closeFuture().sync();
    19. } finally {
    20. workerGroup.shutdownGracefully();
    21. bossGroup.shutdownGracefully();
    22. }

    }

    static class ProcessingHandler extends ChannelInboundHandlerAdapter {

    1. @Override
    2. public void channelRead(ChannelHandlerContext ctx, Object msg)
    3. throws Exception {
    4. RequestData requestData = (RequestData) msg;
    5. ResponseData responseData = new ResponseData();
    6. responseData.setIntValue(requestData.getIntValue() * 2);
    7. ChannelFuture future = ctx.writeAndFlush(responseData);
    8. future.addListener(ChannelFutureListener.CLOSE);
    9. System.out.println(requestData);
    10. }

    } }

public class NettyClient { static class ClientHandler extends ChannelInboundHandlerAdapter {

  1. @Override
  2. public void channelActive(ChannelHandlerContext ctx)
  3. throws Exception {
  4. RequestData msg = new RequestData();
  5. msg.setIntValue(123);
  6. msg.setStringValue(
  7. "all work and no play makes jack a dull boy");
  8. ChannelFuture future = ctx.writeAndFlush(msg);
  9. }
  10. @Override
  11. public void channelRead(ChannelHandlerContext ctx, Object msg)
  12. throws Exception {
  13. System.out.println((ResponseData)msg);
  14. ctx.close();
  15. }
  16. }
  17. public static void main(String[] args) throws Exception {
  18. String host = "localhost";
  19. int port = 8080;
  20. EventLoopGroup workerGroup = new NioEventLoopGroup();
  21. try {
  22. Bootstrap b = new Bootstrap();
  23. b.group(workerGroup);
  24. b.channel(NioSocketChannel.class);
  25. b.option(ChannelOption.SO_KEEPALIVE, true);
  26. b.handler(new ChannelInitializer<SocketChannel>() {
  27. @Override
  28. public void initChannel(SocketChannel ch)
  29. throws Exception {
  30. ch.pipeline().addLast(new RequestData.RequestDataEncoder(),
  31. new ResponseData.ResponseDataDecoder(), new ClientHandler());
  32. }
  33. });
  34. ChannelFuture f = b.connect(host, port).sync();
  35. f.channel().closeFuture().sync();
  36. } finally {
  37. workerGroup.shutdownGracefully();
  38. }
  39. }

}

public class RequestData { private int intValue; private String stringValue;

  1. public int getIntValue() {
  2. return intValue;
  3. }
  4. public void setIntValue(int intValue) {
  5. this.intValue = intValue;
  6. }
  7. public String getStringValue() {
  8. return stringValue;
  9. }
  10. public void setStringValue(String stringValue) {
  11. this.stringValue = stringValue;
  12. }
  13. static class RequestDataEncoder
  14. extends MessageToByteEncoder<RequestData> {
  15. private final Charset charset = Charset.forName("UTF-8");
  16. @Override
  17. protected void encode(ChannelHandlerContext ctx,
  18. RequestData msg, ByteBuf out) throws Exception {
  19. out.writeInt(msg.getIntValue());
  20. out.writeInt(msg.getStringValue().length());
  21. out.writeCharSequence(msg.getStringValue(), charset);
  22. }
  23. }
  24. static class RequestDataDecoder extends ReplayingDecoder<RequestData> {
  25. private final Charset charset = Charset.forName("UTF-8");
  26. @Override
  27. protected void decode(ChannelHandlerContext ctx,
  28. ByteBuf in, List<Object> out) throws Exception {
  29. RequestData data = new RequestData();
  30. data.setIntValue(in.readInt());
  31. int strLen = in.readInt();
  32. data.setStringValue(in.readCharSequence(strLen, charset).toString());
  33. out.add(data);
  34. }
  35. }

}

public class ResponseData { private int intValue;

  1. public int getIntValue() {
  2. return intValue;
  3. }
  4. public void setIntValue(int intValue) {
  5. this.intValue = intValue;
  6. }
  7. @Override
  8. public String toString() {
  9. return "ResponseData{" +
  10. "intValue=" + intValue +
  11. '}';
  12. }
  13. static class ResponseDataDecoder
  14. extends ReplayingDecoder<ResponseData> {
  15. @Override
  16. protected void decode(ChannelHandlerContext ctx,
  17. ByteBuf in, List<Object> out) throws Exception {
  18. ResponseData data = new ResponseData();
  19. data.setIntValue(in.readInt());
  20. out.add(data);
  21. }
  22. }
  23. static class ResponseDataEncoder
  24. extends MessageToByteEncoder<ResponseData> {
  25. @Override
  26. protected void encode(ChannelHandlerContext ctx,
  27. ResponseData msg, ByteBuf out) throws Exception {
  28. out.writeInt(msg.getIntValue());
  29. }
  30. }

} ```