在第一天进行了登录之后,要开始进行信息的交互了
对于信息的交互,最重要的是选择一定的数据结构,我们同样在model包中编写如下数据结构

  1. //message.go
  2. package model
  3. import (
  4. "gorm.io/plugin/soft_delete"
  5. "time"
  6. )
  7. type Message struct {
  8. ID int32 `json:"id" gorm:"primarykey"`
  9. CreatedAt time.Time `json:"createAt"`
  10. UpdatedAt time.Time `json:"updatedAt"`
  11. DeletedAt soft_delete.DeletedAt `json:"deletedAt"`
  12. FromUserId int32 `json:"fromUserId" gorm:"index"`
  13. ToUserId int32 `json:"toUserId" gorm:"index;comment:'发送给端的id,可为用户id或者群id'"`
  14. Content string `json:"content" gorm:"type:varchar(2500)"`
  15. MessageType int16 `json:"messageType" gorm:"comment:'消息类型:1单聊,2群聊'"`
  16. ContentType int16 `json:"contentType" gorm:"comment:'消息内容类型:1文字 2.普通文件 3.图片 4.音频 5.视频 6.语音聊天 7.视频聊天'"`
  17. Pic string `json:"pic" gorm:"type:text;comment:'缩略图"`
  18. Url string `json:"url" gorm:"type:varchar(350);comment:'文件或者图片地址'"`
  19. }

这个Message结构体同样作为gorm的必须,进行与数据库的交互,每次进行一次信息的发送,就会保存在对应的数据库中。

程序的开始?

有了对应的数据结构,那程序的主体是什么呢?因为是利用golang来编写,可以选择用一些golang 的特性进行编写,比如利用协程,可以很方便的实现高度的并发,在每一个客户端进行连接的同时,为每一个客户端开启一个协程,所以就有了如下逻辑

  1. // cmd/main.go
  2. package main
  3. import (
  4. "gochat/internal/route"
  5. "gochat/internal/server"
  6. "time"
  7. "net/http"
  8. )
  9. func main(){
  10. newRouter := route.NewRoute()
  11. go server.MyServer.Start()
  12. s := &http.Server{
  13. Addr: ":3000",
  14. Handler: newRouter,
  15. ReadTimeout: 10 * time.Second,
  16. WriteTimeout: 10 * time.Second,
  17. MaxHeaderBytes: 1 << 20,
  18. }
  19. s.ListenAndServe()
  20. }
  1. 查看代码的11行,在昨天编写了route的部分之后,今天需要添加一些
  1. group.POST("/friend", ChatApi.AddFriend)
  2. group.GET("/user", ChatApi.GetUserList)
  3. group.GET("/user/:uuid", ChatApi.GetUserDetails)
  4. group.GET("/user/name", ChatApi.GetUserOrGroupByName)
  5. group.PUT("/user", ChatApi.ModifyUserInfo)
  6. group.GET("/message", ChatApi.GetMessage)
  7. group.GET("/file/:fileName", ChatApi.GetFile)
  8. group.POST("/file", ChatApi.SaveFile)
  9. group.GET("/group/:uuid", ChatApi.GetGroup)
  10. group.POST("/group/:uuid", ChatApi.SaveGroup)
  11. group.POST("/group/join/:userUuid/:groupUuid", ChatApi.JoinGroup)
  12. group.GET("/group/user/:uuid", ChatApi.GetGroupUsers)

这些与今天主体通信无关系,但是也是很重要的,其实最主要的是编写对应的api,这些单独来说,今天主要解决最主要的通信部分
通信部分最主要的是使用websocket,也就是下面这句代码

  1. group.GET("/socket.io", socket)
  1. 对于通信,我们为什么使用使用websocket呢?<br />websocketsocket有什么关系呢,就和周杰伦和周杰,卡巴斯基和巴基斯坦,javajavascript一样,没啥关系。对于即时通讯,按照以前的技术就是采用轮询,Comet来实现的,当需要即时通讯时,就在特定的时间间隔发送一个request,然后返回最新的数据,这样的缺点就是每次都要发一个httprequest,他的头很长,这样代价就太高了,就很浪费,然后就有了websocket<br />WebSocketHTTP一样也是应用层的协议,但是它是一种双向通信协议,是建立在TCP之上的。
  1. 连接过程 —— 握手过程
  2. 1. 浏览器、服务器建立TCP连接,三次握手。这是通信的基础,传输控制层,若失败后续都不执行。
  3. 2. TCP连接成功后,浏览器通过HTTP协议向服务器传送WebSocket支持的版本号等信息。(开始前的HTTP握手)
  4. 3. 服务器收到客户端的握手请求后,同样采用HTTP协议回馈数据。
  5. 4. 当收到了连接成功的消息后,通过TCP通道进行传输通信。

