4. Netty高级引用

4.1 Netty编解码器

4.1.1 java的编解码

  1. 编码(encode)称为序列化,它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途
  2. 解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作

image.png
java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
Java序列化目的:1.网络传输。2.对象持久化。
Java序列化缺点:1.无法跨语言。 2.序列化后码流太大。3.序列化性能太低。
Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框
架,这些编解码框架实现消息的高效序列化。

4.1.2 Netty的编解码

4.1.2.1 概念

在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络
中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进
行解码。
对于Netty而言,编解码器由两部分组成:编码器、解码器。

  • 解码器:负责将消息从字节或其他序列形式转成指定的消息对象。
  • 编码器:将消息对象转成字节或其他序列形式在网络上传输。

Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所
以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
Netty里面的编解码: 解码器:负责处理“入站 InboundHandler”数据。 编码器:负责“出站
OutboundHandler” 数据。

4.1.2.2 解码器(Decoder)

解码器负责 解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象
ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器,Netty中
主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder
image.png
抽象解码器

  • ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节
  • ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
  • MessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)

核心方法:

  1. decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)

代码实现:
解码器:

  1. package com.lagou.codec;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.MessageToMessageDecoder;
  5. import io.netty.util.CharsetUtil;
  6. import java.util.List;
  7. /*** 消息解码-可以将字符串消息进行在进行解码. 只有消息入站时才会进行解码 */
  8. public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  9. @Override
  10. protected void decode(ChannelHandlerContext ctx, ByteBuf in,
  11. List<Object> out) throws Exception {
  12. System.out.println("正在进行消息解码");
  13. out.add(in.toString(CharsetUtil.UTF_8));
  14. }
  15. }

通道读取方法:

  1. /**
  2. * 通道读取事件
  3. *
  4. * @param ctx
  5. * @param msg
  6. * @throws Exception
  7. */
  8. @Override
  9. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  10. System.out.println("客户端发送过来的消息:" + msg);
  11. }

启动类:

  1. protected void initChannel(SocketChannel ch) throws Exception {
  2. //8. 向pipeline中添加自定义业务处理handler
  3. ch.pipeline().addLast(new MessageDecoder());//添加解码器
  4. ch.pipeline().addLast(new NettyServerHandler());
  5. }

4.1.2.3 编码器(Encoder)

与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器
实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现
ChannelOutboundHandler接口。
image.png
抽象编码器

  • MessageToByteEncoder: 将消息转化成字节
  • MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)

核心方法:

  1. encode(ChannelHandlerContext ctx, String msg, List<Object> out)

代码实现:
编码器:

  1. package com.lagou.codec;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.handler.codec.MessageToMessageEncoder;
  6. import io.netty.util.CharsetUtil;
  7. import java.util.List;
  8. /*** 编码器 */
  9. public class MessageEncoder extends MessageToMessageEncoder<String> {
  10. @Override
  11. protected void encode(ChannelHandlerContext ctx, String msg,
  12. List<Object> out) throws Exception {
  13. System.out.println("消息进行消息编码");
  14. out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
  15. }
  16. }

消息发送:

  1. /**
  2. * 通道就绪事件
  3. *
  4. * @param ctx
  5. * @throws Exception
  6. */
  7. @Override
  8. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  9. ChannelFuture future = ctx.writeAndFlush("你好呀.我是Netty客户端");
  10. future.addListener(new ChannelFutureListener() {
  11. @Override
  12. public void operationComplete(ChannelFuture future) throws Exception{
  13. if (future.isSuccess()) {
  14. System.out.println("数据发送成功!");
  15. } else {
  16. System.out.println("数据发送失败!");
  17. }
  18. }
  19. });
  20. }

启动类:

  1. @Override
  2. protected void initChannel(SocketChannel ch) throws Exception {
  3. //6. 向pipeline中添加自定义业务处理handler
  4. ch.pipeline().addLast(new MessageDecoder());//添加解码器
  5. ch.pipeline().addLast(new MessageEncoder());//添加编码器
  6. ch.pipeline().addLast(new NettyClientHandler());
  7. }

