介绍 Netty WebSocket 整合 SpringBoot 的方式,源码

项目准备:

引入依赖:

导入 Netty 的依赖到 Pom 文件中

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.73.Final</version>
  5. </dependency>

创建服务端:

使用单例模式创建服务端,确保获取到的是同一个服务端对象

  1. package com.dmbjz.netty;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.EventLoopGroup;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. @Slf4j
  11. public class WsServer {
  12. //创建单例模式
  13. private static class SingletionWsServer{
  14. static final WsServer instance = new WsServer();
  15. }
  16. //返回单例模式对象
  17. public static WsServer getInstance(){
  18. return SingletionWsServer.instance;
  19. }
  20. private EventLoopGroup bossGroup;
  21. private EventLoopGroup workGroup;
  22. private ServerBootstrap bootstrap;
  23. private ChannelFuture channelFuture;
  24. /* 构造方法内进行初始化 */
  25. public WsServer(){
  26. bossGroup = new NioEventLoopGroup();
  27. workGroup = new NioEventLoopGroup();
  28. bootstrap = new ServerBootstrap();
  29. bootstrap.group(bossGroup,workGroup)
  30. .channel(NioServerSocketChannel.class)
  31. .childHandler(new WsServerInitializer());
  32. }
  33. public void start(){
  34. //额外线程启动Netty而不是在Main方法中启动,因此无需异步
  35. //使用Spring容器进行托管,也无需进行关闭
  36. this.channelFuture = bootstrap.bind(8088);
  37. log.warn("Netty WebSocket Server启动完毕...");
  38. }
  39. }

添加ServerInitializer:

添加 WebSocket 的初始化器

  1. package com.dmbjz.netty;
  2. import io.netty.channel.ChannelInitializer;
  3. import io.netty.channel.ChannelPipeline;
  4. import io.netty.channel.socket.SocketChannel;
  5. import io.netty.handler.codec.http.HttpObjectAggregator;
  6. import io.netty.handler.codec.http.HttpServerCodec;
  7. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  8. import io.netty.handler.stream.ChunkedWriteHandler;
  9. /* 自定义WebSocket初始化器 */
  10. public class WsServerInitializer extends ChannelInitializer<SocketChannel> {
  11. @Override
  12. protected void initChannel(SocketChannel ch) throws Exception {
  13. ChannelPipeline pipeline = ch.pipeline();
  14. //Http的编码解码器
  15. pipeline.addLast(new HttpServerCodec());
  16. //WebSocket以块的方式写数据,需要添加ChunkedWrite处理器
  17. pipeline.addLast(new ChunkedWriteHandler());
  18. /*
  19. * Http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合起来
  20. * 所以浏览器发送大量数据时就会发出多次Http请求
  21. */
  22. pipeline.addLast(new HttpObjectAggregator(1024*64));
  23. /* WebSocket的数据是以帧的形式传递,需要指定 WebSocketServerProtocolHandler和自定义处理帧数据Handler
  24. * WebSocketServerProtocolHandler的功能是将Http协议升级为WebSocket协议来保持长连接,并解析 websocket 访问地址
  25. * 浏览器访问WebSocket的形式是 ws://localhost:7070/访问路径,参数就为 “/访问路径”
  26. * 例如访问的是 ws://localhost:7070/rog16hp,这里的handler就要指定为 "/rog16hp"
  27. */
  28. pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
  29. pipeline.addLast(new ChatHandler());
  30. }
  31. }

添加消息处理Handler:

  1. package com.dmbjz.netty;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.channel.group.ChannelGroup;
  6. import io.netty.channel.group.DefaultChannelGroup;
  7. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  8. import io.netty.util.concurrent.GlobalEventExecutor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import java.time.LocalDateTime;
  11. /* 处理消息的Handler */
  12. @Slf4j
  13. public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  14. //用于记录和管理所有客户端的Channel,可以进行Channel的自动移除,因此不需要再写 handlerRemoved 方法
  15. private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  16. @Override
  17. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  18. String content = msg.text();
  19. log.info("服务器收到消息:"+content);
  20. //对所有加入到Group的channel回复消息
  21. clients.forEach(channel ->
  22. channel.writeAndFlush(new TextWebSocketFrame("[服务器接收到消息:]" +LocalDateTime.now()+",消息为:"+ content))
  23. );
  24. }
  25. /*
  26. * 当客户端连接服务端后触发
  27. * 获取客户端的Channel,并且放到ChannelGrou中去进行管理
  28. */
  29. @Override
  30. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  31. Channel channel = ctx.channel();
  32. clients.add(channel);
  33. }
  34. @Override
  35. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  36. log.warn("客户端断开,长ID为:"+ctx.channel().id().asLongText());
  37. log.warn("客户端断开,短ID为:"+ctx.channel().id().asShortText());
  38. }
  39. /* 客户端发生异常后触发 */
  40. @Override
  41. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
  42. ctx.channel().close();
  43. clients.remove(ctx.channel());
  44. }
  45. }

添加监听执行:

Netty 需要在 Spring 加载完成后进行监听执行,因此需要监听 SpringBoot Application

  1. package com.dmbjz;
  2. import com.dmbjz.netty.WsServer;
  3. import org.springframework.context.ApplicationListener;
  4. import org.springframework.context.event.ContextRefreshedEvent;
  5. import org.springframework.stereotype.Component;
  6. /* Spring加载完成后进行监听执行 */
  7. @Component
  8. public class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {
  9. /* 当一个ApplicationContext被初始化或刷新触发 */
  10. @Override
  11. public void onApplicationEvent(ContextRefreshedEvent event) {
  12. //判断当前是否为spring容器初始化完成,防止重复执行
  13. if(event.getApplicationContext().getParent() == null){
  14. try {
  15. WsServer.getInstance().start();
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. }

启动测试:

对之前在 WebSocket 章节中的页面进行修改,使其监听该案例的WebSocket地址,然后进行测试
image.png
访问页面检测监听结果

  1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1649555166418-2365f7cb-5082-4c7d-954c-02dff6b0973b.png#clientId=u4e14a6b7-98cb-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=310&id=ud773b280&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1394&originWidth=2560&originalType=binary&ratio=1&rotation=0&showTitle=false&size=238988&status=done&style=stroke&taskId=u82cb5f16-d4c6-42db-8281-61c5990167b&title=&width=569)<br />** 输入消息,服务端成功接收**

使用Service:

如果在处理消息时需要使用业务逻辑 Service,只能通过获取 Bean 的方式进行调用,例如在读取状态下需要调用业务方法 UserService :

  1. import cn.hutool.extra.spring.SpringUtil;
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  4. UserService userService = SpringUtil.getBean("userServiceImpl");
  5. }