这样传输的时候,并不需要http协议,对于传统 HTTP 每次请求-应答都需要客户端与服务端建立连接的模式,WebSocket 是类似 Socket 的 TCP 长连接的通讯模式,一旦 WebSocket 连接建立后,后续数据都以帧序列的形式传输。在客户端断开 WebSocket 连接或 Server 端断掉连接前,不需要客户端和服务端重新发起连接请求。在海量并发及客户端与服务器交互负载流量大的情况下,极大的节省了网络带宽资源的消耗,有明显的性能优势,且客户端发送和接受消息是在同一个持久连接上发起,实时性优势明显。
[

](https://blog.csdn.net/wwd0501/article/details/54582912)
下面是一个简单的demo
http://www.jsons.cn/websocket/,用这个当作客户端就ok

  1. package main
  2. import (
  3. "github.com/gorilla/websocket"
  4. "net/http"
  5. )
  6. var upGrader = websocket.Upgrader{
  7. CheckOrigin: func(r *http.Request) bool {
  8. return true
  9. },
  10. }
  11. func test(c http.ResponseWriter, r* http.Request){
  12. conn ,_ := upGrader.Upgrade(c,r ,nil)
  13. defer conn.Close()
  14. for {
  15. i,msg,_ := conn.ReadMessage()
  16. conn.WriteMessage(i,msg)
  17. }
  18. }
  19. func main(){
  20. http.HandleFunc("/",test)
  21. http.ListenAndServe(":8888",nil)
  22. }

https://www.cnblogs.com/barrywxx/p/7412808.html
https://blog.csdn.net/chenyu201003/article/details/81449762
https://blog.csdn.net/whynottrythis/article/details/109119328 这些是websocket的一些参考资料

怎么实现socket.io?

还是同样的,在/internal/route 中编写socket.go

  1. package route
  2. import (
  3. "gochat/internal/server"
  4. "net/http"
  5. "github.com/gin-gonic/gin"
  6. "github.com/gorilla/websocket"
  7. )
  8. //需要使用websocket
  9. var upGrader = websocket.Upgrader{
  10. CheckOrigin: func(r *http.Request) bool { //允许跨域
  11. return true
  12. },
  13. }
  14. func RunSocekt(c *gin.Context) {
  15. user := c.Query("user")
  16. if user == "" {
  17. return
  18. }
  19. ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  20. if err != nil {
  21. return
  22. }
  23. client := &server.Client{
  24. Name: user,
  25. Conn: ws,
  26. Send: make(chan []byte),
  27. }
  28. server.MyServer.Register <- client
  29. go client.Read()
  30. go client.Write()
  31. }

所对应的数据结构client和其方法read,write在server中实现,先暂时搁置,你只需要大概知道一个是启动协程进行读写的就ok了
这段代码的意思就是当启动了一个socket的时候,将传入的user等信息新建一个client客户端传入server这个主体代码逻辑中进行新建一个客户端,当然现在看着云里雾里的,下面开始解读server中的内容

server?client?

  1. // internal/server/server.go
  2. package server
  3. import (
  4. "github.com/gorilla/websocket"
  5. "github.com/gogo/protobuf/proto"
  6. "github.com/google/uuid"
  7. "gochat/pkg/protocol"
  8. "gochat/pkg/common/util"
  9. "gochat/pkg/common/constant"
  10. "gochat/config"
  11. "gochat/internal/service"
  12. "encoding/base64"
  13. "io/ioutil"
  14. "strings"
  15. "sync"
  16. )
  17. type Client struct {
  18. Conn *websocket.Conn
  19. Name string
  20. Send chan []byte
  21. }
  22. type Server struct {
  23. Clients map[string]*Client
  24. mutex *sync.Mutex
  25. Broadcast chan []byte
  26. Register chan *Client
  27. Ungister chan *Client
  28. }
  29. var MyServer = NewServer()
  30. func NewServer() *Server {
  31. return &Server{
  32. mutex: &sync.Mutex{},
  33. Clients: make(map[string]*Client),
  34. Broadcast: make(chan []byte),
  35. Register: make(chan *Client),
  36. Ungister: make(chan *Client),
  37. }
  38. }
  39. //启动服务
  40. func (s *Server) Start() {
  41. for {
  42. select {
  43. case conn := <-s.Register://登录
  44. s.Clients[conn.Name] = conn
  45. msg := &protocol.Message{
  46. From: "System",
  47. To: conn.Name,
  48. Content: "welcome!",
  49. }
  50. protoMsg, _ := proto.Marshal(msg)
  51. conn.Send <- protoMsg
  52. case conn := <-s.Ungister://退出
  53. if _, ok := s.Clients[conn.Name]; ok {
  54. close(conn.Send)
  55. delete(s.Clients, conn.Name)
  56. }
  57. case message := <-s.Broadcast://发消息
  58. msg := &protocol.Message{}
  59. proto.Unmarshal(message, msg)
  60. if msg.To != "" {
  61. // 一般消息,比如文本消息,视频文件消息等
  62. if msg.ContentType >= constant.TEXT && msg.ContentType <= constant.VIDEO {
  63. // 保存消息只会在存在socket的一个端上进行保存,防止分布式部署后,消息重复问题
  64. _, exits := s.Clients[msg.From]
  65. if exits {
  66. saveMessage(msg)
  67. }
  68. if msg.MessageType == constant.MESSAGE_TYPE_USER {
  69. client, ok := s.Clients[msg.To]
  70. if ok {
  71. msgByte, err := proto.Marshal(msg)
  72. if err == nil {
  73. client.Send <- msgByte
  74. }
  75. }
  76. } else if msg.MessageType == constant.MESSAGE_TYPE_GROUP {
  77. sendGroupMessage(msg, s)
  78. }
  79. } else {
  80. // 语音电话,视频电话等,仅支持单人聊天,不支持群聊
  81. // 不保存文件,直接进行转发
  82. client, ok := s.Clients[msg.To]
  83. if ok {
  84. client.Send <- message
  85. }
  86. }
  87. } else {
  88. // 无对应接受人员进行广播
  89. for _, conn := range s.Clients {
  90. select {
  91. case conn.Send <- message:
  92. default:
  93. close(conn.Send)
  94. delete(s.Clients, conn.Name)
  95. }
  96. }
  97. }
  98. }
  99. }
  100. }// 发送给群组消息,需要查询该群所有人员依次发送
  101. func sendGroupMessage(msg *protocol.Message, s *Server) {
  102. // 发送给群组的消息,查找该群所有的用户进行发送
  103. users := service.GroupService.GetUserIdByGroupUuid(msg.To)
  104. for _, user := range users {
  105. if user.Uuid == msg.From {
  106. continue
  107. }
  108. client, ok := s.Clients[user.Uuid]
  109. if !ok {
  110. continue
  111. }
  112. fromUserDetails := service.UserService.GetUserDetails(msg.From)
  113. // 由于发送群聊时,from是个人,to是群聊uuid。所以在返回消息时,将form修改为群聊uuid,和单聊进行统一
  114. msgSend := protocol.Message{
  115. Avatar: fromUserDetails.Avatar,
  116. FromUsername: msg.FromUsername,
  117. From: msg.To,
  118. To: msg.From,
  119. Content: msg.Content,
  120. ContentType: msg.ContentType,
  121. Type: msg.Type,
  122. MessageType: msg.MessageType,
  123. Url: msg.Url,
  124. }
  125. msgByte, err := proto.Marshal(&msgSend)
  126. if err == nil {
  127. client.Send <- msgByte
  128. }
  129. }
  130. }
  131. // 保存消息,如果是文本消息直接保存,如果是文件,语音等消息,保存文件后,保存对应的文件路径
  132. func saveMessage(message *protocol.Message) {
  133. // 如果上传的是base64字符串文件,解析文件保存
  134. if message.ContentType == 2 {
  135. url := uuid.New().String() + ".png"
  136. index := strings.Index(message.Content, "base64")
  137. index += 7
  138. content := message.Content
  139. content = content[index:]
  140. dataBuffer, dataErr := base64.StdEncoding.DecodeString(content)
  141. if dataErr != nil {
  142. return
  143. }
  144. err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath+url, dataBuffer, 0666)
  145. if err != nil {
  146. return
  147. }
  148. message.Url = url
  149. message.Content = ""
  150. } else if message.ContentType == 3 {
  151. // 普通的文件二进制上传
  152. fileSuffix := util.GetFileType(message.File)
  153. nullStr := ""
  154. if nullStr == fileSuffix {
  155. fileSuffix = strings.ToLower(message.FileSuffix)
  156. }
  157. contentType := util.GetContentTypeBySuffix(fileSuffix)
  158. url := uuid.New().String() + "." + fileSuffix
  159. err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath+url, message.File, 0666)
  160. if err != nil {
  161. return
  162. }
  163. message.Url = url
  164. message.File = nil
  165. message.ContentType = contentType
  166. }
  167. service.MessageService.SaveMessage(*message)
  168. }
  169. func (c *Client) Read() {
  170. defer func() {
  171. MyServer.Ungister <- c
  172. c.Conn.Close()
  173. }()
  174. for {
  175. c.Conn.PongHandler()
  176. _, message, err := c.Conn.ReadMessage()
  177. if err != nil {
  178. MyServer.Ungister <- c
  179. c.Conn.Close()
  180. break
  181. }
  182. msg := &protocol.Message{}
  183. proto.Unmarshal(message, msg)
  184. // pong WebSocket 协议定义了三种类型的控制消息:
  185. //close、ping 和 pong。调用连接 WriteControl、WriteMessage 或 NextWriter 方法
  186. //向对端发送控制消息。
  187. if msg.Type == constant.HEAT_BEAT {
  188. pong := &protocol.Message{
  189. Content: constant.PONG,
  190. Type: constant.HEAT_BEAT,
  191. }
  192. pongByte, err2 := proto.Marshal(pong)
  193. if nil != err2 {
  194. }
  195. c.Conn.WriteMessage(websocket.BinaryMessage, pongByte)
  196. } else {
  197. MyServer.Broadcast <- message
  198. }
  199. }
  200. }
  201. func (c *Client) Write() {
  202. defer func() {
  203. c.Conn.Close()
  204. }()
  205. for message := range c.Send {
  206. c.Conn.WriteMessage(websocket.BinaryMessage, message)
  207. }
  208. }