4.1.2.4 编码解码器(Codec)

编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和
ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。
image.png
Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类
ByteToMessageCodec ,MessageToMessageCodec都继承与此类.
代码实现:

  1. /**
  2. * 编解码器
  3. */
  4. public class MessageCoder extends MessageToMessageCodec {
  5. @Override
  6. protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
  7. System.out.println("正在进行消息编码");
  8. String str = (String) msg;
  9. out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
  10. }
  11. @Override
  12. protected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
  13. System.out.println("正在进行消息解码");
  14. ByteBuf byteBuf = (ByteBuf) msg;
  15. out.add(byteBuf.toString(CharsetUtil.UTF_8));
  16. }
  17. }

启动类:

  1. protected void initChannel(SocketChannel ch) throws Exception {
  2. //8. 向pipeline中添加自定义业务处理handler
  3. ch.pipeline().addLast(new MessageCoder());//添加编解码器
  4. ch.pipeline().addLast(new NettyServerHandler());
  5. }

4.2 案例 - 群聊天

案例要求:
1. 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯
2. 实现多人群聊
3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
4. 客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息

4.2.1 聊天室服务端编写

  1. NettyChatServer ```java package com.lagou.chat; import com.lagou.demo.NettyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /*
  • 聊天室服务端 */ public class NettyChatServer { //端口号 private int port; public NettyChatServer(int port) {
    1. this.port = port;
    } public void run() throws InterruptedException {
    1. //1. 创建bossGroup线程组: 处理网络事件--连接事件
    2. EventLoopGroup bossGroup = null;
    3. //2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
    4. EventLoopGroup workerGroup = null;
    5. try {
    6. bossGroup = new NioEventLoopGroup(1);
    7. workerGroup = new NioEventLoopGroup();
    8. //3. 创建服务端启动助手
    9. ServerBootstrap serverBootstrap = new ServerBootstrap();
    10. //4. 设置bossGroup线程组和workerGroup线程组
    11. serverBootstrap.group(bossGroup, workerGroup)
    12. .channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
    13. .option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
    14. .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置
    15. .childHandler(new ChannelInitializer<SocketChannel>() {
    16. //7. 创建一个通道初始化对象
    17. @Override
    18. protected void initChannel(SocketChannel ch) throws Exception {
    19. //8. 向pipeline中添加自定义业务处理handler
    20. //添加编解码器
    21. ch.pipeline().addLast(new StringDecoder());
    22. ch.pipeline().addLast(new StringEncoder());
    23. // todo
    24. ch.pipeline().addLast(new
    25. NettyChatServerHandler());
    26. }
    27. });
    28. //9. 启动服务端并绑定端口,同时将异步改为同步
    29. ChannelFuture future = serverBootstrap.bind(port);
    30. future.addListener(new ChannelFutureListener() {
    31. @Override
    32. public void operationComplete(ChannelFuture future) throws Exception {
    33. if (future.isSuccess()) {
    34. System.out.println("端口绑定成功!");
    35. } else {
    36. System.out.println("端口绑定失败!");
    37. }
    38. }
    39. });
    40. System.out.println("聊天室服务端启动成功.");
    41. //10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
    42. future.channel().closeFuture().sync();
    43. } finally {
    44. bossGroup.shutdownGracefully();
    45. workerGroup.shutdownGracefully();
    46. }
    } public static void main(String[] args) throws InterruptedException {
    1. new NettyChatServer(9998).run();
    } } ```
  1. NettyChatServerHandle ```java package com.lagou.chat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.util.ArrayList; import java.util.List; /**
  • 聊天室业务处理类 / public class NettyChatServerHandler extends SimpleChannelInboundHandler { public static List channelList = new ArrayList<>(); /*
    • 通道就绪事件 *
    • @param ctx
    • @throws Exception / @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //当有新的客户端连接的时候, 将通道放入集合 channelList.add(channel); System.out.println(“[Server]:” + channel.remoteAddress().toString().substring(1) + “在线.”); } /*
    • 通道未就绪—channel下线 *
    • @param ctx
    • @throws Exception / @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception{ Channel channel = ctx.channel(); //当有客户端断开连接的时候,就移除对应的通道 channelList.remove(channel); System.out.println(“[Server]:” + channel.remoteAddress().toString().substring(1) + “下线.”); } /*
    • 通道读取事件 *
    • @param ctx
    • @param msg
    • @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //当前发送消息的通道, 当前发送的客户端连接 Channel channel = ctx.channel(); for (Channel channel1 : channelList) {
      1. //排除自身通道
      2. if (channel != channel1) {
      3. channel1.writeAndFlush("[" + channel.remoteAddress().toString().substring(1) + "]说:" + msg);
      4. }
      } } /**
    • 异常处理事件 *
    • @param ctx
    • @param cause
    • @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); Channel channel = ctx.channel(); //移除集合 channelList.remove(channel); System.out.println(“[Server]:” + channel.remoteAddress().toString().substring(1) + “异常.”); } } ```

      4.2.2 聊天室客户端编写

  1. NettyChatClient ```java package com.lagou.chat; import com.lagou.demo.NettyClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; /**
  • 聊天室的客户端 */ public class NettyChatClient { private String ip;//服务端IP private int port;//服务端端口号 public NettyChatClient(String ip, int port) {
    1. this.ip = ip;
    2. this.port = port;
    } public void run() throws InterruptedException {
    1. //1. 创建线程组
    2. EventLoopGroup group = null;
    3. try {
    4. group = new NioEventLoopGroup();
    5. //2. 创建客户端启动助手
    6. Bootstrap bootstrap = new Bootstrap();
    7. //3. 设置线程组
    8. bootstrap.group(group)
    9. .channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO
    10. .handler(new ChannelInitializer<SocketChannel>() { //5.创建一个通道初始化对象
    11. @Override
    12. protected void initChannel(SocketChannel ch) throws Exception {
    13. //6. 向pipeline中添加自定义业务处理handler
    14. //添加编解码器
    15. ch.pipeline().addLast(new StringDecoder());
    16. ch.pipeline().addLast(new StringEncoder());
    17. //添加客户端的处理类
    18. ch.pipeline().addLast(new
    19. NettyChatClientHandler());
    20. }
    21. });
    22. //7. 启动客户端,等待连接服务端,同时将异步改为同步
    23. ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
    24. Channel channel = channelFuture.channel();
    25. System.out.println("-------" +
    26. channel.localAddress().toString().substring(1) + "--------");
    27. Scanner scanner = new Scanner(System.in);
    28. while (scanner.hasNextLine()) {
    29. String msg = scanner.nextLine();
    30. //向服务端发送消息
    31. channel.writeAndFlush(msg);
    32. }
    33. //8. 关闭通道和关闭连接池
    34. channelFuture.channel().closeFuture().sync();
    35. } finally {
    36. group.shutdownGracefully();
    37. }
    } public static void main(String[] args) throws InterruptedException {
    1. new NettyChatClient("127.0.0.1", 9998).run();
    } } ```
  1. NettyChatClientHandle ```java package com.lagou.chat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /**
  • 聊天室处理类 / public class NettyChatClientHandler extends SimpleChannelInboundHandler { /*
    • 通道读取就绪事件 *
    • @param ctx
    • @param msg
    • @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } } ```

