Netty从入门到网络通信实战

Netty 介绍

简单用 3 点概括一下 Netty 吧!

1Netty 是一个基于 NIO 的 client-server(客户端服务器)框架,使用它可以快速简单地开发网络应用程序。
2它极大地简化并简化了 TCP 和 UDP 套接字服务器等网络编程,并且性能以及安全性等很多方面甚至都要更好。
3支持多种协议如 FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。

用官方的总结就是:Netty 成功地找到了一种在不妥协可维护性和性能的情况下实现易于开发,性能,稳定性和灵活性的方法。

2. Netty 特点

根据官网的描述,我们可以总结出下面一些特点:

●统一的 API,支持多种传输类型,阻塞和非阻塞的。
●简单而强大的线程模型。
●自带编解码器解决 TCP 粘包/拆包问题。
●自带各种协议栈。
●真正的无连接数据包套接字支持。
●比直接使用 Java 核心 API 有更高的吞吐量、更低的延迟、更低的资源消耗和更少的内存复制。
●安全性不错,有完整的 SSL/TLS 以及 StartTLS 支持。
●社区活跃
●成熟稳定,经历了大型项目的使用和考验,而且很多开源项目都使用到了 Netty 比如我们经常接触的 Dubbo、RocketMQ 等等。
●……

3. 使用 Netty 能做什么?

这个应该是老铁们最关心的一个问题了,凭借自己的了解,简单说一下,理论上 NIO 可以做的事情 ,使用 Netty 都可以做并且更好。Netty 主要用来做网络通信 :

1作为 RPC 框架的网络通信工具 : 我们在分布式系统中,不同服务节点之间经常需要相互调用,这个时候就需要 RPC 框架了。不同服务指点的通信是如何做的呢?可以使用 Netty 来做。比如我调用另外一个节点的方法的话,至少是要让对方知道我调用的是哪个类中的哪个方法以及相关参数吧!
2实现一个自己的 HTTP 服务器 :通过 Netty 我们可以自己实现一个简单的 HTTP 服务器,这个大家应该不陌生。说到 HTTP 服务器的话,作为 Java 后端开发,我们一般使用 Tomcat 比较多。一个最基本的 HTTP 服务器可要以处理常见的 HTTP Method 的请求,比如 POST 请求、GET 请求等等。
3实现一个即时通讯系统 : 使用 Netty 我们可以实现一个可以聊天类似微信的即时通讯系统,这方面的开源项目还蛮多的,可以自行去 Github 找一找。
4消息推送系统 :市面上有很多消息推送系统都是基于 Netty 来做的。

4. Netty 使用 kryo 序列化传输对象实战

4.1. 传输实体类


我们首先定义两个对象,这两个对象是客户端与服务端进行交互的实体类。 客户端将 RpcRequest 类型的对象发送到服务端,服务端进行相应的处理之后将得到结果 RpcResponse 对象返回给客户端。

4.1.1. 客户端请求

RpcRequest.java :客户端请求实体类

  1. @AllArgsConstructor
  2. @Getter
  3. @NoArgsConstructor
  4. @Builder
  5. @ToString
  6. public class RpcRequest {
  7. private String interfaceName;
  8. private String methodName;
  9. }

4.1.2. 服务端响应

RpcResponse.java :服务端响应实体类

  1. @AllArgsConstructor
  2. @Getter
  3. @NoArgsConstructor
  4. @Builder
  5. @ToString
  6. public class RpcResponse {
  7. private String message;
  8. }

4.2. 客户端

4.2.1. 初始化客户端

