编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
codec(编码器)的组成部分有两个:decoder(解码器) 和 encoder(编码器),encoder负责把业务数据转换成字节码数据,decoder负责把字节码数据转换成业务数据
编解码器都实现了ChannelInboundHandler或者ChannelOutboundHandler接口
image.png
StringDecoder继承关系

编码器与解码器需要放置在自定义业务处理Handler之前,当进行解码操作时(服务端收到数据),对PipeLine相当于入栈操作;进行编码操作时,对PipeLine相当于出栈操作
37VT%6`H{}5J~%}B22D(HBW.png
编码器与解码器顺序一
image.png
调用顺序二

不论解码器handler还是编码器handler,接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行;在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果会期望结果可能不一致

常用编码器与解码器:

  1. 数据编解码器:
  2. StringEnvoder 对字符串编码
  3. StringDecoder 对字符串解码
  4. ObjectEncoder 只能对Java对象进行编码
  5. ObjectDecoder 只能对Java对象进行解码
  6. HttpServerCodec Http服务编码&解码器(将消息解析为HttpRequestHttpContent两个部分)
  7. 特殊编&解码器:
  8. LineBasedFrameDecoder 使用行位控制符(\n或者\r\n)作为分割符来解码数据
  9. DelimiterBasedFrameDecoder 使用自定义特殊字符作为消息分割符来解码数据
  10. LengthFieldBasedFrameDecoder 指定长度来标识整包信息达到自动处理粘包和半包问题
  11. 数据包较大时:
  12. ZlibDecoder 压缩编码器(数据较大时可以在发送时进行压缩,服务端需要指定对应压缩解码器)
  13. ZlibEncoder 压缩解码器
  14. 传输服务编解码器:
  15. Protobuf 谷歌编码解码器,支持非Java对象编码解码,适合PRC数据交换服务

HTTP编解码器:

使用 HttpServerCodec 对请求数据进行解析,请求数据将会被 HttpServerCodec 解析为 HttpRequest(请求头)和HttpContent(请求体)两个部分,最后使用 DefaultFullHttpResponse 返回结果,源码地址

  1. package http.coder;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.handler.codec.FixedLengthFrameDecoder;
  9. import io.netty.handler.codec.http.*;
  10. import io.netty.util.CharsetUtil;
  11. import java.nio.charset.StandardCharsets;
  12. import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
  13. /* Http编解码器使用案例 - 服务端 */
  14. public class HttpCoderServer {
  15. public static void main(String[] args) throws Exception{
  16. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  17. EventLoopGroup workerGroup = new NioEventLoopGroup();
  18. try {
  19. ServerBootstrap serverBootstrap = new ServerBootstrap();
  20. serverBootstrap.group(bossGroup,workerGroup)
  21. .channel(NioServerSocketChannel.class)
  22. .childHandler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel ch) throws Exception {
  25. ChannelPipeline pipeline = ch.pipeline();
  26. pipeline.addLast(new HttpServerCodec()); //将消息分为HttpRequest请求头和HttpContent请求体两部分
  27. /* 只处理请求头数据 */
  28. pipeline.addLast(new SimpleChannelInboundHandler<HttpRequest>() {
  29. @Override
  30. protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
  31. System.out.println("请求头内容为: "+msg);
  32. }
  33. });
  34. /* 只处理请求体数据 */
  35. pipeline.addLast(new SimpleChannelInboundHandler<HttpContent>() {
  36. @Override
  37. protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) throws Exception {
  38. ByteBuf buf = msg.content();
  39. String s = buf.toString(CharsetUtil.UTF_8);
  40. System.out.println("请求体内容为: "+s);
  41. /* 返回响应结果 */
  42. DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
  43. byte[] info = "数据已接收".getBytes(StandardCharsets.UTF_8);
  44. response.content().writeBytes(info); //设置返回内容
  45. response.headers().setInt(CONTENT_LENGTH,info.length); //设置返回内容长度
  46. ctx.writeAndFlush(response);
  47. }
  48. });
  49. }
  50. });
  51. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  52. channelFuture.channel().closeFuture().sync();
  53. }finally {
  54. bossGroup.shutdownGracefully();
  55. workerGroup.shutdownGracefully();
  56. }
  57. }
  58. }

使用PostMan进行数据发送测试,服务端成功收到消息
image.png


实体类数据传输:

Netty 默认只能使用 byte 类型进行数据传输,当对象为实体类时无法直接通过强转进行解析,需要使用第三方编解码器进行操作后才能使用,常用方案有以下两种:

JBoss编解码器:

源码参考

引入依赖:

  1. <!-- Netty Java序列化框架marshalling -->
  2. <dependency>
  3. <groupId>org.jboss.marshalling</groupId>
  4. <artifactId>jboss-marshalling</artifactId>
  5. <version>1.3.0.CR9</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.jboss.marshalling</groupId>
  9. <artifactId>jboss-marshalling-serial</artifactId>
  10. <version>1.3.0.CR9</version>
  11. </dependency>

JBoss解码器:

  1. package jboss;
  2. import io.netty.handler.codec.marshalling.*;
  3. import org.jboss.marshalling.MarshallerFactory;
  4. import org.jboss.marshalling.Marshalling;
  5. import org.jboss.marshalling.MarshallingConfiguration;
  6. /**
  7. * Marshalling工厂,Java实体类编解码器
  8. */
  9. public final class MarshallingCodeCFactory {
  10. /**
  11. * 创建Jboss Marshalling解码器MarshallingDecoder
  12. * @return MarshallingDecoder
  13. */
  14. public static MarshallingDecoder buildMarshallingDecoder() {
  15. //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
  16. final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
  17. //创建了MarshallingConfiguration对象,配置了版本号为5
  18. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  19. configuration.setVersion(5);
  20. //根据marshallerFactory和configuration创建provider
  21. UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
  22. //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
  23. MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
  24. return decoder;
  25. }
  26. /**
  27. * 创建Jboss Marshalling编码器MarshallingEncoder
  28. * @return MarshallingEncoder
  29. */
  30. public static MarshallingEncoder buildMarshallingEncoder() {
  31. final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
  32. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  33. configuration.setVersion(5);
  34. MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
  35. //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
  36. MarshallingEncoder encoder = new MarshallingEncoder(provider);
  37. return encoder;
  38. }
  39. }

实体类:

  1. package jboss;
  2. import lombok.Data;
  3. import lombok.experimental.Accessors;
  4. import java.io.Serializable;
  5. /* Netty 传输数据实体类 */
  6. @Data
  7. @Accessors(chain = true)
  8. public class TranslatorData implements Serializable {
  9. private String id;
  10. private String name;
  11. private String message; //传输消息具体内容
  12. }

服务端:

  1. package jboss;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. /* Jboss解析实体类-Netty服务端 */
  9. public class NettyServer {
  10. public static void main(String[] args) throws InterruptedException {
  11. EventLoopGroup bossGroup = new NioEventLoopGroup();
  12. EventLoopGroup workGroup = new NioEventLoopGroup();
  13. //启动器,负责组装Netty组件进行启动服务器
  14. ServerBootstrap bootstrap = new ServerBootstrap();
  15. try {
  16. bootstrap.group(bossGroup,workGroup)
  17. .channel(NioServerSocketChannel.class)
  18. .option(ChannelOption.SO_BACKLOG,1024)
  19. .childHandler(new ChannelInitializer<SocketChannel>() {
  20. //连接建立后开始执行
  21. @Override
  22. protected void initChannel(SocketChannel ch) throws Exception {
  23. ChannelPipeline pipeline = ch.pipeline();
  24. //添加Java实体类编解码器
  25. pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
  26. pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
  27. //自定义Handler,通常实现子类ChannelInboundHandlerAdapter或SimpleChannelInboundHandler<参数>进行实现
  28. pipeline.addLast(new ChannelInboundHandlerAdapter(){
  29. //当有读事件时执行
  30. @Override
  31. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  32. TranslatorData info = (TranslatorData) msg;
  33. System.out.println("ID:"+info.getId()+",名称:"+info.getName()+",消息:"+info.getMessage());
  34. //创建新数据
  35. TranslatorData response = new TranslatorData();
  36. response.setId("resp: " + info.getId());
  37. response.setName("resp: " + info.getName());
  38. response.setMessage("resp: " + info.getMessage());
  39. //返回数据
  40. ctx.writeAndFlush(response);
  41. }
  42. });
  43. }
  44. });
  45. System.out.println("服务器 is ready .....");
  46. //启动服务器绑定一个端口并且同步,生成一个 ChannelFuture 对象
  47. ChannelFuture future = bootstrap.bind(8765).sync();
  48. //对关闭通道进行监听(当有关闭通道的消息时才进行监听)
  49. future.channel().closeFuture().sync();
  50. }finally {
  51. bossGroup.shutdownGracefully();
  52. workGroup.shutdownGracefully();
  53. }
  54. }
  55. }

客户端:

  1. package jboss;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.logging.LogLevel;
  8. import io.netty.handler.logging.LoggingHandler;
  9. import io.netty.util.ReferenceCountUtil;
  10. /* Jboss解析实体类-Netty客户端 */
  11. public class NettyClient {
  12. public static void main(String[] args) {
  13. EventLoopGroup workGroup = new NioEventLoopGroup();
  14. Bootstrap bootstrap = new Bootstrap();
  15. try {
  16. bootstrap.group(workGroup)
  17. .channel(NioSocketChannel.class)
  18. //表示缓存区动态调配(自适应)
  19. .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
  20. .handler(new LoggingHandler(LogLevel.INFO))
  21. .handler(new ChannelInitializer<SocketChannel>() {
  22. @Override
  23. protected void initChannel(SocketChannel sc) throws Exception {
  24. ChannelPipeline pipeline = sc.pipeline();
  25. //添加Java实体类编解码器
  26. pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
  27. pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
  28. //添加自定义处理器
  29. pipeline.addLast(new ChannelInboundHandlerAdapter(){
  30. @Override
  31. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  32. TranslatorData response = (TranslatorData)msg;
  33. System.out.println("Client端: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage());
  34. }
  35. });
  36. }
  37. });
  38. //绑定端口,同步等等请求连接
  39. ChannelFuture future = bootstrap.connect("127.0.0.1", 8765).sync();
  40. Channel channel = future.channel();
  41. System.out.println("客户端尝试连接....");
  42. /*模拟数据发送*/
  43. for(int i =0; i <10; i++){
  44. TranslatorData request = new TranslatorData();
  45. request.setId("" + i);
  46. request.setName("请求消息名称 " + i);
  47. request.setMessage("请求消息内容 " + i);
  48. channel.writeAndFlush(request);
  49. }
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. }

测试:

image.png

Protobuf:

传统的数据传输是使用 Http + JSON 的方式,引入Netty后可以使用 TCP + Protobuf 的方式进行数据交互。需要将类定义为 protobuf 的 .proto 格式,protobuf会在编译阶段生成它能识别的Java对象进行传输使用。生成的文件将包含外部类和内部类,实现数据推送的为内部类数据,支持跨平台发送数据
image.png
客户端需要添加Proto编码器,服务端需要proto解码器
image.png
IDEA 对应插件

使用ProtoBuf需要先添加依赖:

  1. <dependency>
  2. <groupId>com.google.protobuf</groupId>
  3. <artifactId>protobuf-java</artifactId>
  4. <version>3.19.3</version>
  5. </dependency>

单数据类型发送:

客户端与服务端都使用同一个实体类进行交互,源码地址

1、创建Protobuf传输对象
创建一个文件名为 StudentPOJO 的 proto 文件

  1. syntax = "proto3"; //指定Protobuf版本
  2. option java_outer_classname = "StudentPOJO"; //文件名,同时也是外部类名
  3. //真正发送的实体类对象,内部参数需要参照 protobyf官方语言指南
  4. message Student{
  5. int32 id = 1; //相当于 private int id ,1相当于属性序号
  6. string name = 2;
  7. }

2、编译文件
访问 GitHub下载 Proto-3.19.3-win 资源包,解压后对刚才编写的 proto 文件进行编译,编译后放入项目中

  1. protoc.exe --java_out=. 文件名.proto
  1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1645091863467-74cc33f2-e975-447b-87e4-1ee5cecc90b6.png#clientId=u91c76975-c5b2-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=183&id=ub582e2a3&margin=%5Bobject%20Object%5D&name=image.png&originHeight=275&originWidth=658&originalType=binary&ratio=1&rotation=0&showTitle=false&size=18382&status=done&style=stroke&taskId=u87cd738f-9eb3-4de8-aa3d-c5f41f4c5b9&title=&width=438.6666666666667)<br />** 编译proto文件为Java文件**

3、创建客户端与服务端
客户端添加上 protobuf 的编码器,服务端添加上 protobuf 的解码器即可

客户端:

  1. package protobuf;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.protobuf.ProtobufEncoder;
  11. /* 客户端代码 */
  12. public class NettyClient {
  13. public static void main(String[] args) throws Exception {
  14. //客户端需要一个事件循环组
  15. EventLoopGroup group = new NioEventLoopGroup();
  16. try {
  17. //客户端启动对象,客户端使用的是 Bootstrap 而不是 ServerBootstrap
  18. Bootstrap bootstrap = new Bootstrap();
  19. //设置相关参数
  20. bootstrap.group(group) //设置线程组
  21. .channel(NioSocketChannel.class) //设置客户端通道的实现类
  22. .handler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel ch) throws Exception {
  25. ChannelPipeline pipeline = ch.pipeline();
  26. pipeline.addLast("proEncoder",new ProtobufEncoder()); //自定义处理器
  27. pipeline.addLast(new NettyClientHandler()); //自定义处理器
  28. }
  29. });
  30. System.out.println("客户端 ok...");
  31. //启动客户端并连接到服务端
  32. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
  33. //给关闭通道进行监听(关闭通道事件发生后触发)
  34. channelFuture.channel().closeFuture().sync();
  35. } finally {
  36. group.shutdownGracefully();
  37. }
  38. }
  39. }

客户端Handler:

  1. package protobuf;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. import java.nio.charset.StandardCharsets;
  8. /* 自定义管道 Handler */
  9. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  10. /* 当通道就绪就会触发该方法
  11. * 利用ProtoBuf发送数据
  12. * */
  13. @Override
  14. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  15. StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(10).setName("小明同学").build();
  16. System.out.println("客户端发送Proto数据");
  17. ctx.writeAndFlush(student);
  18. }
  19. /*当通道有读取数据事件时触发
  20. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  21. * 参数二: Object 客户端发送的数据,默认是Object需要转换
  22. */
  23. @Override
  24. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  25. ByteBuf buf = (ByteBuf) msg;
  26. System.out.println("服务器回复的消息: "+buf.toString(CharsetUtil.UTF_8));
  27. System.out.println("服务器的地址: "+ctx.channel().remoteAddress());
  28. }
  29. /* 异常发生时触发 */
  30. @Override
  31. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  32. cause.printStackTrace();
  33. ctx.close();
  34. }
  35. }

服务端:

  1. package protobuf;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.protobuf.ProtobufDecoder;
  8. /* 服务端代码 */
  9. public class NettyServer {
  10. public static void main(String[] args) throws InterruptedException {
  11. /* 创建 BossGroup 和 WorkerGroup 线程组
  12. * BossGroup 只处理连接请求,真正的和客户端业务处理会交给 WorkGrouop 完成
  13. * bossGroup 和 workerGroup 含有的子线程的个数默认为 CPU最大线程数 X 2,可以手动指定
  14. */
  15. EventLoopGroup bossGroup = new NioEventLoopGroup(1); //创建子线程的个数为1的 bossGroup
  16. EventLoopGroup workerGroup = new NioEventLoopGroup(8); //创建子线程的个数为8的 workerGroup
  17. try {
  18. //创建服务器端的启动对象,配置参数
  19. ServerBootstrap bootstrap = new ServerBootstrap();
  20. //使用链式参数配置启动参数
  21. bootstrap.group(bossGroup,workerGroup) //设置两个线程组
  22. .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器通道实现
  23. .option(ChannelOption.SO_BACKLOG,120) //设置线程队列等待连接的个数
  24. .childOption(ChannelOption.SO_KEEPALIVE,true) //保持活动连接状态
  25. .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
  26. //给PipeLine设置处理器
  27. @Override
  28. protected void initChannel(SocketChannel ch) throws Exception {
  29. ChannelPipeline pipeline = ch.pipeline();
  30. //指定ProtoBuf解码器对哪种对象进行解码
  31. pipeline.addLast("proDecoder",new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
  32. pipeline.addLast(new NettyServerHandler()); //将自定义处理器加入到PipeLine
  33. }
  34. }); //给 WorkerGroup 的 EventLoop 对应的管道设置处理器
  35. System.out.println("服务器 is ready .....");
  36. //启动服务器绑定一个端口并且同步,生成一个 ChannelFuture 对象
  37. ChannelFuture cf = bootstrap.bind(6666).sync();
  38. //给CF注册监听器,监控关心的事件
  39. cf.addListener(new ChannelFutureListener() {
  40. @Override
  41. public void operationComplete(ChannelFuture channelFuture) throws Exception {
  42. if(cf.isSuccess()){
  43. System.out.println("监听端口成功");
  44. }else{
  45. System.out.println("监听端口失败");
  46. }
  47. }
  48. });
  49. //对关闭通道进行监听(当有关闭通道的消息时才进行监听)
  50. cf.channel().closeFuture().sync();
  51. } finally {
  52. bossGroup.shutdownGracefully(); //关闭资源
  53. workerGroup.shutdownGracefully(); //关闭
  54. }
  55. }
  56. }

服务端Handler:

  1. package protobuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import java.nio.charset.StandardCharsets;
  6. import java.time.LocalDateTime;
  7. import java.util.concurrent.TimeUnit;
  8. /** 自定义管道 Handler */
  9. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  10. /*当有读取事件时该方法将被触发*/
  11. @Override
  12. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  13. //读取从客户端发送的StudentPojo.Student
  14. StudentPOJO.Student student = (StudentPOJO.Student) msg;
  15. System.out.println("客户端发送的数据 id="+student.getId()+" 名称: "+student.getName());
  16. }
  17. /* 数据读取完毕后触发
  18. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  19. */
  20. @Override
  21. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  22. //对发送数据进行编码后写入到缓存并刷新
  23. ctx.writeAndFlush(Unpooled.copiedBuffer("服务端连接成功,欢迎!"+LocalDateTime.now(),StandardCharsets.UTF_8));
  24. }
  25. /* 处理异常,一般为关闭通道 */
  26. @Override
  27. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  28. ctx.close(); //和 ctx.channel().close() 是一个意思
  29. }
  30. }

测试:
image.png
服务端接收并解码客户端发送的Protobuf封装数据

多数据类型发送:

客户端可能发送不同的实体类对象,服务端不同实体类对象的业务操作逻辑也不相同
相较于单数据类型发送,需要再proto文件上添加枚举类来控制具体选择的 Java 对象
源码地址

1、创建ProtoBuf传输对象
创建一个文件名为 StudentPOJO 的 proto 文件

  1. syntax = "proto3";
  2. option optimize_for = SPEED; //加快解析
  3. option java_package = "protobuf.morepojo"; //指定生成到哪个包下
  4. option java_outer_classname = "MyStudentInfo"; //文件名,同时也是外部类名
  5. //真正发送的实体类对象,内部参数需要参照 protobyf官方语言指南
  6. //protobuf 可以使用 message 管理其他的message
  7. message MyMessage{
  8. //定义一个枚举类型,根据枚举类型的值来判断取要取哪个对象实例
  9. enum DataType{
  10. StudentType = 0; //在proto3要求enum内值的编号从0开始
  11. TeacherType = 1;
  12. }
  13. DataType data_type = 1; //将枚举值定义为属性,利用data_type判断实际传的是哪个枚举类型,每个属性的编号在proto中必须是唯一的
  14. //oneof表示实际调用dataBody时,只能选择其中的一个进行操作
  15. oneof dataBody{
  16. Student student = 2;
  17. Teacher teacher = 3;
  18. }
  19. }
  20. message Student{
  21. int32 id = 1; //相当于 private int id , 1相当于属性序号
  22. string name = 2;
  23. }
  24. message Teacher{
  25. string name = 1;
  26. int32 age = 2;
  27. }

2、编译文件
访问 GitHub下载 Proto-3.19.3-win 资源包,解压后对刚才编写的 proto 文件进行编译,编译后放入项目中

  1. protoc.exe --java_out=. 文件名.proto
  1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1645091863467-74cc33f2-e975-447b-87e4-1ee5cecc90b6.png#clientId=u91c76975-c5b2-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=183&id=MBiIJ&margin=%5Bobject%20Object%5D&name=image.png&originHeight=275&originWidth=658&originalType=binary&ratio=1&rotation=0&showTitle=false&size=18382&status=done&style=stroke&taskId=u87cd738f-9eb3-4de8-aa3d-c5f41f4c5b9&title=&width=438.6666666666667)<br />** 编译proto文件为Java文件**

3、创建客户端和服务端
客户端代码和服务端代码与单数据类型发送案例中一致,只有Handler不同

客户端Handler:

  1. package protobuf.morepojo;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import protobuf.StudentPOJO;
  7. import java.nio.charset.StandardCharsets;
  8. import java.time.LocalDateTime;
  9. /** 自定义管道 Handler
  10. * 继承父类ChannelInboundHandlerAdapter的区别就是读取事件发生时的方法中的msg不会被指定成具体的类型
  11. * */
  12. public class NettyServerHandler extends SimpleChannelInboundHandler<MyStudentInfo.MyMessage> {
  13. /*当有读取事件时该方法将被触发*/
  14. @Override
  15. protected void channelRead0(ChannelHandlerContext ctx, MyStudentInfo.MyMessage msg) throws Exception {
  16. //根据dataType来显示不同的信息
  17. MyStudentInfo.MyMessage.DataType dataType = msg.getDataType();
  18. if(dataType == MyStudentInfo.MyMessage.DataType.StudentType){
  19. MyStudentInfo.Student student = msg.getStudent();
  20. System.out.println("获取到的学生信息: "+student.getName()+" id: "+student.getId());
  21. }else if(dataType == MyStudentInfo.MyMessage.DataType.TeacherType){
  22. MyStudentInfo.Teacher teacher = msg.getTeacher();
  23. System.out.println("获取到的教师信息: "+teacher.getName()+" 年龄: "+teacher.getAge());
  24. }else{
  25. System.out.println("传输的类型不正确");
  26. }
  27. }
  28. /* 数据读取完毕后触发
  29. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  30. */
  31. @Override
  32. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  33. //对发送数据进行编码后写入到缓存并刷新
  34. ctx.writeAndFlush(Unpooled.copiedBuffer("服务端连接成功,欢迎!"+LocalDateTime.now(),StandardCharsets.UTF_8));
  35. }
  36. /* 处理异常,一般为关闭通道 */
  37. @Override
  38. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  39. ctx.close(); //和 ctx.channel().close() 是一个意思
  40. }
  41. }

客户端Handler:

  1. package protobuf.morepojo;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.util.CharsetUtil;
  6. import protobuf.StudentPOJO;
  7. import java.util.Random;
  8. /* 自定义管道 Handler */
  9. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  10. /* 当通道就绪就会触发该方法
  11. * 利用ProtoBuf发送数据
  12. * */
  13. @Override
  14. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  15. /* 随机发送Student或Teacher对象*/
  16. int random = new Random().nextInt(3);
  17. MyStudentInfo.MyMessage myMessage= null;
  18. if(random%2==0){
  19. myMessage = MyStudentInfo.MyMessage.newBuilder()
  20. .setDataType(MyStudentInfo.MyMessage.DataType.StudentType)
  21. .setStudent(MyStudentInfo.MyMessage.newBuilder().getStudentBuilder().setId(998).setName("马超").build())
  22. .build();
  23. }else{
  24. myMessage = MyStudentInfo.MyMessage.newBuilder()
  25. .setDataType(MyStudentInfo.MyMessage.DataType.StudentType)
  26. .setTeacher(MyStudentInfo.MyMessage.newBuilder().getTeacherBuilder().setAge(25).setName("诸葛老师"))
  27. .build();
  28. }
  29. ctx.writeAndFlush(myMessage);
  30. }
  31. /*当通道有读取数据事件时触发
  32. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  33. * 参数二: Object 客户端发送的数据,默认是Object需要转换
  34. */
  35. @Override
  36. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  37. ByteBuf buf = (ByteBuf) msg;
  38. System.out.println("服务器回复的消息: "+buf.toString(CharsetUtil.UTF_8));
  39. System.out.println("服务器的地址: "+ctx.channel().remoteAddress());
  40. }
  41. /* 异常发生时触发 */
  42. @Override
  43. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  44. cause.printStackTrace();
  45. ctx.close();
  46. }
  47. }

测试:
服务端成功获取到客户端随机发送的数据
image.png


自定义编码器与解码器:

可以继承 ByteToMessageDecoder 或者 ReplayingDecode 实现自定义解码器。ByteToMessageDecoder 不会对消息进行拆分;ReplayingDecode 会自动对数据进行拆分,但需要指定参数,参数代表用户状态管理的类型,参数为void时代表不需要状态管理
通过继承 MessageToByteEncoder<数据类型> 实现自定义编码器,数据类型为channel中msg数据的类型
通过继承 ByteToMessageCodec<数据类型> 实现自定义编/解码器,数据类型为channel中msg数据的类型

ReplayingDecoder缺点:

1、并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException.
2、ReplayingDecoder在某些情况下可能稍慢于其父类ByteToMessageDecoder,例如网络缓慢并且消息格
式复杂时,消息会被拆成了多个碎片,速度变慢

案例:

使用自定义编码器与解码器对数据进行操作

自定义解码器:

  1. package inboundhandlerandoutboundhandler;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.ReplayingDecoder;
  5. import java.util.List;
  6. /* 自定义解码器,参数代表用户状态管理的类型,其中void代表不需要状态管理 */
  7. public class MyByteToLongDecoder extends ReplayingDecoder<Void> {
  8. /* decode会根据接收的数据被调用多次,直到确定没有新的元素被泰诺健爱到List
  9. 或者是ByteBuf没有更多的可读字节为止
  10. * @param ctx: 上下文对象
  11. * @param in: 入栈的ByteBuf
  12. * @param out: List集合,将解码后的数据传给下一个ChannelInboundHandler进行处理,该处理器方法也会被调用多次
  13. */
  14. @Override
  15. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  16. System.out.println("MyByteToLongDecoder方法被调用");
  17. out.add(in.readLong());
  18. }
  19. }

自定义编码器:

  1. package inboundhandlerandoutboundhandler;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.MessageToByteEncoder;
  5. /* 自定义编码器 */
  6. public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
  7. //编码方法
  8. @Override
  9. protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
  10. System.out.println("MyLongToByteEncoder的Encode方法被调用");
  11. System.out.println("编码后的Msg=" + msg);
  12. out.writeLong(msg);
  13. }
  14. }

服务端:

  1. package inboundhandlerandoutboundhandler;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. public class MyServer {
  11. public static void main(String[] args) throws Exception{
  12. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  13. EventLoopGroup workerGroup = new NioEventLoopGroup();
  14. try {
  15. ServerBootstrap serverBootstrap = new ServerBootstrap();
  16. serverBootstrap.group(bossGroup,workerGroup)
  17. .channel(NioServerSocketChannel.class)
  18. .childHandler(new ChannelInitializer<SocketChannel>() {
  19. @Override
  20. protected void initChannel(SocketChannel ch) throws Exception {
  21. ChannelPipeline pipeline = ch.pipeline();
  22. pipeline.addLast(new MyByteToLongDecoder()); //入站的handler进行解码 MyByteToLongDecoder
  23. pipeline.addLast(new MyLongToByteEncoder()); //出站的handler进行编码
  24. pipeline.addLast(new MyServerHandler()); //自定义的handler 处理业务逻辑
  25. }
  26. });
  27. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  28. channelFuture.channel().closeFuture().sync();
  29. }finally {
  30. bossGroup.shutdownGracefully();
  31. workerGroup.shutdownGracefully();
  32. }
  33. }
  34. }

服务端Handler:

  1. package inboundhandlerandoutboundhandler;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. /* 自定义服务端Handler */
  5. public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
  6. @Override
  7. protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
  8. System.out.println("从客户端" + ctx.channel().remoteAddress() + " 读取到long " + msg);
  9. }
  10. /* 数据读取完毕后触发 */
  11. @Override
  12. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  13. System.out.println("全部接收完毕,开始返回指定数据给客户端");
  14. ctx.writeAndFlush(98765L); //给客户端返回数据
  15. }
  16. /* 当有异常的触发 */
  17. @Override
  18. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  19. cause.printStackTrace();
  20. ctx.close();
  21. }
  22. }

客户端:

  1. package inboundhandlerandoutboundhandler;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. public class MyClient {
  11. public static void main(String[] args) throws Exception{
  12. EventLoopGroup group = new NioEventLoopGroup();
  13. try {
  14. Bootstrap bootstrap = new Bootstrap();
  15. bootstrap.group(group).channel(NioSocketChannel.class)
  16. .handler(new ChannelInitializer<SocketChannel>() {
  17. @Override
  18. protected void initChannel(SocketChannel ch) throws Exception {
  19. ChannelPipeline pipeline = ch.pipeline();
  20. pipeline.addLast(new MyLongToByteEncoder()); //加入一个出站的handler 对数据进行一个编码
  21. pipeline.addLast(new MyByteToLongDecoder()); //这时一个入站的解码器(入站handler )
  22. pipeline.addLast(new MyClientHandler()); //加入一个自定义的handler处理业务
  23. }
  24. });
  25. ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
  26. channelFuture.channel().closeFuture().sync();
  27. }finally {
  28. group.shutdownGracefully();
  29. }
  30. }
  31. }

客户端Handler:

  1. package inboundhandlerandoutboundhandler;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.util.CharsetUtil;
  6. /* 自定义客户端Handler */
  7. public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
  8. @Override
  9. protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
  10. System.out.println("收到服务器消息=" + msg);
  11. }
  12. @Override
  13. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  14. System.out.println("MyClientHandler 发送数据");
  15. //ctx.writeAndFlush(123456L); //发送的是一个long
  16. ctx.writeAndFlush(Unpooled.copiedBuffer("abcdefghijklmnop", CharsetUtil.UTF_8));
  17. }
  18. }

测试:

启动客户端与服务端,客户端将消息编码后发送给服务端,服务端收到消息后解码进行输出;服务端收到全部消息后返回数据到编码器,编码器编码后再传递给客户端
image.png
客户端与服务端成功使用自定义编码器与解码器