4.3 基于Netty的Http服务器开发

4.3.1 介绍

Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器的场景下应用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。
image.png

4.3.2 功能需求

  1. Netty 服务器在 8080 端口监听
    2. 浏览器发出请求 “http://localhost:8080/
    3. 服务器可以回复消息给客户端 “Hello! 我是Netty服务器 “ ,并对特定请求资源进行过滤.

    4.3.3 服务端代码实现

  2. NettyHttpServer ```java package com.lagou.http; import com.lagou.chat.NettyChatServer; import com.lagou.chat.NettyChatServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /*
  • 聊天室服务端 */ public class NettyHttpServer { //端口号 private int port; public NettyHttpServer(int port) {
    1. this.port = port;
    } public void run() throws InterruptedException {
    1. //1. 创建bossGroup线程组: 处理网络事件--连接事件
    2. EventLoopGroup bossGroup = null;
    3. //2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
    4. EventLoopGroup workerGroup = null;
    5. try {
    6. bossGroup = new NioEventLoopGroup(1);
    7. workerGroup = new NioEventLoopGroup();
    8. //3. 创建服务端启动助手
    9. ServerBootstrap serverBootstrap = new ServerBootstrap();
    10. //4. 设置bossGroup线程组和workerGroup线程组
    11. serverBootstrap.group(bossGroup, workerGroup)
    12. .channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
    13. .option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
    14. .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)//6. 参数设置
    15. .childHandler(new ChannelInitializer<SocketChannel>() {
    16. //7. 创建一个通道初始化对象
    17. @Override
    18. protected void initChannel(SocketChannel ch) throws Exception {
    19. //8. 向pipeline中添加自定义业务处理handler
    20. //添加编解码器
    21. ch.pipeline().addLast(new HttpServerCodec());
    22. // 自定义业务处理类
    23. ch.pipeline().addLast(new NettyHttpServerHandler());
    24. }
    25. });
    26. //9. 启动服务端并绑定端口,同时将异步改为同步
    27. ChannelFuture future = serverBootstrap.bind(port);
    28. future.addListener(new ChannelFutureListener() {
    29. @Override
    30. public void operationComplete(ChannelFuture future) throws Exception {
    31. if (future.isSuccess()) {
    32. System.out.println("端口绑定成功!");
    33. } else {
    34. System.out.println("端口绑定失败!");
    35. }
    36. }
    37. });
    38. System.out.println("http服务端启动成功.");
    39. //10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
    40. future.channel().closeFuture().sync();
    41. } finally {
    42. bossGroup.shutdownGracefully();
    43. workerGroup.shutdownGracefully();
    44. }
    } public static void main(String[] args) throws InterruptedException {
    1. new NettyHttpServer(8080).run();
    } } 2.NettyHttpServerHandlejava package com.lagou.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.; import io.netty.util.CharsetUtil; /*
  • http服务器处理类 / public class NettyHttpServerHandler extends SimpleChannelInboundHandler { /*
    • 读取就绪事件 *
    • @param ctx
    • @param msg
    • @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { //1.判断请求是不是HTTP请求 if (msg instanceof HttpRequest) {
      1. DefaultHttpRequest request = (DefaultHttpRequest) msg;
      2. System.out.println("浏览器请求路径:" + request.uri());
      3. if ("/favicon.ico".equals(request.uri())) {
      4. System.out.println("图标不响应");
      5. return;
      6. }
      7. //2.给浏览器进行响应
      8. ByteBuf byteBuf = Unpooled.copiedBuffer("Hello! 我是Netty服务器 ", CharsetUtil.UTF_8);
      9. DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
      10. //2.1 设置响应头
      11. response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
      12. response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
      13. ctx.writeAndFlush(response);
      } } } ```