客户端中主要有一个用于向服务端发送消息的 sendMessage()方法,通过这个方法你可以将消息也就是RpcRequest 对象发送到服务端,并且你可以同步获取到服务端返回的结果也就是RpcResponse 对象。

  1. public class NettyClient {
  2. private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
  3. private final String host;
  4. private final int port;
  5. private static final Bootstrap b;
  6. public NettyClient(String host, int port) {
  7. this.host = host;
  8. this.port = port;
  9. }
  10. // 初始化相关资源比如 EventLoopGroup, Bootstrap
  11. static {
  12. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  13. b = new Bootstrap();
  14. KryoSerializer kryoSerializer = new KryoSerializer();
  15. b.group(eventLoopGroup)
  16. .channel(NioSocketChannel.class)
  17. .handler(new LoggingHandler(LogLevel.INFO))
  18. // 连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
  19. // 如果 15 秒之内没有发送数据给服务端的话,就发送一次心跳请求
  20. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
  21. .handler(new ChannelInitializer<SocketChannel>() {
  22. @Override
  23. protected void initChannel(SocketChannel ch) {
  24. /*
  25. 自定义序列化编解码器
  26. */
  27. // RpcResponse -> ByteBuf
  28. ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
  29. // ByteBuf -> RpcRequest
  30. ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
  31. ch.pipeline().addLast(new NettyClientHandler());
  32. }
  33. });
  34. }
  35. /**
  36. * 发送消息到服务端
  37. *
  38. * @param rpcRequest 消息体
  39. * @return 服务端返回的数据
  40. */
  41. public RpcResponse sendMessage(RpcRequest rpcRequest) {
  42. try {
  43. ChannelFuture f = b.connect(host, port).sync();
  44. logger.info("client connect {}", host + ":" + port);
  45. Channel futureChannel = f.channel();
  46. logger.info("send message");
  47. if (futureChannel != null) {
  48. futureChannel.writeAndFlush(rpcRequest).addListener(future -> {
  49. if (future.isSuccess()) {
  50. logger.info("client send message: [{}]", rpcRequest.toString());
  51. } else {
  52. logger.error("Send failed:", future.cause());
  53. }
  54. });
  55. //阻塞等待 ,直到Channel关闭
  56. futureChannel.closeFuture().sync();
  57. // 将服务端返回的数据也就是RpcResponse对象取出
  58. AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
  59. return futureChannel.attr(key).get();
  60. }
  61. } catch (InterruptedException e) {
  62. logger.error("occur exception when connect server:", e);
  63. }
  64. return null;
  65. }
  66. }

sendMessage()方法分析:

1首先初始化了一个 Bootstrap
2通过 Bootstrap 对象连接服务端
3通过 Channel 向服务端发送消息RpcRequest
4发送成功后,阻塞等待 ,直到Channel关闭
5拿到服务端返回的结果 RpcResponse

4.2.2. 自定义 ChannelHandler 处理服务端消息

  1. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  2. private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  5. try {
  6. RpcResponse rpcResponse = (RpcResponse) msg;
  7. logger.info("client receive msg: [{}]", rpcResponse.toString());
  8. // 声明一个 AttributeKey 对象
  9. AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
  10. // 将服务端的返回结果保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源
  11. // AttributeMap的key是AttributeKey,value是Attribute
  12. ctx.channel().attr(key).set(rpcResponse);
  13. ctx.channel().close();
  14. } finally {
  15. ReferenceCountUtil.release(msg);
  16. }
  17. }
  18. @Override
  19. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  20. logger.error("client caught exception", cause);
  21. ctx.close();
  22. }
  23. }

NettyClientHandler用于读取服务端发送过来的 RpcResponse 消息对象,并将 RpcResponse 消息对象保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源。

这样的话,我们就能通过 channel 和 key 将数据读取出来。

  1. AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
  2. return futureChannel.attr(key).get();

这个额外提一下 AttributeMap ,AttributeMap 是一个接口,但是类似于 Map 数据结构 。

  1. public interface AttributeMap {
  2. <T> Attribute<T> attr(AttributeKey<T> key);
  3. <T> boolean hasAttr(AttributeKey<T> key);
  4. }

Channel 实现了 AttributeMap 接口,这样也就表明它存在了AttributeMap 相关的属性。 每个 Channel上的AttributeMap属于共享数据。AttributeMap 的结构,和Map很像,我们可以把 key 看作是AttributeKey,value 看作是Attribute,我们可以根据 AttributeKey找到对应的Attribute。

  1. public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
  2. ......
  3. }

4.3. 服务端

4.3.1. 初始化服务端