protobuf

这里涉及到了一个东西,就是protobuf,什么是protobuf
protobuf也叫protocol buffer是google 的一种数据交换的格式,它独立于语言,独立于平台。google 提供了多种语言的实现:java、c#、c++、go 和 python,每一种实现都包含了相应语言的编译器以及库文件。由于它是一种二进制的格式,比使用 xml 、json进行数据交换快许多。可以把它用于分布式应用之间的数据通信或者异构环境下的数据交换。作为一种效率和兼容性都很优秀的二进制数据传输格式,可以用于诸如网络传输、配置文件、数据存储等诸多领域。要使用protobuf,得先创建.proto文件才行。 .proto文件中的定义很简单:为要序列化的每个数据结构定义消息,然后为消息中的每个字段指定名称和类型。

  1. /**********pkg/protocol/message.proto*******************/
  2. syntax = "proto3";
  3. package protocol;
  4. message Message {
  5. string avatar = 1; //头像
  6. string fromUsername = 2; // 发送消息用户的用户名
  7. string from = 3; // 发送消息用户uuid
  8. string to = 4; // 发送给对端用户的uuid
  9. string content = 5; // 文本消息内容
  10. int32 contentType = 6; // 消息内容类型:1.文字 2.图片 3.普通文件 4.音频 5.视频 6.语音聊天 7.视频聊天
  11. string type = 7; // 消息传输类型:如果是心跳消息,该内容为heatbeat,在线视频或者音频为webrtc
  12. int32 messageType = 8; // 消息类型,1.单聊 2.群聊
  13. string url = 9; // 图片,视频,语音的路径
  14. string fileSuffix = 10; // 文件后缀,如果通过二进制头不能解析文件后缀,使用该后缀
  15. bytes file = 11; // 如果是图片,文件,视频等的二进制
  16. }

