HTML5 WebSocket

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。
WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。
现在,很多网站为了实现推送技术,所用的技术都是 Ajax 轮询。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP请求,然后由服务器返回最新的数据给客户端的浏览器。这种传统的模式带来很明显的缺点,即浏览器需要不断的向服务器发出请求,然而HTTP请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多的带宽等资源。
HTML5 定义的 WebSocket 协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯
第七节WebSocket - 图1
浏览器通过 JavaScript 向服务器发出建立 WebSocket 连接的请求,连接建立以后,客户端和服务器端就可以通过 TCP 连接直接交换数据。
当你获取 Web Socket 连接后,你可以通过 send() 方法来向服务器发送数据,并通过 onmessage 事件来接收服务器返回的数据。
以下 API 用于创建 WebSocket 对象。

介绍

首先要使用WebSocket那么肯定需要服务器端有一个连接的地方,那么用golang来做就是将普通的一个接口,通过声明升级,来将这个接口升级为ws协议而不是http协议。

在golang中使用WebSocket有很多可以使用的包,这里使用github.com/gorilla/websocket这个包,我们可以去这个地址看一下,有详细的项目介绍,以及使用方法。

安装

在项目中执行go get github.com/gorilla/websocket 将这个依赖包拉到我们的项目中

实现

  1. // Manager 所有 websocket 信息
  2. type Manager struct {
  3. IdClientMap map[int32]*Client // 用户连接数组
  4. Lock sync.RWMutex // 锁
  5. MonitoringRealData, Register, UnRegister chan *Client // 监控实时数据,注册连接,删除连接
  6. Message chan *MessageData // 定义一个消息通道
  7. GroupMessage chan *GroupMessageData // 消息组通道
  8. BroadCastMessage chan *BroadCastMessageData // 广播消息
  9. incr int32 // 自增数
  10. }

首先定义一个结构体,来存储记录我们的websocket中的内容,每个用户请求接口连接WebSocket都是一个单独的进程,那么如果想要用户之间可以进行交互那么就需要将连接都存储起来,这里就
用一个IdClientMap来存储每个用户的连接,每次用户连接进来 incr都是自增+1 这个就代表当前用户的一个连接ID,用连接ID存储当前连接到数组中,就可以使用IdClientMap[1].client就可以做交互了。
定义了之后需要将这个结构体初始化,那么定义一个变量来初始化结构体中的各个属性。

  1. var (
  2. // WebsocketManager 初始化 wsManager 管理器
  3. WebsocketManager = Manager{
  4. IdClientMap: make(map[int32]*Client),
  5. Register: make(chan *Client, 128),
  6. UnRegister: make(chan *Client, 128),
  7. MonitoringRealData: make(chan *Client, 128),
  8. Message: make(chan *MessageData, 128),
  9. GroupMessage: make(chan *GroupMessageData, 128),
  10. BroadCastMessage: make(chan *BroadCastMessageData, 128),
  11. incr: 1,
  12. }
  13. SystemId int32 = 1
  14. )
  15. // Client 单个 websocket 信息
  16. type Client struct {
  17. Id int32
  18. GroupList []int32
  19. Socket *websocket.Conn
  20. Message chan []byte
  21. }
  22. // messageData 单个发送数据信息
  23. type MessageData struct {
  24. Id int32
  25. Group int
  26. Message []byte
  27. }
  28. // groupMessageData 组广播数据信息
  29. type GroupMessageData struct {
  30. Group int32
  31. Message []byte
  32. }
  33. // 广播发送数据信息
  34. type BroadCastMessageData struct {
  35. Message []byte
  36. }
  37. // 读取消息
  38. type ReadMessageRequest struct {
  39. Type int `json:"type"`
  40. Message string `json:"message"`
  41. AcceptId int32 `json:"accept_id"`
  42. }
  43. // 消息返回
  44. type MessageResponse struct {
  45. Status int `json:"status"`
  46. Type int `json:"type"`
  47. Message string `json:"message"`
  48. FromId int32 `json:"from_id;"`
  49. }

这里将各个属性都make初始化,再定义好所需的对应结构体,读取消息,存储用户连接发送消息,广播消息,群组消息。这里定义完之后,创建一个接口来访问。

  1. // webSocket连接
  2. func (manager *Manager) WsClient(ctx *gin.Context) {
  3. upGrader := websocket.Upgrader{
  4. // cross origin domain
  5. CheckOrigin: func(r *http.Request) bool {
  6. return true
  7. },
  8. // 处理 Sec-WebSocket-Protocol Header
  9. Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
  10. }
  11. conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
  12. if err != nil {
  13. log.Printf("websocket connect error: %s", ctx.Param("channel"))
  14. return
  15. }
  16. client := &Client{
  17. Id: atomic.AddInt32(&manager.incr, 1),
  18. Socket: conn,
  19. Message: make(chan []byte, 1024),
  20. }
  21. go client.Read(manager)
  22. go client.Write()
  23. manager.RegisterClient(client)
  24. }

