介绍 Netty WebSocket 整合 SpringBoot 的方式,源码
项目准备:
引入依赖:
导入 Netty 的依赖到 Pom 文件中
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.73.Final</version></dependency>
创建服务端:
使用单例模式创建服务端,确保获取到的是同一个服务端对象
package com.dmbjz.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;@Component@Slf4jpublic class WsServer {//创建单例模式private static class SingletionWsServer{static final WsServer instance = new WsServer();}//返回单例模式对象public static WsServer getInstance(){return SingletionWsServer.instance;}private EventLoopGroup bossGroup;private EventLoopGroup workGroup;private ServerBootstrap bootstrap;private ChannelFuture channelFuture;/* 构造方法内进行初始化 */public WsServer(){bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new WsServerInitializer());}public void start(){//额外线程启动Netty而不是在Main方法中启动,因此无需异步//使用Spring容器进行托管,也无需进行关闭this.channelFuture = bootstrap.bind(8088);log.warn("Netty WebSocket Server启动完毕...");}}
添加ServerInitializer:
添加 WebSocket 的初始化器
package com.dmbjz.netty;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import io.netty.handler.stream.ChunkedWriteHandler;/* 自定义WebSocket初始化器 */public class WsServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//Http的编码解码器pipeline.addLast(new HttpServerCodec());//WebSocket以块的方式写数据,需要添加ChunkedWrite处理器pipeline.addLast(new ChunkedWriteHandler());/** Http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合起来* 所以浏览器发送大量数据时就会发出多次Http请求*/pipeline.addLast(new HttpObjectAggregator(1024*64));/* WebSocket的数据是以帧的形式传递,需要指定 WebSocketServerProtocolHandler和自定义处理帧数据Handler* WebSocketServerProtocolHandler的功能是将Http协议升级为WebSocket协议来保持长连接,并解析 websocket 访问地址* 浏览器访问WebSocket的形式是 ws://localhost:7070/访问路径,参数就为 “/访问路径”* 例如访问的是 ws://localhost:7070/rog16hp,这里的handler就要指定为 "/rog16hp"*/pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));pipeline.addLast(new ChatHandler());}}
添加消息处理Handler:
package com.dmbjz.netty;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import io.netty.util.concurrent.GlobalEventExecutor;import lombok.extern.slf4j.Slf4j;import java.time.LocalDateTime;/* 处理消息的Handler */@Slf4jpublic class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {//用于记录和管理所有客户端的Channel,可以进行Channel的自动移除,因此不需要再写 handlerRemoved 方法private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {String content = msg.text();log.info("服务器收到消息:"+content);//对所有加入到Group的channel回复消息clients.forEach(channel ->channel.writeAndFlush(new TextWebSocketFrame("[服务器接收到消息:]" +LocalDateTime.now()+",消息为:"+ content)));}/** 当客户端连接服务端后触发* 获取客户端的Channel,并且放到ChannelGrou中去进行管理*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();clients.add(channel);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.warn("客户端断开,长ID为:"+ctx.channel().id().asLongText());log.warn("客户端断开,短ID为:"+ctx.channel().id().asShortText());}/* 客户端发生异常后触发 */@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ctx.channel().close();clients.remove(ctx.channel());}}
添加监听执行:
Netty 需要在 Spring 加载完成后进行监听执行,因此需要监听 SpringBoot 的 Application
package com.dmbjz;import com.dmbjz.netty.WsServer;import org.springframework.context.ApplicationListener;import org.springframework.context.event.ContextRefreshedEvent;import org.springframework.stereotype.Component;/* Spring加载完成后进行监听执行 */@Componentpublic class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {/* 当一个ApplicationContext被初始化或刷新触发 */@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//判断当前是否为spring容器初始化完成,防止重复执行if(event.getApplicationContext().getParent() == null){try {WsServer.getInstance().start();} catch (Exception e) {e.printStackTrace();}}}}
启动测试:
对之前在 WebSocket 章节中的页面进行修改,使其监听该案例的WebSocket地址,然后进行测试

访问页面检测监听结果
<br />** 输入消息,服务端成功接收**
使用Service:
如果在处理消息时需要使用业务逻辑 Service,只能通过获取 Bean 的方式进行调用,例如在读取状态下需要调用业务方法 UserService :
import cn.hutool.extra.spring.SpringUtil;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {UserService userService = SpringUtil.getBean("userServiceImpl");}
