介绍 Netty WebSocket 整合 SpringBoot 的方式,源码
项目准备:
引入依赖:
导入 Netty 的依赖到 Pom 文件中
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.73.Final</version>
</dependency>
创建服务端:
使用单例模式创建服务端,确保获取到的是同一个服务端对象
package com.dmbjz.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class WsServer {
//创建单例模式
private static class SingletionWsServer{
static final WsServer instance = new WsServer();
}
//返回单例模式对象
public static WsServer getInstance(){
return SingletionWsServer.instance;
}
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
private ServerBootstrap bootstrap;
private ChannelFuture channelFuture;
/* 构造方法内进行初始化 */
public WsServer(){
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WsServerInitializer());
}
public void start(){
//额外线程启动Netty而不是在Main方法中启动,因此无需异步
//使用Spring容器进行托管,也无需进行关闭
this.channelFuture = bootstrap.bind(8088);
log.warn("Netty WebSocket Server启动完毕...");
}
}
添加ServerInitializer:
添加 WebSocket 的初始化器
package com.dmbjz.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/* 自定义WebSocket初始化器 */
public class WsServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//Http的编码解码器
pipeline.addLast(new HttpServerCodec());
//WebSocket以块的方式写数据,需要添加ChunkedWrite处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
* Http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合起来
* 所以浏览器发送大量数据时就会发出多次Http请求
*/
pipeline.addLast(new HttpObjectAggregator(1024*64));
/* WebSocket的数据是以帧的形式传递,需要指定 WebSocketServerProtocolHandler和自定义处理帧数据Handler
* WebSocketServerProtocolHandler的功能是将Http协议升级为WebSocket协议来保持长连接,并解析 websocket 访问地址
* 浏览器访问WebSocket的形式是 ws://localhost:7070/访问路径,参数就为 “/访问路径”
* 例如访问的是 ws://localhost:7070/rog16hp,这里的handler就要指定为 "/rog16hp"
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new ChatHandler());
}
}
添加消息处理Handler:
package com.dmbjz.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
/* 处理消息的Handler */
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//用于记录和管理所有客户端的Channel,可以进行Channel的自动移除,因此不需要再写 handlerRemoved 方法
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String content = msg.text();
log.info("服务器收到消息:"+content);
//对所有加入到Group的channel回复消息
clients.forEach(channel ->
channel.writeAndFlush(new TextWebSocketFrame("[服务器接收到消息:]" +LocalDateTime.now()+",消息为:"+ content))
);
}
/*
* 当客户端连接服务端后触发
* 获取客户端的Channel,并且放到ChannelGrou中去进行管理
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
clients.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.warn("客户端断开,长ID为:"+ctx.channel().id().asLongText());
log.warn("客户端断开,短ID为:"+ctx.channel().id().asShortText());
}
/* 客户端发生异常后触发 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
ctx.channel().close();
clients.remove(ctx.channel());
}
}
添加监听执行:
Netty 需要在 Spring 加载完成后进行监听执行,因此需要监听 SpringBoot 的 Application
package com.dmbjz;
import com.dmbjz.netty.WsServer;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/* Spring加载完成后进行监听执行 */
@Component
public class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {
/* 当一个ApplicationContext被初始化或刷新触发 */
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//判断当前是否为spring容器初始化完成,防止重复执行
if(event.getApplicationContext().getParent() == null){
try {
WsServer.getInstance().start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
启动测试:
对之前在 WebSocket 章节中的页面进行修改,使其监听该案例的WebSocket地址,然后进行测试
访问页面检测监听结果
![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1649555166418-2365f7cb-5082-4c7d-954c-02dff6b0973b.png#clientId=u4e14a6b7-98cb-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=310&id=ud773b280&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1394&originWidth=2560&originalType=binary&ratio=1&rotation=0&showTitle=false&size=238988&status=done&style=stroke&taskId=u82cb5f16-d4c6-42db-8281-61c5990167b&title=&width=569)<br />** 输入消息,服务端成功接收**
使用Service:
如果在处理消息时需要使用业务逻辑 Service,只能通过获取 Bean 的方式进行调用,例如在读取状态下需要调用业务方法 UserService :
import cn.hutool.extra.spring.SpringUtil;
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
UserService userService = SpringUtil.getBean("userServiceImpl");
}