然后有命令,生成了message.pb.go,你就可以用里面的函数了,但是最基本用的就是Marshal与unMarshal进行序列化和反序列化。
proto的部分大约就是这些,下面来进行server.go的解读
可以这么理解
day2 进行message的编写 - 图1

一个server服务端由一个map[string]Client结构的map,一个互斥锁mutex,一个用于登录的channel Register与一个用于退出的channel ungister,一个用于发送消息的broadcast。对应的服务端拥有一个websocket的长连接Conn websocket.Conn,名字 Name string,一个发送消息的channel Send chan []byte。
在启动服务的时候,利用一个for循环进行循环处理,同时利用channel的特性,select ,用来监听几个channel是否有消息,这里分为三个部分,分别为

  1. 登录
  2. 登出
  3. 发消息

对于登录a来说,登录后直接新增加一个map,然后向用户发送一个message用来欢迎,其实我觉得可有可无吧,最后调用conn.send发送消息
对于登出b来说同理,首先关闭client的send channel,然后从map中delete这个client
最后是最重要的发消息c了

  1. case message := <-s.Broadcast://发消息
  2. msg := &protocol.Message{}
  3. proto.Unmarshal(message, msg)
  4. if msg.To != "" {
  5. // 一般消息,比如文本消息,视频文件消息等
  6. if msg.ContentType >= constant.TEXT && msg.ContentType <= constant.VIDEO {
  7. // 保存消息只会在存在socket的一个端上进行保存,防止分布式部署后,消息重复问题
  8. _, exits := s.Clients[msg.From]
  9. if exits {
  10. saveMessage(msg)
  11. }
  12. if msg.MessageType == constant.MESSAGE_TYPE_USER {//发给个人
  13. client, ok := s.Clients[msg.To]
  14. if ok {
  15. msgByte, err := proto.Marshal(msg)
  16. if err == nil {
  17. client.Send <- msgByte
  18. }
  19. }
  20. } else if msg.MessageType == constant.MESSAGE_TYPE_GROUP {//发送给群组
  21. sendGroupMessage(msg, s)
  22. }
  23. } else {
  24. // 语音电话,视频电话等,仅支持单人聊天,不支持群聊
  25. // 不保存文件,直接进行转发
  26. client, ok := s.Clients[msg.To]
  27. if ok {
  28. client.Send <- message
  29. }
  30. }
  31. } else {
  32. // 无对应接受人员进行广播
  33. for _, conn := range s.Clients {
  34. select {
  35. case conn.Send <- message:
  36. default:
  37. close(conn.Send)
  38. delete(s.Clients, conn.Name)
  39. }
  40. }
  41. }
  1. 代码逻辑清晰,利用反序列化的message结构体中的“to”字段来判断这段消息是广播还是单发,然后在判断是文字消息还是视频等信息,在分为群组和私聊,下面解读saveMessage(msg)与sendGroupMessage(msg, s)这两个函数
  1. //saveMessage(msg)
  2. // 消息内容类型:1.文字 2.图片 3.普通文件 4.音频 5.视频 6.语音聊天 7.视频聊天
  3. func saveMessage(message *protocol.Message) {
  4. // 如果上传的是base64字符串文件,解析文件保存
  5. if message.ContentType == 2 {
  6. url := uuid.New().String() + ".png"
  7. index := strings.Index(message.Content, "base64")
  8. index += 7
  9. content := message.Content
  10. content = content[index:]
  11. dataBuffer, dataErr := base64.StdEncoding.DecodeString(content)
  12. if dataErr != nil {
  13. return
  14. }
  15. err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath+url, dataBuffer, 0666)
  16. if err != nil {
  17. return
  18. }
  19. message.Url = url
  20. message.Content = ""
  21. } else if message.ContentType == 3 {
  22. // 普通的文件二进制上传
  23. fileSuffix := util.GetFileType(message.File)
  24. nullStr := ""
  25. if nullStr == fileSuffix {
  26. fileSuffix = strings.ToLower(message.FileSuffix)
  27. }
  28. contentType := util.GetContentTypeBySuffix(fileSuffix)
  29. url := uuid.New().String() + "." + fileSuffix
  30. err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath+url, message.File, 0666)
  31. if err != nil {
  32. return
  33. }
  34. message.Url = url
  35. message.File = nil
  36. message.ContentType = contentType
  37. }
  38. service.MessageService.SaveMessage(*message) //这句话就是写入数据库的意思,在service包中有详细解释
  39. }