4.4 基于Netty的WebSocket开发网页版聊天室

4.4.1 WebSocket简介

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
应用场景十分广泛:
1. 社交订阅
2. 协同编辑/编程
3. 股票基金报价
4. 体育实况更新
5. 多媒体聊天
6. 在线教育

4.4.2 WebSocket和HTTP的区别

http协议是用在应用层的协议,他是基于tcp协议的,http协议建立连接也必须要有三次握手才能发送信息。 http连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一个request对应一个response。长连接是在一定的期限内保持连接。保持TCP连接不断开。客户端与服务器通信,必须要有客户端先发起, 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端要想实时获取服务端消息就得不断发送长连接到服务端.
WebSocket实现了多路复用,他是全双工通信。在webSocket协议下服务端和客户端可以同时发送信息。 建立了WebSocket连接之后, 服务端可以主动发送信息到客户端。而且信息当中不必在带有head的部分信息了与http的长链接通信来说,这种方式,不仅能降低服务器的压力。而且信息当中也减少了部分多余的信息。

4.4.3 导入基础环境

  1. 将资料中Netty-Springboot工程导入到idea
    image.png
    2. 相关依赖
    1. <!--整合web模块-->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-web</artifactId>
    5. </dependency>
    6. <!--整合模板引擎 -->
    7. <dependency>
    8. <groupId>org.springframework.boot</groupId>
    9. <artifactId>spring-boot-starter-thymeleaf</artifactId>
    10. </dependency>
    11. <dependency>
    12. <groupId>org.projectlombok</groupId>
    13. <artifactId>lombok</artifactId>
    14. </dependency>
  2. 静态资源
    image.png
    4. yam配置
    1. server:
    2. port: 8080
    3. resources:
    4. static-locations:
    5. - classpath:/static/
    6. spring:
    7. thymeleaf:
    8. cache: false
    9. checktemplatelocation: true
    10. enabled: true
    11. encoding: UTF-8
    12. mode: HTML5
    13. prefix: classpath:/templates/
    14. suffix: .html

    4.4.4 服务端开发

  3. 添加Netty依赖
    1. <!--引入netty依赖 -->
    2. <dependency>
    3. <groupId>io.netty</groupId>
    4. <artifactId>netty-all</artifactId>
    5. </dependency>
  4. Netty相关配置
    1. netty:
    2. port: 8081
    3. path: /chat
  5. Netty配置类
    1. package com.lagou.config;
    2. import lombok.Data;
    3. import org.springframework.boot.context.properties.ConfigurationProperties;
    4. import org.springframework.stereotype.Component;
    5. @Component
    6. @ConfigurationProperties(prefix = "netty")
    7. @Data
    8. public class NettyConfig {
    9. private int port;//netty监听的端口
    10. private String path;//websocket访问路径
    11. }
  6. NettyWebSocketServer开发 ```java package com.lagou.netty; import com.lagou.config.NettyConfig; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; /**
  • Netty服务器 / @Component public class NettyWebSocketServer implements Runnable { @Autowired NettyConfig nettyConfig; @Autowired WebSocketChannelInit webSocketChannelInit; private EventLoopGroup bossGroup = new NioEventLoopGroup(1); private EventLoopGroup workerGroup = new NioEventLoopGroup(); /*
    • 资源关闭—在容器销毁是关闭 */ @PreDestroy public void close() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } @Override public void run() { try {
      1. //1.创建服务端启动助手
      2. ServerBootstrap serverBootstrap = new ServerBootstrap();
      3. //2.设置线程组
      4. serverBootstrap.group(bossGroup, workerGroup);
      5. //3.设置参数
      6. serverBootstrap.channel(NioServerSocketChannel.class)
      7. .handler(new LoggingHandler(LogLevel.DEBUG))
      8. .childHandler(webSocketChannelInit);
      9. //4.启动
      10. ChannelFuture channelFuture = serverBootstrap.bind(nettyConfig.getPort()).sync();
      11. System.out.println("--Netty服务端启动成功---");
      12. channelFuture.channel().closeFuture().sync();
      } catch (Exception e) {
      1. e.printStackTrace();
      2. bossGroup.shutdownGracefully();
      3. workerGroup.shutdownGracefully();
      } finally {
      1. bossGroup.shutdownGracefully();
      2. workerGroup.shutdownGracefully();
      } } } ```
  1. 通道初始化对象 ```java package com.lagou.netty; import com.lagou.config.NettyConfig; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

/**

  • 通道初始化对象 */ @Component public class WebSocketChannelInit extends ChannelInitializer { @Autowired NettyConfig nettyConfig; @Autowired WebSocketHandler webSocketHandler; @Override protected void initChannel(Channel channel) throws Exception {
    1. ChannelPipeline pipeline = channel.pipeline();
    2. //对http协议的支持.
    3. pipeline.addLast(new HttpServerCodec());
    4. // 对大数据流的支持
    5. pipeline.addLast(new ChunkedWriteHandler());
    6. //post请求分三部分. request line / request header / message body
    7. // HttpObjectAggregator将多个信息转化成单一的request或者response对象
    8. pipeline.addLast(new HttpObjectAggregator(8000));
    9. // 将http协议升级为ws协议. websocket的支持
    10. pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath()));
    11. // 自定义处理handler
    12. pipeline.addLast(webSocketHandler);
    } } ```
  1. 处理对象 ```java package com.lagou.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; /**
  • 自定义处理类
  • TextWebSocketFrame: websocket数据是帧的形式处理 / @Component @ChannelHandler.Sharable //设置通道共享 public class WebSocketHandler extends SimpleChannelInboundHandler { public static List channelList = new ArrayList<>(); /*
    • 通道就绪事件 *
    • @param ctx
    • @throws Exception / @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //当有新的客户端连接的时候, 将通道放入集合 channelList.add(channel); System.out.println(“有新的连接.”); } /*
    • 通道未就绪—channel下线 *
    • @param ctx
    • @throws Exception / @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //当有客户端断开连接的时候,就移除对应的通道 channelList.remove(channel); } /*
    • 读就绪事件 *
    • @param ctx
    • @param textWebSocketFrame
    • @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception { String msg = textWebSocketFrame.text(); System.out.println(“msg:” + msg); //当前发送消息的通道, 当前发送的客户端连接 Channel channel = ctx.channel(); for (Channel channel1 : channelList) {
      1. //排除自身通道
      2. if (channel != channel1) {
      3. channel1.writeAndFlush(new TextWebSocketFrame(msg));
      4. }
      } } /**
    • 异常处理事件 *
    • @param ctx
    • @param cause
    • @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); Channel channel = ctx.channel(); //移除集合 channelList.remove(channel); } } ```
  1. 启动类
    1. package com.lagou;
    2. import com.lagou.netty.NettyWebSocketServer;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.boot.CommandLineRunner;
    5. import org.springframework.boot.SpringApplication;
    6. import org.springframework.boot.autoconfigure.SpringBootApplication;
    7. @SpringBootApplication
    8. public class NettySpringbootApplication implements CommandLineRunner {
    9. @Autowired
    10. NettyWebSocketServer nettyWebSocketServer;
    11. public static void main(String[] args) {
    12. SpringApplication.run(NettySpringbootApplication.class, args);
    13. }
    14. @Override
    15. public void run(String... args) throws Exception {
    16. new Thread(nettyWebSocketServer).start();
    17. }
    18. }
  2. 前端js开发
    1. $(function () {
    2. //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
    3. var username = "";
    4. while (true) {
    5. //弹出一个输入框,输入一段文字,可以提交
    6. username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
    7. if (username.trim() === ""){
    8. //如果返回的有内容
    9. alert("名称不能输入空")
    10. } else {
    11. $("#username").text(username);
    12. break;
    13. }
    14. }
    15. var ws = new WebSocket("ws://localhost:8081/chat");
    16. ws.onopen = function () {
    17. console.log("连接成功.")
    18. }
    19. ws.onmessage = function (evt) {
    20. showMessage(evt.data);
    21. }
    22. ws.onclose = function (){
    23. console.log("连接关闭")
    24. }
    25. ws.onerror = function (){
    26. console.log("连接异常")
    27. }
    28. function showMessage(message) {
    29. // 张三:你好
    30. var str = message.split(":");
    31. $("#msg_list").append(`<li class="active"}><div class="main"><img class="avatar" width="30" height="30" src="/img/user.png">
    32. <div>
    33. <div class="user_name">${str[0]}
    34. </div>
    35. <div class="text">${str[1]}</div>
    36. </div>
    37. </div>
    38. </li>`);
    39. // 置底
    40. setBottom();
    41. }
    42. $('#my_test').bind({
    43. focus: function (event) {
    44. event.stopPropagation()
    45. $('#my_test').val('');
    46. $('.arrow_box').hide()
    47. },
    48. keydown: function (event) {
    49. event.stopPropagation()
    50. if (event.keyCode === 13) {
    51. if ($('#my_test').val().trim() === '') {
    52. this.blur()
    53. $('.arrow_box').show()
    54. setTimeout(() => {
    55. this.focus()
    56. }, 1000)
    57. } else {
    58. $('.arrow_box').hide()
    59. //发送消息
    60. sendMsg();
    61. this.blur()
    62. setTimeout(() => {
    63. this.focus()
    64. })
    65. }
    66. }
    67. }
    68. });
    69. $('#send').on('click', function (event) {
    70. event.stopPropagation()
    71. if ($('#my_test').val().trim() === '') {
    72. $('.arrow_box').show()
    73. } else {
    74. sendMsg();
    75. }
    76. })
    77. function sendMsg() {
    78. var message = $("#my_test").val();
    79. $("#msg_list").append(`<li class="active"}>
    80. <div class="main self">
    81. <div class="text">` + message +
    82. `</div>
    83. </div>
    84. </li>`);
    85. $("#my_test").val('');
    86. //发送消息
    87. message = username + ":" + message;
    88. ws.send(message);
    89. // 置底
    90. setBottom();
    91. }
    92. // 置底
    93. function setBottom() {
    94. // 发送消息后滚动到底部
    95. const container = $('.m-message')
    96. const scroll = $('#msg_list')
    97. container.animate({
    98. scrollTop: scroll[0].scrollHeight - container[0].clientHeight +
    99. container.scrollTop() + 100
    100. });
    101. }
    102. });

