有时候服务端需要向客户端主动推送消息,因此需要建立websocket通信
1. websocket库
go get github.com/gorilla/websocket
2. 服务端
2.1 创建websockt客户端的管理
/*** @Author: cyj19* @Date: 2022/5/24 10:51*/package wsimport ("context""github.com/gorilla/websocket""log")// ClientManager websocket客户端管理type ClientManager struct {Clients map[string]*Client // 客户端集合BroadCastChan chan []byte // 广播管道RegisterChan chan *Client // 注册管道UnregisterChan chan *Client // 注销管道}// Client websocket客户端type Client struct {ID stringConn *websocket.Conn // websocket连接SendChan chan []byte // 发送管道}// Message 消息体type Message struct {Sender string `json:"sender,omitempty"`Recipient string `json:"recipient,omitempty"`Content string `json:"content,omitempty"`}var Manager = &ClientManager{Clients: make(map[string]*Client),BroadCastChan: make(chan []byte),RegisterChan: make(chan *Client),UnregisterChan: make(chan *Client),}func (m *ClientManager) Close() {log.Println("关闭manager")close(m.RegisterChan)close(m.UnregisterChan)close(m.BroadCastChan)m.Clients = nil}// Start 启动websocket服务func (m *ClientManager) Start(ctx context.Context) {defer func() {m.Close()}()for {select {case <-ctx.Done():returncase conn := <-m.RegisterChan:log.Println("注册 ", conn.ID)m.Clients[conn.ID] = conn//msg, _ := json.Marshal(&Message{Content: "/A new socket has connected."})case conn := <-m.UnregisterChan:if _, ok := m.Clients[conn.ID]; ok {log.Println("注销 ", conn.ID)// 关闭发送管道close(conn.SendChan)delete(m.Clients, conn.ID)}case msg := <-m.BroadCastChan:for _, conn := range m.Clients {select {case conn.SendChan <- msg:default:// 无法发送数据,注销该连接close(conn.SendChan)delete(m.Clients, conn.ID)}}}}}// BroadCast 广播消息func (m *ClientManager) BroadCast(msg []byte){m.BroadCastChan <- msg}// Send 向客户端发送数据func (c *Client) Send() {defer func() {c.Conn.Close()}()for {select {case msg, ok := <-c.SendChan:if !ok {c.Conn.WriteMessage(websocket.CloseMessage, []byte{})return}c.Conn.WriteMessage(websocket.TextMessage, msg)}}}func (c *Client) Receive() {defer func() {Manager.Unregister <- c}()for {mType, msg, err := c.Conn.ReadMessage()if err != nil {log.Println(err)break}log.Printf("type:%d msg:%s \n", mType, string(msg))// TO DO ...}}
2.2 http协议升级为websocket协议
/*** @Author: cyj19* @Date: 2022/5/24 10:51*/package mainimport ("context""day20220524/ws""github.com/gin-gonic/gin""github.com/gorilla/websocket""github.com/rs/xid""log""net/http""os""os/signal""syscall")func main() {ctx, cancel := context.WithCancel(context.Background())defer cancel()g := gin.Default()g.GET("/ws", wsHandle)go ws.Manager.Start(ctx)s := &http.Server{Addr: ":8081",Handler: g,MaxHeaderBytes: 1 << 20,}go func() {if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed {log.Fatalf("s.ListenAndServe error: %#v \n", err)}}()quit := make(chan os.Signal, 1)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)<-quitif err := s.Shutdown(ctx); err != nil {log.Fatal("Server forced to shutdown: ", err)}log.Println("server exit...")}func wsHandle(c *gin.Context) {wu := &websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {return true}}// 升级为websocket协议conn, err := wu.Upgrade(c.Writer, c.Request, nil)if err != nil {http.NotFound(c.Writer, c.Request)return}client := &ws.Client{ID: xid.New().String(), Conn: conn, SendChan: make(chan []byte)}// 注册ws.Manager.RegisterChan <- clientgo client.Send()go client.Receive()}
3. 客户端
建立客户端进行拨号,测试服务端是否正常
/*** @Author: cyj19* @Date: 2022/5/24 14:02*/package mainimport ("flag""fmt""github.com/gorilla/websocket""net/url""time")func main() {u := url.URL{Scheme: "ws", Host: "127.0.0.1:8081", Path: "/ws"}var dialer *websocket.Dialerconn, _, err := dialer.Dial(u.String(), nil)if err != nil {fmt.Println(err)return}defer conn.Close()go timeWriter(conn)for {_, message, err := conn.ReadMessage()if err != nil {fmt.Println("read:", err)return}fmt.Printf("received: %s\n", message)}}func timeWriter(conn *websocket.Conn) {for {time.Sleep(time.Second * 2)conn.WriteMessage(websocket.TextMessage, []byte(time.Now().Format("2006-01-02 15:04:05")))}}