util

在获取类型的时候,遇到了util这个东西,这个是自己编写的,目的就是为了判断文件类型,打开
/pkg/common/file_suffix.go查看

  1. package util
  2. import (
  3. "bytes"
  4. "gochat/pkg/common/constant"
  5. "encoding/hex"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "github.com/wxnacy/wgo/arrays"
  10. )
  11. var fileTypeMap sync.Map
  12. func init() {
  13. fileTypeMap.Store("ffd8ffe000104a464946", "jpg") //JPEG (jpg)
  14. fileTypeMap.Store("89504e470d0a1a0a0000", "png") //PNG (png)
  15. fileTypeMap.Store("47494638396126026f01", "gif") //GIF (gif)
  16. fileTypeMap.Store("49492a00227105008037", "tif") //TIFF (tif)
  17. fileTypeMap.Store("424d228c010000000000", "bmp") //16色位图(bmp)
  18. fileTypeMap.Store("424d8240090000000000", "bmp") //24位位图(bmp)
  19. fileTypeMap.Store("424d8e1b030000000000", "bmp") //256色位图(bmp)
  20. fileTypeMap.Store("41433130313500000000", "dwg") //CAD (dwg)
  21. fileTypeMap.Store("3c21444f435459504520", "html") //HTML (html) 3c68746d6c3e0 3c68746d6c3e0
  22. fileTypeMap.Store("3c68746d6c3e0", "html") //HTML (html) 3c68746d6c3e0 3c68746d6c3e0
  23. fileTypeMap.Store("3c21646f637479706520", "htm") //HTM (htm)
  24. fileTypeMap.Store("48544d4c207b0d0a0942", "css") //css
  25. fileTypeMap.Store("696b2e71623d696b2e71", "js") //js
  26. fileTypeMap.Store("7b5c727466315c616e73", "rtf") //Rich Text Format (rtf)
  27. fileTypeMap.Store("38425053000100000000", "psd") //Photoshop (psd)
  28. fileTypeMap.Store("46726f6d3a203d3f6762", "eml") //Email [Outlook Express 6] (eml)
  29. fileTypeMap.Store("d0cf11e0a1b11ae10000", "vsd") //Visio 绘图
  30. fileTypeMap.Store("5374616E64617264204A", "mdb") //MS Access (mdb)
  31. fileTypeMap.Store("252150532D41646F6265", "ps")
  32. fileTypeMap.Store("255044462d312e350d0a", "pdf") //Adobe Acrobat (pdf)
  33. fileTypeMap.Store("D0CF11E0", "xls") //xls
  34. fileTypeMap.Store("504B030414000600080000002100", "xlsx") //xls
  35. fileTypeMap.Store("d0cf11e0a1b11ae10000", "doc") //MS Excel 注意:word、msi 和 excel的文件头一样
  36. fileTypeMap.Store("504b0304140006000800", "docx") //docx文件
  37. fileTypeMap.Store("d0cf11e0a1b11ae10000", "wps") //WPS文字wps、表格et、演示dps都是一样的
  38. fileTypeMap.Store("2e524d46000000120001", "rmvb") //rmvb/rm相同
  39. fileTypeMap.Store("464c5601050000000900", "flv") //flv与f4v相同
  40. fileTypeMap.Store("00000020667479706d70", "mp4")
  41. fileTypeMap.Store("49443303000000002176", "mp3")
  42. fileTypeMap.Store("000001ba210001000180", "mpg") //
  43. fileTypeMap.Store("3026b2758e66cf11a6d9", "wmv") //wmv与asf相同
  44. fileTypeMap.Store("52494646e27807005741", "wav") //Wave (wav)
  45. fileTypeMap.Store("52494646246009005741", "wav") //Wave (wav)
  46. fileTypeMap.Store("52494646", "wav") //Wave (wav)
  47. fileTypeMap.Store("52494646d07d60074156", "avi")
  48. fileTypeMap.Store("1a45dfa3a34286810142", "webm")
  49. fileTypeMap.Store("4d546864000000060001", "mid") //MIDI (mid)
  50. fileTypeMap.Store("504b0304140000000800", "zip")
  51. fileTypeMap.Store("526172211a0700cf9073", "rar")
  52. fileTypeMap.Store("235468697320636f6e66", "ini")
  53. fileTypeMap.Store("504b03040a0000000000", "jar")
  54. fileTypeMap.Store("4d5a9000030000000400", "exe") //可执行文件
  55. fileTypeMap.Store("3c25402070616765206c", "jsp") //jsp文件
  56. fileTypeMap.Store("4d616e69666573742d56", "mf") //MF文件
  57. fileTypeMap.Store("3c3f786d6c2076657273", "xml") //xml文件
  58. fileTypeMap.Store("494e5345525420494e54", "sql") //xml文件
  59. fileTypeMap.Store("7061636b616765207765", "java") //java文件
  60. fileTypeMap.Store("406563686f206f66660d", "bat") //bat文件
  61. fileTypeMap.Store("1f8b0800000000000000", "gz") //gz文件
  62. fileTypeMap.Store("6c6f67346a2e726f6f74", "properties") //bat文件
  63. fileTypeMap.Store("cafebabe0000002e0041", "class") //bat文件
  64. fileTypeMap.Store("49545346030000006000", "chm") //bat文件
  65. fileTypeMap.Store("04000000010000001300", "mxp") //bat文件
  66. fileTypeMap.Store("6431303a637265617465", "torrent")
  67. fileTypeMap.Store("6D6F6F76", "mov") //Quicktime (mov)
  68. fileTypeMap.Store("FF575043", "wpd") //WordPerfect (wpd)
  69. fileTypeMap.Store("CFAD12FEC5FD746F", "dbx") //Outlook Express (dbx)
  70. fileTypeMap.Store("2142444E", "pst") //Outlook (pst)
  71. fileTypeMap.Store("AC9EBD8F", "qdf") //Quicken (qdf)
  72. fileTypeMap.Store("E3828596", "pwl") //Windows Password (pwl)
  73. fileTypeMap.Store("2E7261FD", "ram") //Real Audio (ram)
  74. }
  75. // 获取前面结果字节的二进制
  76. func bytesToHexString(src []byte) string {
  77. res := bytes.Buffer{}
  78. if src == nil || len(src) <= 0 {
  79. return ""
  80. }
  81. temp := make([]byte, 0)
  82. for _, v := range src {
  83. sub := v & 0xFF
  84. hv := hex.EncodeToString(append(temp, sub))
  85. if len(hv) < 2 {
  86. res.WriteString(strconv.FormatInt(int64(0), 10))
  87. }
  88. res.WriteString(hv)
  89. }
  90. return res.String()
  91. }
  92. // 用文件前面几个字节来判断
  93. // fSrc: 文件字节流(就用前面几个字节)
  94. func GetFileType(fSrc []byte) string {
  95. var fileType string
  96. fileCode := bytesToHexString(fSrc)
  97. fileTypeMap.Range(func(key, value interface{}) bool {
  98. k := key.(string)
  99. v := value.(string)
  100. if strings.HasPrefix(fileCode, strings.ToLower(k)) ||
  101. strings.HasPrefix(k, strings.ToLower(fileCode)) {
  102. fileType = v
  103. return false
  104. }
  105. return true
  106. })
  107. return fileType
  108. }
  109. func GetContentTypeBySuffix(suffix string) int32 {
  110. imgList := []string{"jpeg", "jpg", "png", "gif", "tif", "bmp", "dwg"}
  111. exists := arrays.Contains(imgList, suffix)
  112. if exists >= 0 {
  113. return constant.IMAGE
  114. }
  115. audioList := []string{"mp3", "wma", "wav", "mid", "ape", "flac"}
  116. existAudio := arrays.Contains(audioList, suffix)
  117. if existAudio >= 0 {
  118. return constant.AUDIO
  119. }
  120. videoList := []string{"rmvb", "flv", "mp4", "mpg", "mpeg", "avi", "rm", "mov", "wmv", "webm"}
  121. existVideo := arrays.Contains(videoList, suffix)
  122. if existVideo >= 0 {
  123. return constant.VIDEO
  124. }
  125. return constant.FILE
  126. }
  1. 里面使用了一个sync.Map,这个是线程安全的,防止使用的时候遇到冲突,然后在根据map与读取到的头进行对比,返回相对应的名称