4.5 Netty中粘包和拆包的解决方案

4.5.1 粘包和拆包简介

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。
TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。
1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
image.png
2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
image.png
3. 如果D2的数据包比较大, 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
image.png
4. 如果D1, D2的数据包都很大, 服务端分多次才能将D1和D2包接收完全,期间发生多次拆包
image.png
TCP粘包和拆包产生的原因:
数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

4.5.2 粘包和拆包代码演示

  1. 粘包
    客户端 ```java /**
  • 通道就绪事件 *
  • @param ctx
  • @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) {
    1. ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端" + i,
    2. CharsetUtil.UTF_8));
    } } 服务端java public int count = 0; /**
  • 通道读取事件 *
  • @param ctx
  • @param msg
  • @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println(“客户端发送过来的消息:” + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println(“读取次数:”+(++count)); }
    1. 运行结果:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/315585/1626885211780-2b77b1b1-21bc-4c9b-8467-0488dc6dd76a.png#align=left&display=inline&height=95&margin=%5Bobject%20Object%5D&name=image.png&originHeight=189&originWidth=1303&size=25076&status=done&style=none&width=651.5)<br />**服务端一次读取了客户端发送过来的消息,应该读取10次. 因此发生粘包.**<br />2. 拆包<br />客户端
    2. ```java
    3. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    4. //一次发送102400字节数据
    5. byte[] bytes = new byte[102400];
    6. Arrays.fill(bytes, (byte) 10);
    7. for (int i = 0; i < 10; i++) {
    8. ctx.writeAndFlush(Unpooled.copiedBuffer(bytes));
    9. }
    10. }

服务端

  1. public int count = 0;
  2. /**
  3. * 通道读取事件
  4. *
  5. * @param ctx
  6. * @param msg
  7. * @throws Exception
  8. */
  9. @Override
  10. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  11. ByteBuf byteBuf = (ByteBuf) msg;
  12. System.out.println("长度是:" + byteBuf.readableBytes());
  13. System.out.println("读取次数 = " + (++count));
  14. }

运行结果:
image.png
当客户端发送的数据包比较大的时候, 读取了18次, 应该读取10次, 则发送拆包事件.

4.5.3 粘包和拆包的解决方法

  1. 业内解决方案
    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

    • 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
    • 将换行符作为消息结束符
    • 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
    • 通过在消息头中定义长度字段来标识消息的总长度
  2. Netty中的粘包和拆包解决方案
    Netty提供了4种解码器来解决,分别如下:

    • 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
    • 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
    • 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
    • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度
  3. 代码实现

  • LineBasedFrameDecoder解码器

    1. ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
    1. ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"\n", CharsetUtil.UTF_8));
  • DelimiterBasedFrameDecoder解码器

    1. ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
    2. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
    1. ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"$", CharsetUtil.UTF_8));