有时候服务端需要向客户端主动推送消息,因此需要建立websocket通信
1. websocket库
go get github.com/gorilla/websocket
2. 服务端
2.1 创建websocket客户端管理器
/**
* @Author: cyj19
* @Date: 2022/5/24 10:51
*/
package ws
import (
"context"
"github.com/gorilla/websocket"
"log"
)
// ClientManager manages the websocket clients. It holds the set of known
// clients together with the channels used to register clients, unregister
// clients, and broadcast a message to all of them.
type ClientManager struct {
Clients map[string]*Client // set of clients, keyed by Client.ID
BroadCastChan chan []byte // broadcast channel
RegisterChan chan *Client // registration channel
UnregisterChan chan *Client // unregistration channel
}
// Client is a single websocket client: an identifier, the underlying
// connection, and the channel that feeds outgoing messages to it.
type Client struct {
ID string
Conn *websocket.Conn // websocket connection
SendChan chan []byte // channel of outgoing messages
}
// Message is the message body. Each field carries a JSON tag with omitempty,
// so empty fields are left out when the value is marshalled to JSON.
type Message struct {
Sender string `json:"sender,omitempty"`
Recipient string `json:"recipient,omitempty"`
Content string `json:"content,omitempty"`
}
// Manager is the package-level ClientManager instance. It starts with an
// empty client map and unbuffered broadcast, register and unregister channels.
var Manager = &ClientManager{
Clients: make(map[string]*Client),
BroadCastChan: make(chan []byte),
RegisterChan: make(chan *Client),
UnregisterChan: make(chan *Client),
}
// Close logs the shutdown, closes the register, unregister and broadcast
// channels, and drops the client map.
//
// NOTE(review): any goroutine that sends on one of these channels after Close
// has run will panic (send on closed channel). The per-client SendChan
// channels are not closed here.
func (m *ClientManager) Close() {
log.Println("关闭manager")
close(m.RegisterChan)
close(m.UnregisterChan)
close(m.BroadCastChan)
m.Clients = nil
}
// Start runs the manager's event loop until ctx is cancelled, then calls
// Close. All access to m.Clients happens inside this loop.
//
// The loop reacts to three kinds of events:
//   - RegisterChan:   store the client under its ID.
//   - UnregisterChan: if the client is known, close its SendChan and forget it.
//   - BroadCastChan:  offer the message to every client without blocking.
func (m *ClientManager) Start(ctx context.Context) {
	defer m.Close()

	for {
		select {
		case <-ctx.Done():
			return

		case client := <-m.RegisterChan:
			log.Println("注册 ", client.ID)
			m.Clients[client.ID] = client

		case client := <-m.UnregisterChan:
			if _, known := m.Clients[client.ID]; !known {
				// Already removed (for example by a failed broadcast); nothing to do.
				continue
			}
			log.Println("注销 ", client.ID)
			// Closing the send channel signals the client's writer to stop.
			close(client.SendChan)
			delete(m.Clients, client.ID)

		case payload := <-m.BroadCastChan:
			for _, client := range m.Clients {
				select {
				case client.SendChan <- payload:
				default:
					// The client cannot accept the message right now: drop it.
					close(client.SendChan)
					delete(m.Clients, client.ID)
				}
			}
		}
	}
}
// BroadCast hands msg to the manager's broadcast channel. The channel is
// unbuffered, so the call blocks until the manager's event loop receives it.
func (m *ClientManager) BroadCast(msg []byte) {
	m.BroadCastChan <- msg
}
// Send forwards every message received on SendChan to the websocket
// connection as a text frame.
//
// It returns when SendChan is closed, in which case a close frame is sent to
// the peer first, or as soon as a write fails. The connection is closed on
// return in both cases.
func (c *Client) Send() {
	defer c.Conn.Close()

	for msg := range c.SendChan {
		if err := c.Conn.WriteMessage(websocket.TextMessage, msg); err != nil {
			// The connection is unusable; stop instead of writing to it forever.
			log.Printf("send to %s: %v", c.ID, err)
			return
		}
	}

	// SendChan was closed: tell the peer we are going away. This is best
	// effort, the connection is closed right after regardless of the outcome.
	if err := c.Conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
		log.Printf("send close frame to %s: %v", c.ID, err)
	}
}
// Receive reads messages from the websocket connection and logs them. It
// returns on the first read error, after which the client is handed to the
// manager's unregister channel.
func (c *Client) Receive() {
	defer func() {
		// The manager field is UnregisterChan; there is no field named Unregister.
		Manager.UnregisterChan <- c
	}()

	for {
		mType, msg, err := c.Conn.ReadMessage()
		if err != nil {
			log.Println(err)
			return
		}
		log.Printf("type:%d msg:%s \n", mType, string(msg))
		// TODO: handle the incoming message.
	}
}
2.2 http协议升级为websocket协议
/**
* @Author: cyj19
* @Date: 2022/5/24 10:51
*/
package main
import (
"context"
"day20220524/ws"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/rs/xid"
"log"
"net/http"
"os"
"os/signal"
"syscall"
)
// main starts the websocket manager and an HTTP server on :8081 that exposes
// the /ws endpoint, then blocks until SIGINT or SIGTERM arrives and shuts the
// server down.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	router := gin.Default()
	router.GET("/ws", wsHandle)

	// The manager loop runs until ctx is cancelled.
	go ws.Manager.Start(ctx)

	srv := &http.Server{
		Addr:           ":8081",
		Handler:        router,
		MaxHeaderBytes: 1 << 20,
	}

	go func() {
		err := srv.ListenAndServe()
		if err == nil || err == http.ErrServerClosed {
			return
		}
		log.Fatalf("s.ListenAndServe error: %#v \n", err)
	}()

	// Block until a termination signal is delivered.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	if err := srv.Shutdown(ctx); err != nil {
		log.Fatal("Server forced to shutdown: ", err)
	}
	log.Println("server exit...")
}
// wsHandle upgrades the HTTP request to a websocket connection, registers the
// resulting client with the manager, and starts its writer and reader
// goroutines.
func wsHandle(c *gin.Context) {
	// NOTE(review): CheckOrigin accepts every origin, which permits
	// cross-site websocket connections. Restrict it before exposing this
	// endpoint to browsers.
	upgrader := &websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool { return true },
	}

	// Upgrade to the websocket protocol.
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error, so
		// writing another response here would be a second, superfluous one.
		log.Println("websocket upgrade:", err)
		return
	}

	client := &ws.Client{
		ID:   xid.New().String(),
		Conn: conn,
		// Buffered: the manager broadcasts with a non-blocking send and drops
		// any client whose channel is not ready. With an unbuffered channel a
		// client that is merely busy writing the previous message is dropped.
		SendChan: make(chan []byte, 256),
	}

	// Register with the manager before starting the client's goroutines.
	ws.Manager.RegisterChan <- client

	go client.Send()
	go client.Receive()
}
3. 客户端
建立客户端进行拨号,测试服务端是否正常
/**
* @Author: cyj19
* @Date: 2022/5/24 14:02
*/
package main
import (
"flag"
"fmt"
"github.com/gorilla/websocket"
"net/url"
"time"
)
// main dials the websocket server, starts a goroutine that periodically sends
// the current time, and prints every message received until a read fails.
//
// The server address defaults to 127.0.0.1:8081 and can be overridden with
// the -addr flag.
func main() {
	addr := flag.String("addr", "127.0.0.1:8081", "websocket server address (host:port)")
	flag.Parse()

	u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"}

	// Use the package's default dialer rather than calling Dial on a nil *Dialer.
	conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Close()

	go timeWriter(conn)

	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			fmt.Println("read:", err)
			return
		}
		fmt.Printf("received: %s\n", message)
	}
}
// timeWriter sends the current local time to conn as a text message every two
// seconds. It returns as soon as a write fails, for example because the
// connection has been closed.
func timeWriter(conn *websocket.Conn) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for now := range ticker.C {
		payload := []byte(now.Format("2006-01-02 15:04:05"))
		if err := conn.WriteMessage(websocket.TextMessage, payload); err != nil {
			fmt.Println("write:", err)
			return
		}
	}
}