接下来是群组消息的发送

  1. func sendGroupMessage(msg *protocol.Message, s *Server) {
  2. // 发送给群组的消息,查找该群所有的用户进行发送
  3. users := service.GroupService.GetUserIdByGroupUuid(msg.To)
  4. for _, user := range users {
  5. if user.Uuid == msg.From {
  6. continue
  7. }
  8. client, ok := s.Clients[user.Uuid]
  9. if !ok {
  10. continue
  11. }
  12. fromUserDetails := service.UserService.GetUserDetails(msg.From)
  13. // 由于发送群聊时,from是个人,to是群聊uuid。所以在返回消息时,将form修改为群聊uuid,和单聊进行统一
  14. msgSend := protocol.Message{
  15. Avatar: fromUserDetails.Avatar,
  16. FromUsername: msg.FromUsername,
  17. From: msg.To,
  18. To: msg.From,
  19. Content: msg.Content,
  20. ContentType: msg.ContentType,
  21. Type: msg.Type,
  22. MessageType: msg.MessageType,
  23. Url: msg.Url,
  24. }
  25. msgByte, err := proto.Marshal(&msgSend)
  26. if err == nil {
  27. client.Send <- msgByte
  28. }
  29. }
  30. }

read and write

这俩可以说是比较重要的了,作为client的两个方法,提供着对于message逻辑的最主要的功能
其中Read()方法

  1. func (c *Client) Read() {
  2. defer func() {
  3. MyServer.Ungister <- c
  4. c.Conn.Close()
  5. }()
  6. for {
  7. c.Conn.PongHandler()
  8. _, message, err := c.Conn.ReadMessage()
  9. if err != nil {
  10. MyServer.Ungister <- c
  11. c.Conn.Close()
  12. break
  13. }
  14. msg := &protocol.Message{}
  15. proto.Unmarshal(message, msg)
  16. // pong WebSocket 协议定义了三种类型的控制消息:
  17. //close、ping 和 pong。调用连接 WriteControl、WriteMessage 或 NextWriter 方法
  18. //向对端发送控制消息。
  19. if msg.Type == constant.HEAT_BEAT {
  20. pong := &protocol.Message{
  21. Content: constant.PONG,
  22. Type: constant.HEAT_BEAT,
  23. }
  24. pongByte, err2 := proto.Marshal(pong)
  25. if nil != err2 {
  26. }
  27. c.Conn.WriteMessage(websocket.BinaryMessage, pongByte)
  28. } else {
  29. MyServer.Broadcast <- message
  30. }
  31. }
  32. }
  33. func (c *Client) Write() {
  34. defer func() {
  35. c.Conn.Close()
  36. }()
  37. for message := range c.Send {
  38. c.Conn.WriteMessage(websocket.BinaryMessage, message)
  39. }
  40. }
  1. 在之前已经对每一个连接启动了两个协程readwrite进行读取,代码逻辑很简单,反正就是读取信息和发送消息。<br />那今天就结束了,明天详解api中的代码逻辑