WebSocket技术
1、简介
HTML5开始提供的一种浏览器与服务器进行全双工通讯的网络技术,属于应用层协议。它基于TCP传输协议,并复用HTTP的握手通道。
对大部分web开发者来说,上面这段描述有点枯燥,其实只要记住几点:
- WebSocket可以在浏览器里使用
- 支持双向通信
- 使用很简单
1、优点
说到优点,这里的对比参照物是HTTP协议,概括地说就是:支持双向通信,更灵活,更高效,可扩展性更好。
- 支持双向通信,实时性更强。
- 更好的二进制支持。
- 较少的控制开销。连接创建后,ws客户端、服务端进行数据交换时,协议控制的数据包头部较小。在不包含头部的情况下,服务端到客户端的包头只有2~10字节(取决于数据包长度),客户端到服务端的的话,需要加上额外的4字节的掩码。而HTTP协议每次通信都需要携带完整的头部。
- 支持扩展。ws协议定义了扩展,用户可以扩展协议,或者实现自定义的子协议。(比如支持自定义压缩算法等)
在客户端,new WebSocket实例化一个新的WebSocket客户端对象,请求类似 ws://yourdomain:port/path 的服务端WebSocket URL,客户端WebSocket对象会自动解析并识别为WebSocket请求,并连接服务端端口,执行双方握手过程,客户端发送数据格式类似:
GET /webfin/websocket/ HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==
Origin: http://localhost:8080
Sec-WebSocket-Version: 13
可以看到,客户端发起的WebSocket连接报文类似传统HTTP报文,Upgrade:websocket参数值表明这是WebSocket类型请求,Sec-WebSocket-Key是WebSocket客户端发送的一个 base64编码的密文,要求服务端必须返回一个对应加密的Sec-WebSocket-Accept应答,否则客户端会抛出Error during WebSocket handshake错误,并关闭连接。
服务端收到报文后返回的数据格式类似:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: K7DJLdLooIwIG/MOpvWFB3y3FE8=
Sec-WebSocket-Accept的值是服务端采用与客户端一致的密钥计算出来后返回客户端的,HTTP/1.1 101 Switching Protocols表示服务端接受WebSocket协议的客户端连接,经过这样的请求-响应处理后,两端的WebSocket连接握手成功, 后续就可以进行TCP通讯了。
2、导入依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
3、代码实现
首先要注入ServerEndpointExporter,这个bean会自动注册使用@ServerEndpoint注解声明的Websocket endpoint。要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将有容器自己提供和管理
3.1 配置类
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
public TaskScheduler taskScheduler(){
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
return taskScheduler;
}
}
3.2 WebSocket服务端
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketEndPoint {
/**
* 用于存放所有在线客户端
*/
private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
public static Map<String, Session> getSessionMap(){
return SESSION_MAP;
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
log.info("有新的客户端上线: {}", session.getId());
SESSION_MAP.put(userId, session);
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
String sessionId = session.getId();
log.info("有客户端离线: {}", sessionId);
SESSION_MAP.remove(sessionId);
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message) {
for(String userId : SESSION_MAP.keySet()) {
SESSION_MAP.get(userId).getAsyncRemote().sendText(message);
}
}
/**
* @param session 消息
* @param error 异常
*/
@OnError
public void onError(Session session, Throwable error) {
String sessionId = session.getId();
if (SESSION_MAP.get(sessionId) != null) {
log.info("发生了错误,移除客户端: {}", sessionId);
SESSION_MAP.remove(sessionId);
}
log.error("发生异常:",error);
}
/**
* 指定对象发送消息
*
* @param message 消息对象
*/
public static void sendToUser(List<String> persons, String message) {
persons.forEach(userId -> send(userId,message));
}
/**
* 单发 1对1
* @param userId 用户id
* @param message 消息
*/
public static void send(String userId,String message) {
SESSION_MAP.get(userId).getAsyncRemote().sendText(message);
}
/**
* 群发
* @param message 消息
*/
public static void batchSend(String message) {
for(String sessionId : SESSION_MAP.keySet())
{
SESSION_MAP.get(sessionId).getAsyncRemote().sendText(message);
}
}
}
3.3 定时任务
定时任务每10s一次
@Scheduled(cron = "0/10 * * * * ?")
@Component
@EnableScheduling
@Slf4j
public class ScheduleTask {
@Scheduled(cron = "0/10 * * * * ?")
public void sendMessage(){
String message = "定时报时间,现在:"+new Date();
log.info("{}","**********定时任务执行***********");
Map<String, Session> map = WebSocketEndPoint.getSessionMap();
for (String userId : map.keySet()) {
map.get(userId).getAsyncRemote().sendText(message);
}
}
}
3.4 服务层
public interface SendMessageService {
/**
* 一对一 一对多
* @param map 消息和用户
* @return 返回消息
*/
String sendToUser(Map<String, Object> map);
/**
* 群发消息
* @param message 消息
* @return 返回消息
*/
String batchSend(String message);
}
@Service
@Slf4j
public class SendMessageServiceImpl implements SendMessageService {
/**
*
* @param map 用户和消息信息
*/
@Override
public String sendToUser(Map<String, Object> map) {
List<String> persons = new ArrayList<>();
Object obj = map.get("persons");
if (ObjectUtils.isEmpty(obj)) {
log.info("{}","用户为空,不进行发送");
return "发送用户未指定,不进行发送";
}
if (obj instanceof ArrayList<?>) {
for (Object o : (List<?>) obj) {
persons.add(o.toString());
}
}
String message = map.get("msg").toString();
if (StringUtils.hasText(message)) {
WebSocketEndPoint.sendToUser(persons,message);
return "消息发送成功";
}
log.info("{}","消息为空,不进行发送");
return "发送的消息为空,不进行发送";
}
/**
* 群发消息
* @param message 消息
* @return 返回信息
*/
@Override
public String batchSend(String message) {
if (StringUtils.hasText(message)) {
WebSocketEndPoint.batchSend(message);
return "消息发送成功";
}
log.info("{}","发送的消息为空");
return "消息为空,不进行发送";
}
}
3.5 编写Controller
@RestController
@RequestMapping("/send")
public class PushMessageController {
private final SendMessageService sendService;
public PushMessageController(SendMessageService sendService) {
this.sendService = sendService;
}
/**
*
* @param message 消息
* @return 群发送完成
*/
@GetMapping("/batchSend")
public String batchSend(@PathParam("message") String message) {
return this.sendService.batchSend(message);
}
/**
*
* @param map 用户+消息
* @return String
*/
@GetMapping("/sendToUser")
public String sendToUser(@RequestBody Map<String, Object> map) {
return sendService.sendToUser(map);
}
}