NettyServer 主要作用就是开启了一个服务端用于接受客户端的请求并处理。

  1. public class NettyServer {
  2. private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
  3. private final int port;
  4. private NettyServer(int port) {
  5. this.port = port;
  6. }
  7. private void run() {
  8. EventLoopGroup bossGroup = new NioEventLoopGroup();
  9. EventLoopGroup workerGroup = new NioEventLoopGroup();
  10. KryoSerializer kryoSerializer = new KryoSerializer();
  11. try {
  12. ServerBootstrap b = new ServerBootstrap();
  13. b.group(bossGroup, workerGroup)
  14. .channel(NioServerSocketChannel.class)
  15. // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
  16. .childOption(ChannelOption.TCP_NODELAY, true)
  17. // 是否开启 TCP 底层心跳机制
  18. .childOption(ChannelOption.SO_KEEPALIVE, true)
  19. //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
  20. .option(ChannelOption.SO_BACKLOG, 128)
  21. .handler(new LoggingHandler(LogLevel.INFO))
  22. .childHandler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel ch) {
  25. ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
  26. ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
  27. ch.pipeline().addLast(new NettyServerHandler());
  28. }
  29. });
  30. // 绑定端口,同步等待绑定成功
  31. ChannelFuture f = b.bind(port).sync();
  32. // 等待服务端监听端口关闭
  33. f.channel().closeFuture().sync();
  34. } catch (InterruptedException e) {
  35. logger.error("occur exception when start server:", e);
  36. } finally {
  37. bossGroup.shutdownGracefully();
  38. workerGroup.shutdownGracefully();
  39. }
  40. }
  41. }

4.3.2. 自定义 ChannelHandler 处理客户端消息

NettyServerHandler 用于介绍客户端发送过来的消息并返回结果给客户端。

  1. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  2. private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
  3. private static final AtomicInteger atomicInteger = new AtomicInteger(1);
  4. @Override
  5. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  6. try {
  7. RpcRequest rpcRequest = (RpcRequest) msg;
  8. logger.info("server receive msg: [{}] ,times:[{}]", rpcRequest, atomicInteger.getAndIncrement());
  9. RpcResponse messageFromServer = RpcResponse.builder().message("message from server").build();
  10. ChannelFuture f = ctx.writeAndFlush(messageFromServer);
  11. f.addListener(ChannelFutureListener.CLOSE);
  12. } finally {
  13. ReferenceCountUtil.release(msg);
  14. }
  15. }
  16. @Override
  17. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  18. logger.error("server catch exception",cause);
  19. ctx.close();
  20. }
  21. }

4.4. 编码器

4.4.1. 自定义编码器

NettyKryoEncoder 是我们自定义的编码器。它负责处理”出站”消息,将消息格式转换为字节数组然后写入到字节数据的容器 ByteBuf 对象中。

  1. /**
  2. * 自定义编码器。
  3. * <p>
  4. * 网络传输需要通过字节流来实现,ByteBuf 可以看作是 Netty 提供的字节数据的容器,使用它会让我们更加方便地处理字节数据。
  5. *
  6. * @author shuang.kou
  7. * @createTime 2020年05月25日 19:43:00
  8. */
  9. @AllArgsConstructor
  10. public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
  11. private final Serializer serializer;
  12. private final Class<?> genericClass;
  13. /**
  14. * 将对象转换为字节码然后写入到 ByteBuf 对象中
  15. */
  16. @Override
  17. protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) {
  18. if (genericClass.isInstance(o)) {
  19. // 1. 将对象转换为byte
  20. byte[] body = serializer.serialize(o);
  21. // 2. 读取消息的长度
  22. int dataLength = body.length;
  23. // 3.写入消息对应的字节数组长度,writerIndex 加 4
  24. byteBuf.writeInt(dataLength);
  25. //4.将字节数组写入 ByteBuf 对象中
  26. byteBuf.writeBytes(body);
  27. }
  28. }
  29. }

4.4.2. 自定义解码器