这里创建了一个Manage结构体下的成员方法WsClient ,在方法中调用拉取的websocket包的方法websocket.Upgrader 定义升级http到ws的参数,再调用upGrader.Upgrade(ctx.Writer, ctx.Request, nil)将当前接口升级为WebSocket接口。再初始化Client结构体,设置当前用户的连接,以及唯一的ID,再初始化一个消息的通道,
下面使用了go client.Read(manager) 这里是开启了golang的一个goroutine协程,这个是读取WebSocket消息,另外还有一个go client.Write() 这里是将消息传输到客户端,然后调用manager.RegisterClient(client) 注册一个连接到组里

  1. // 读取websocket发送来的数据
  2. func (c *Client) Read(manager *Manager) {
  3. defer func() {
  4. log.Printf("close:[%s],date:%s", c.Id, time.Now())
  5. WebsocketManager.UnRegister <- c
  6. }()
  7. for {
  8. messageType, message, err := c.Socket.ReadMessage()
  9. if err != nil || messageType == websocket.CloseMessage {
  10. break
  11. }
  12. var ResponseData []byte
  13. ReadMessage := ReadMessageRequest{}
  14. if err := json.Unmarshal(message, &ReadMessage); err != nil {
  15. ResponseData, _ = json.Marshal(MessageResponse{Status: 201, Message: "消息发送失败", Type: 1, FromId: SystemId})
  16. c.Message <- ResponseData
  17. } else {
  18. acceptClient, ok := manager.IdClientMap[ReadMessage.AcceptId]
  19. fmt.Printf("ok:%s,acceptId:%s", ok, ReadMessage.AcceptId)
  20. if !ok {
  21. ResponseData, _ = json.Marshal(MessageResponse{Status: 201, Message: "消息发送失败,对方已离线", Type: 1, FromId: SystemId})
  22. c.Message <- ResponseData
  23. } else {
  24. ResponseData, _ = json.Marshal(MessageResponse{Status: 200, Message: ReadMessage.Message, Type: 1, FromId: c.Id})
  25. acceptClient.Message <- ResponseData
  26. }
  27. }
  28. fmt.Printf("[clientId:]%v,[message:]%s\n", c.Id, message)
  29. }
  30. }
  31. // 写信息,从 channel 变量 Send 中读取数据写入 websocket 连接
  32. func (c *Client) Write() {
  33. defer func() {
  34. log.Printf("clientWrite [%s] disconnect", c.Id)
  35. if err := c.Socket.Close(); err != nil {
  36. log.Printf("clientWrite [%s] disconnect err: %s", c.Id, err)
  37. }
  38. }()
  39. for {
  40. select {
  41. case message, ok := <-c.Message:
  42. if !ok {
  43. _ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
  44. return
  45. }
  46. err := c.Socket.WriteMessage(websocket.TextMessage, message)
  47. if err != nil {
  48. log.Printf("client [%s] writemessage err: %s", c.Id, err)
  49. }
  50. }
  51. }
  52. }
  53. // 注册用户进程
  54. func (manager *Manager) RegisterClient(c *Client) {
  55. manager.Register <- c
  56. }
  57. // 注销用户进程
  58. func (manager *Manager) UnRegisterClient(c *Client) {
  59. manager.UnRegister <- c
  60. }
  61. // 监听用户进程注册,用户进程注销
  62. func (manager *Manager) Start() {
  63. for {
  64. select {
  65. case client := <-manager.Register:
  66. log.Printf("client [%s] connect", client.Id)
  67. /* log.Printf("register client [%s] to group [%s]", client.Id, client.Group)
  68. */
  69. manager.Lock.RLock()
  70. if _, ok := manager.IdClientMap[client.Id]; ok {
  71. manager.IdClientMap[client.Id].Socket.Close()
  72. }
  73. manager.IdClientMap[client.Id] = client
  74. ResponseData, _ := json.Marshal(MessageResponse{Status: 200, Message: fmt.Sprintf("连接成功,您的会话ID为[%v]", client.Id), Type: 1, FromId: SystemId})
  75. client.Message <- ResponseData
  76. manager.Lock.RUnlock()
  77. // 注销
  78. case client := <-manager.UnRegister:
  79. manager.Lock.Lock()
  80. delete(manager.IdClientMap, client.Id)
  81. manager.Lock.Unlock()
  82. close(client.Message)
  83. }
  84. }
  85. }

这里一共三个方法,Read,Write,Start 分别是读,写,启动。
通过c.Socket.ReadMessage() 分别返回3个参数,消息内容,消息类型,错误,这里我们定义全部使用json来传输消息,那么消息类型就是文本类型,判断一下err是否有错误,如果没有错误那么将收到的消息解析json,就可以拿到对应传输过来的参数,处理过后通过c.Message <- ResponseData 将消息传输到通道中。在Write中开启了一个循环,通过select case来接受一个channel通道的数据,然后通过c.Socket.WriteMessage(websocket.TextMessage, message) 来将数据传输给客户端。

调用

image.png
首先将WebSocket启动。然后将我们实现的接口WsClient注册一个路由/user/connect 将服务启动,那么就可以通过WebSocket连接到服务器

可以通过http://www.websocket-test.com/ 这个地址是一个WebSocket的测试网站。我们可以将地址粘贴进去连接进行一个测试。
image.png
image.png
这里填入地址ws://127.0.0.1:8088/user/connect就是我们实现的WebSocket接口,这里的协议是必须ws://,否则连接就会不成功,下面就可以发送消息到服务器。

image.png
这里发送消息{"type":1,"message":"xiaoxi ","accept_id":2} 定义类型是文本,消息内容,接受人ID,这里发送过去之后返回消息发送失败,对方已离线,那是因为id为2的已经不在线 断开连接了,那么重新打开一个网页 在来连接
image.png
这里新开浏览器连接的ID为4,那么我们在这边发送消息给连接ID为3的用户

image.png
这边发送之后,打开3号的窗口
image.png

可以看到这里多了一条消息,状态200 成功,消息内容,以及发送人的ID,这样一个简单的聊天WebSocket就完成了。