有时候服务端需要向客户端主动推送消息,因此需要建立 WebSocket 通信。

1. websocket库

go get github.com/gorilla/websocket

2. 服务端

2.1 创建websocket客户端的管理

  1. /**
  2. * @Author: cyj19
  3. * @Date: 2022/5/24 10:51
  4. */
  5. package ws
  6. import (
  7. "context"
  8. "github.com/gorilla/websocket"
  9. "log"
  10. )
  11. // ClientManager websocket客户端管理
  12. type ClientManager struct {
  13. Clients map[string]*Client // 客户端集合
  14. BroadCastChan chan []byte // 广播管道
  15. RegisterChan chan *Client // 注册管道
  16. UnregisterChan chan *Client // 注销管道
  17. }
  18. // Client websocket客户端
  19. type Client struct {
  20. ID string
  21. Conn *websocket.Conn // websocket连接
  22. SendChan chan []byte // 发送管道
  23. }
  // Message is the JSON envelope exchanged over the websocket. Every field is
  // omitted from the encoded form when it is empty.
  type Message struct {
  	// Sender names the originator of the message.
  	Sender string `json:"sender,omitempty"`
  	// Recipient names the intended receiver of the message.
  	Recipient string `json:"recipient,omitempty"`
  	// Content is the message payload.
  	Content string `json:"content,omitempty"`
  }
  30. var Manager = &ClientManager{
  31. Clients: make(map[string]*Client),
  32. BroadCastChan: make(chan []byte),
  33. RegisterChan: make(chan *Client),
  34. UnregisterChan: make(chan *Client),
  35. }
  36. func (m *ClientManager) Close() {
  37. log.Println("关闭manager")
  38. close(m.RegisterChan)
  39. close(m.UnregisterChan)
  40. close(m.BroadCastChan)
  41. m.Clients = nil
  42. }
  43. // Start 启动websocket服务
  44. func (m *ClientManager) Start(ctx context.Context) {
  45. defer func() {
  46. m.Close()
  47. }()
  48. for {
  49. select {
  50. case <-ctx.Done():
  51. return
  52. case conn := <-m.RegisterChan:
  53. log.Println("注册 ", conn.ID)
  54. m.Clients[conn.ID] = conn
  55. //msg, _ := json.Marshal(&Message{Content: "/A new socket has connected."})
  56. case conn := <-m.UnregisterChan:
  57. if _, ok := m.Clients[conn.ID]; ok {
  58. log.Println("注销 ", conn.ID)
  59. // 关闭发送管道
  60. close(conn.SendChan)
  61. delete(m.Clients, conn.ID)
  62. }
  63. case msg := <-m.BroadCastChan:
  64. for _, conn := range m.Clients {
  65. select {
  66. case conn.SendChan <- msg:
  67. default:
  68. // 无法发送数据,注销该连接
  69. close(conn.SendChan)
  70. delete(m.Clients, conn.ID)
  71. }
  72. }
  73. }
  74. }
  75. }
  76. // BroadCast 广播消息
  77. func (m *ClientManager) BroadCast(msg []byte){
  78. m.BroadCastChan <- msg
  79. }
  80. // Send 向客户端发送数据
  81. func (c *Client) Send() {
  82. defer func() {
  83. c.Conn.Close()
  84. }()
  85. for {
  86. select {
  87. case msg, ok := <-c.SendChan:
  88. if !ok {
  89. c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
  90. return
  91. }
  92. c.Conn.WriteMessage(websocket.TextMessage, msg)
  93. }
  94. }
  95. }
  96. func (c *Client) Receive() {
  97. defer func() {
  98. Manager.Unregister <- c
  99. }()
  100. for {
  101. mType, msg, err := c.Conn.ReadMessage()
  102. if err != nil {
  103. log.Println(err)
  104. break
  105. }
  106. log.Printf("type:%d msg:%s \n", mType, string(msg))
  107. // TO DO ...
  108. }
  109. }

2.2 http协议升级为websocket协议

  1. /**
  2. * @Author: cyj19
  3. * @Date: 2022/5/24 10:51
  4. */
  5. package main
  6. import (
  7. "context"
  8. "day20220524/ws"
  9. "github.com/gin-gonic/gin"
  10. "github.com/gorilla/websocket"
  11. "github.com/rs/xid"
  12. "log"
  13. "net/http"
  14. "os"
  15. "os/signal"
  16. "syscall"
  17. )
  18. func main() {
  19. ctx, cancel := context.WithCancel(context.Background())
  20. defer cancel()
  21. g := gin.Default()
  22. g.GET("/ws", wsHandle)
  23. go ws.Manager.Start(ctx)
  24. s := &http.Server{
  25. Addr: ":8081",
  26. Handler: g,
  27. MaxHeaderBytes: 1 << 20,
  28. }
  29. go func() {
  30. if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  31. log.Fatalf("s.ListenAndServe error: %#v \n", err)
  32. }
  33. }()
  34. quit := make(chan os.Signal, 1)
  35. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  36. <-quit
  37. if err := s.Shutdown(ctx); err != nil {
  38. log.Fatal("Server forced to shutdown: ", err)
  39. }
  40. log.Println("server exit...")
  41. }
  42. func wsHandle(c *gin.Context) {
  43. wu := &websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  44. return true
  45. }}
  46. // 升级为websocket协议
  47. conn, err := wu.Upgrade(c.Writer, c.Request, nil)
  48. if err != nil {
  49. http.NotFound(c.Writer, c.Request)
  50. return
  51. }
  52. client := &ws.Client{ID: xid.New().String(), Conn: conn, SendChan: make(chan []byte)}
  53. // 注册
  54. ws.Manager.RegisterChan <- client
  55. go client.Send()
  56. go client.Receive()
  57. }

3. 客户端

建立客户端进行拨号,测试服务端是否正常

  1. /**
  2. * @Author: cyj19
  3. * @Date: 2022/5/24 14:02
  4. */
  5. package main
  6. import (
  7. "flag"
  8. "fmt"
  9. "github.com/gorilla/websocket"
  10. "net/url"
  11. "time"
  12. )
  13. func main() {
  14. u := url.URL{Scheme: "ws", Host: "127.0.0.1:8081", Path: "/ws"}
  15. var dialer *websocket.Dialer
  16. conn, _, err := dialer.Dial(u.String(), nil)
  17. if err != nil {
  18. fmt.Println(err)
  19. return
  20. }
  21. defer conn.Close()
  22. go timeWriter(conn)
  23. for {
  24. _, message, err := conn.ReadMessage()
  25. if err != nil {
  26. fmt.Println("read:", err)
  27. return
  28. }
  29. fmt.Printf("received: %s\n", message)
  30. }
  31. }
  32. func timeWriter(conn *websocket.Conn) {
  33. for {
  34. time.Sleep(time.Second * 2)
  35. conn.WriteMessage(websocket.TextMessage, []byte(time.Now().Format("2006-01-02 15:04:05")))
  36. }
  37. }