应用场景

弹幕
游戏广播
消息订阅
多玩家游戏
协同编辑
股票基金实时报价
视频会议
在线教育
聊天室

结论

  • WebSocket和HTTP都是基于TCP协议。两个完全不同的应用层协议
  • WebSocket依赖于HTTP连接进行第一次握手
  • Socket不是协议,它是在程序层面上对传输层协议的接口封装,可以理解为一个能够提供端对端的通信的调用接口(API)

实例

springboot

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

内置tomcat注入bean

  1. import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  5. @Configuration
  6. @ConditionalOnWebApplication
  7. public class WebSocketConfig {
  8. //注意:用外置tomcat不需要注入此bean
  9. @Bean
  10. public ServerEndpointExporter serverEndpointExporter() {
  11. return new ServerEndpointExporter();
  12. }
  13. }

websocket实现类

  1. import cn.hutool.core.collection.CollectionUtil;
  2. import com.alibaba.fastjson.JSON;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.scheduling.annotation.Scheduled;
  7. import org.springframework.stereotype.Component;
  8. import javax.websocket.CloseReason;
  9. import javax.websocket.OnClose;
  10. import javax.websocket.OnOpen;
  11. import javax.websocket.Session;
  12. import javax.websocket.server.PathParam;
  13. import javax.websocket.server.ServerEndpoint;
  14. import java.io.IOException;
  15. import java.util.List;
  16. import java.util.Map;
  17. import java.util.concurrent.ConcurrentHashMap;
  18. import java.util.concurrent.atomic.AtomicInteger;
  19. @Slf4j
  20. @ServerEndpoint(value = "/接口名/{userId}")
  21. @Component
  22. public class MyWebSocketService {
  23. private static final String NULL_KEY = "null";
  24. /**
  25. * 心跳连接有效时间(毫秒)
  26. */
  27. private static final Long BEAT_HEART_DURATION_TIME_MILLIS = 10 * 60 * 1000L;
  28. /**
  29. * 用来记录当前在线连接数
  30. */
  31. private static AtomicInteger onlineCount = new AtomicInteger(0);
  32. /**
  33. * concurrent包的线程安全Map,用来存放每个客户端对应的Session对象。
  34. */
  35. public static Map<String, Session> clients = new ConcurrentHashMap<String, Session>();
  36. /**
  37. * concurrent包的线程安全Map,用来存放每个客户端对应的Session对象。
  38. */
  39. private static Map<Session, String> sessionMap = new ConcurrentHashMap<Session, String>();
  40. private static Map<String, Session> oldClients = new ConcurrentHashMap<String, Session>();
  41. private static Map<Session, Long> sessionBeatheartMap = new ConcurrentHashMap<Session, Long>();
  42. @Autowired
  43. private MessageService messageService;
  44. @OnOpen
  45. public void onOpen(@PathParam("userId") String userId, Session session) {
  46. if (StringUtils.isEmpty(userId) || NULL_KEY.equalsIgnoreCase(userId)) {
  47. try {
  48. log.warn("[key={}]非法,禁止连接!!!", userId);
  49. session.close();
  50. } catch (IOException e) {
  51. }
  52. }
  53. if (clients.containsKey(userId)) {
  54. //删除原有连接
  55. destroyOldSession(userId);
  56. }
  57. //在线数加1
  58. addOnlineCount();
  59. clients.put(userId, session);
  60. sessionMap.put(session, userId);
  61. sessionBeatheartMap.put(session, System.currentTimeMillis());
  62. log.info("新连接[userId={}]加入!当前在线连接数为{}", userId, getOnlineCount());
  63. }
  64. @OnClose
  65. public void onClose(Session session) {
  66. String key = sessionMap.get(session);
  67. if (StringUtils.isNotEmpty(key)) {
  68. if (clients.containsKey(key)) {
  69. clients.remove(key);
  70. //在线数减1
  71. subOnlineCount();
  72. }
  73. sessionMap.remove(session);
  74. sessionBeatheartMap.remove(session);
  75. log.info("连接 [userId={}]关闭!当前在线连接数为{}", key, getOnlineCount());
  76. /**通知系统断开连接**/
  77. destroyOldSession(key);
  78. }
  79. }
  80. @Scheduled(cron = " */5 * * * * ?")
  81. public void processTerminalInformation() {
  82. if (clients.isEmpty()) {
  83. return;
  84. }
  85. clients.forEach((k, v) -> {
  86. try {
  87. List<Message> messages = messageService.getMessageLists(k);
  88. if (CollectionUtil.isNotEmpty(messages)) {
  89. v.getAsyncRemote().sendText(JSON.toJSONString(messages));
  90. }
  91. } catch (Exception e) {
  92. destroyOldSession(k);
  93. }
  94. });
  95. }
  96. @Scheduled(cron = "0 */1 * * * ?")
  97. public void processOnlineTime() {
  98. oldClients.forEach((k, v) -> {
  99. try {
  100. Long lastBeatTime = sessionBeatheartMap.get(v);
  101. if (lastBeatTime == null || (System.currentTimeMillis() - lastBeatTime) > BEAT_HEART_DURATION_TIME_MILLIS) {
  102. //超过90秒未收到空消息,KEY 设备已断开连接
  103. destroyOldSession(k);
  104. }
  105. } catch (Exception e) {
  106. //连接不可用,清理连接
  107. destroyOldSession(k);
  108. }
  109. });
  110. oldClients = clients;
  111. }
  112. private void destroyOldSession(String key) {
  113. Session oldSession = clients.get(key);
  114. if (oldSession != null) {
  115. if (clients.containsKey(key)) {
  116. subOnlineCount();
  117. clients.remove(key);
  118. if (oldSession != null) {
  119. sessionMap.remove(oldSession);
  120. sessionBeatheartMap.remove(oldSession);
  121. }
  122. try {
  123. oldSession.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "已断开连接!"));
  124. } catch (IOException e) {
  125. }
  126. }
  127. }
  128. }
  129. public static synchronized AtomicInteger getOnlineCount() {
  130. return onlineCount;
  131. }
  132. /**
  133. * 增加连接人数
  134. */
  135. public static synchronized void addOnlineCount() {
  136. onlineCount.incrementAndGet();
  137. }
  138. /**
  139. * 减少连接人数
  140. */
  141. public static synchronized void subOnlineCount() {
  142. onlineCount.decrementAndGet();
  143. }
  144. }