NettyKryoDecoder是我们自定义的解码器。它负责处理”入站”消息,它会从 ByteBuf 中读取到业务对象对应的字节序列,然后再将字节序列转换为我们的业务对象。

  1. /**
  2. * 自定义解码器。
  3. *
  4. * @author shuang.kou
  5. * @createTime 2020年05月25日 19:42:00
  6. */
  7. @AllArgsConstructor
  8. @Slf4j
  9. public class NettyKryoDecoder extends ByteToMessageDecoder {
  10. private final Serializer serializer;
  11. private final Class<?> genericClass;
  12. /**
  13. * Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部
  14. */
  15. private static final int BODY_LENGTH = 4;
  16. /**
  17. * 解码 ByteBuf 对象
  18. *
  19. * @param ctx 解码器关联的 ChannelHandlerContext 对象
  20. * @param in "入站"数据,也就是 ByteBuf 对象
  21. * @param out 解码之后的数据对象需要添加到 out 对象里面
  22. */
  23. @Override
  24. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
  25. //1.byteBuf中写入的消息长度所占的字节数已经是4了,所以 byteBuf 的可读字节必须大于 4,
  26. if (in.readableBytes() >= BODY_LENGTH) {
  27. //2.标记当前readIndex的位置,以便后面重置readIndex 的时候使用
  28. in.markReaderIndex();
  29. //3.读取消息的长度
  30. //注意: 消息长度是encode的时候我们自己写入的,参见 NettyKryoEncoder 的encode方法
  31. int dataLength = in.readInt();
  32. //4.遇到不合理的情况直接 return
  33. if (dataLength < 0 || in.readableBytes() < 0) {
  34. log.error("data length or byteBuf readableBytes is not valid");
  35. return;
  36. }
  37. //5.如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex
  38. if (in.readableBytes() < dataLength) {
  39. in.resetReaderIndex();
  40. return;
  41. }
  42. // 6.走到这里说明没什么问题了,可以序列化了
  43. byte[] body = new byte[dataLength];
  44. in.readBytes(body);
  45. // 将bytes数组转换为我们需要的对象
  46. Object obj = serializer.deserialize(body, genericClass);
  47. out.add(obj);
  48. log.info("successful decode ByteBuf to Object");
  49. }
  50. }
  51. }

4.4.3. 自定义序列化接口

Serializer 接口主要有两个方法一个用于序列化,一个用户反序列化。

  1. public interface Serializer {
  2. /**
  3. * 序列化
  4. * @param obj 要序列化的对象
  5. * @return 字节数组
  6. */
  7. byte[] serialize(Object obj);
  8. /**
  9. * 反序列化
  10. *
  11. * @param bytes 序列化后的字节数组
  12. * @param clazz 类
  13. * @param <T>
  14. * @return 反序列化的对象
  15. */
  16. <T> T deserialize(byte[] bytes, Class<T> clazz);
  17. }

4.4.4. 实现序列化接口

  1. public class KryoSerializer implements Serializer {
  2. /**
  3. * 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。
  4. * 所以,使用 ThreadLocal 存放 Kryo 对象
  5. */
  6. private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
  7. Kryo kryo = new Kryo();
  8. kryo.register(RpcResponse.class);
  9. kryo.register(RpcRequest.class);
  10. kryo.setReferences(true);//默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true
  11. kryo.setRegistrationRequired(false);//默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true
  12. return kryo;
  13. });
  14. @Override
  15. public byte[] serialize(Object obj) {
  16. try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  17. Output output = new Output(byteArrayOutputStream)) {
  18. Kryo kryo = kryoThreadLocal.get();
  19. // Object->byte:将对象序列化为byte数组
  20. kryo.writeObject(output, obj);
  21. kryoThreadLocal.remove();
  22. return output.toBytes();
  23. } catch (Exception e) {
  24. throw new SerializeException("序列化失败");
  25. }
  26. }
  27. @Override
  28. public <T> T deserialize(byte[] bytes, Class<T> clazz) {
  29. try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
  30. Input input = new Input(byteArrayInputStream)) {
  31. Kryo kryo = kryoThreadLocal.get();
  32. // byte->Object:从byte数组中反序列化出对对象
  33. Object o = kryo.readObject(input, clazz);
  34. kryoThreadLocal.remove();
  35. return clazz.cast(o);
  36. } catch (Exception e) {
  37. throw new SerializeException("反序列化失败");
  38. }
  39. }
  40. }

自定义序列化异常类 SerializeException 如下:

  1. public class SerializeException extends RuntimeException {
  2. public SerializeException(String message) {
  3. super(message);
  4. }
  5. }

4.5. 测试效果

启动服务端:

  1. new NettyServer(8889).run();

启动客户端并发送 4 次消息给服务端:

  1. RpcRequest rpcRequest = RpcRequest.builder()
  2. .interfaceName("interface")
  3. .methodName("hello").build();
  4. NettyClient nettyClient = new NettyClient("127.0.0.1", 8889);
  5. for (int i = 0; i < 3; i++) {
  6. nettyClient.sendMessage(rpcRequest);
  7. }
  8. RpcResponse rpcResponse = nettyClient.sendMessage(rpcRequest);
  9. System.out.println(rpcResponse.toString());

源码地址:https://github.com/Snailclimb/guide-rpc-framework-learning/tree/master/src/main/java/github/javaguide/netty/kyro