Vue

用法

  1. created() {
  2. this.initWebSocket();
  3. },
  4. methods: {
  5. initWebSocket() {
  6. let token = localStorage.getItem('token');
  7. const url = 'ws://127.0.0.1:端口号/接口名/' + token;
  8. this.websocket = new WebSocket(url);
  9. this.websocket.onopen = this.websockOpen;
  10. this.websocket.onmessage = this.websocketonmessage;
  11. this.websocket.onclose = this.websocketclose;
  12. },
  13. websockOpen() {
  14. console.log("WebSocket连接成功");
  15. },
  16. websocketonmessage(e) { //数据接收
  17. console.log(e);
  18. },
  19. websocketclose(e) { //关闭
  20. console.log("close..")
  21. },
  22. logout() { //这部分是退出的时候关闭websocket链接
  23. window.localStorage.removeItem('token');
  24. this.$router.push({path: '/login'})
  25. this.websocket.close();
  26. },
  27. }

结论刨析

Ajax、Long poll、Websocket图示

WebSocket - 图1
虽然http1.1默认开启了keep-alive长连接保持了这个TCP通道使得在一个HTTP连接中,可以发送多个Request,接收多个Response,但是一个request只能有一个response。而且这个response也是被动的,不能主动发起

协议升级

每个WebSocket连接都始于一个HTTP请求。WebSocket协议在第一次握手连接时,通过HTTP协议在传送WebSocket支持的版本号,协议的字版本号,原始地址,主机地址等等一些列字段给服务器端

GET /chat HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key:dGhlIHNhbXBsZSBub25jZQ== Origin: http://example.com Sec-WebSocket-Version: 13

注意,关键的地方是,这里面有个Upgrade首部,用来把当前的HTTP请求升级到WebSocket协议,这是HTTP协议本身的内容,是为了扩展支持其他的通讯协议。如果服务器支持新的协议,则必须返回101:

HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept:s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

至此,HTTP请求物尽其用,如果成功出发onopen事件,否则触发onerror事件,后面的传输则不再依赖HTTP协议。

为什么依赖http

  1. WebSocket设计上就是天生为HTTP增强通信(全双工通信等),所以在HTTP协议连接的基础上是很自然的一件事,并因此而能获得HTTP的诸多便利。
  2. 这诸多便利中有一条很重要,基于HTTP连接将获得最大的一个兼容支持,比如即使服务器不支持WebSocket也能建立HTTP通信,只不过返回的是onerror而已,这显然比服务器无响应要好的多。

参考文章

理清 WebSocket 和 HTTP 的关系
零距离接触websocket