缘起
最近阅读<
本系列笔记拟采用golang练习之
案例需求(聊天服务器)
- 用户可以连接到服务器。
- 用户可以设定自己的用户名。
- 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。
目标
- 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
- 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以防止死锁
- 测试多个客户端的连入, 收发和断开, 并诊断服务端日志
设计
- IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
- IMsgDecoder: 定义消息解码器及其实现
- IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
- tChatClient: 聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
- IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
- tChatServer: 实现聊天服务器IChatServer
单元测试
ChatServer_test.go
package chat_serverimport ("fmt"cs "learning/gooop/chat_server""strings""testing""time")func Test_ChatServer(t *testing.T) {fnAssertTrue := func(b bool, msg string) {if !b {t.Fatal(msg)}}port := 3333server := cs.NewChatServer()err := server.Open(port)if err != nil {t.Fatal(err)}clientCount := 3address := fmt.Sprintf("localhost:%v", port)for i := 0;i < clientCount;i++ {err, client := cs.DialChatClient(address)if err != nil {t.Fatal(err)}id := fmt.Sprintf("c%02d", i)client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {t.Logf("%v recv: %v\n", id, msg)})go func() {client.SetName(id)client.Send(&cs.NameMsg{id })n := 0for range time.Tick(time.Duration(1) * time.Second) {client.Send(&cs.ChatMsg{id, fmt.Sprintf("msg %02d from %v", n, id) })n++if n >= 3 {break}}client.Close()}()}passedSeconds := 0for range time.Tick(time.Second) {passedSeconds++t.Logf("%v seconds passed", passedSeconds)if passedSeconds >= 5 {break}}server.Close()logs := server.GetLogs()fnHasLog := func(log string) bool {for _,it := range logs {if strings.Contains(it, log) {return true}}return false}for i := 0;i < clientCount;i++ {msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i + 1)fnAssertTrue(fnHasLog(msg), "expecting log: " + msg)msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)fnAssertTrue(fnHasLog(msg), "expecting log: " + msg)}}
测试输出
$ go test -v ChatServer_test.go=== RUN Test_ChatServertChatServer.handleIncomingConn, clientCount=1tChatServer.handleIncomingConn, clientCount=2tChatServer.handleIncomingConn, clientCount=3ChatServer_test.go:59: 1 seconds passedChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}ChatServer_test.go:59: 2 seconds passedChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}tChatClient.postConnClosed, c00, serverFlag=falsetChatClient.postConnClosed, c02, serverFlag=falsetChatClient.postConnClosed, c01, serverFlag=falsetChatClient.postConnClosed, c02, serverFlag=truetChatServer.handleClientClosed, c02tChatServer.handleClientClosed, c02, clientCount=2tChatClient.postConnClosed, c01, serverFlag=truetChatServer.handleClientClosed, c01tChatServer.handleClientClosed, c01, clientCount=1ChatServer_test.go:59: 3 seconds passedtChatClient.postConnClosed, c00, serverFlag=truetChatServer.handleClientClosed, c00tChatServer.handleClientClosed, c00, clientCount=0ChatServer_test.go:59: 4 seconds passedChatServer_test.go:59: 5 seconds passed--- PASS: Test_ChatServer (5.00s)PASSok command-line-arguments 5.003s
IMsg.go
定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
package chat_serverimport ("encoding/base64""fmt")type IMsg interface {Encode() string}type NameMsg struct {Name string}func (me *NameMsg) Encode() string {return fmt.Sprintf("NAME %s\n", base64.StdEncoding.EncodeToString([]byte(me.Name)))}type ChatMsg struct {Name stringWords string}func (me *ChatMsg) Encode() string {return fmt.Sprintf("CHAT %s %s\n",base64.StdEncoding.EncodeToString([]byte(me.Name)),base64.StdEncoding.EncodeToString([]byte(me.Words)),)}
IMsgDecoder.go
定义消息解码器及其实现
package chat_serverimport ("encoding/base64""strings")type IMsgDecoder interface {Decode(line string) (bool, IMsg)}type tMsgDecoder struct {}func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {items := strings.Split(line, " ")size := len(items)if items[0] == "NAME" && size == 2 {name, err := base64.StdEncoding.DecodeString(items[1])if err != nil {return false, nil}return true, &NameMsg{Name: string(name),}}if items[0] == "CHAT" && size == 3 {name, err := base64.StdEncoding.DecodeString(items[1])if err != nil {return false, nil}words, err := base64.StdEncoding.DecodeString(items[2])if err != nil {return false, nil}return true, &ChatMsg{Name: string(name),Words: string(words),}}return false, nil}var MsgDecoder = &tMsgDecoder{}
IChatClient.go
定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
package chat_servertype IChatClient interface {GetName() stringSetName(name string)Send(msg IMsg)RecvHandler(handler ClientRecvFunc)CloseHandler(handler ClientCloseFunc)Close()}type ClientRecvFunc func(client IChatClient, msg IMsg)type ClientCloseFunc func(client IChatClient)
tChatClient.go
聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
package chat_serverimport ("bufio""fmt""io""net""sync/atomic""time")type tChatClient struct {conn net.Connname stringopenFlag int32closeFlag int32serverFlag boolcloseChan chan boolsendChan chan IMsgsendLogs []IMsgdropLogs []IMsgrecvLogs []IMsgpendingSend int32recvHandler ClientRecvFunccloseHandler ClientCloseFunc}var gMaxPendingSend int32 = 100func DialChatClient(address string) (error, IChatClient) {conn, err := net.Dial("tcp", address)if err != nil {return err, nil}return nil, openChatClient(conn, false)}func openChatClient(conn net.Conn, serverFlag bool) IChatClient {it := &tChatClient{conn: conn,openFlag: 0,closeFlag: 0,serverFlag: serverFlag,closeChan: make(chan bool),sendChan: make(chan IMsg, gMaxPendingSend),name: "anonymous",sendLogs: []IMsg{},dropLogs: []IMsg{},recvLogs: []IMsg{},}it.open()return it}func (me *tChatClient) GetName() string {return me.name}func (me *tChatClient) SetName(name string) {me.name = name}func (me *tChatClient) open(){if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {return}go me.beginWrite()go me.beginRead()}func (me *tChatClient) isClosed() bool {return me.closeFlag != 0}func (me *tChatClient) isNotClosed() bool {return !me.isClosed()}func (me *tChatClient) Send(msg IMsg) {if me.isClosed() {return}if me.pendingSend < gMaxPendingSend {atomic.AddInt32(&me.pendingSend, 1)me.sendChan <- msg} else {me.dropLogs = append(me.dropLogs, msg)}}func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {if me.isNotClosed() {me.recvHandler = handler}}func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {if me.isNotClosed() {me.closeHandler = handler}}func (me *tChatClient) Close() {if me.isNotClosed() {me.closeConn()}}func (me *tChatClient) beginWrite() {writer := io.Writer(me.conn)for {select {case <- me.closeChan:_ = me.conn.Close()me.closeFlag = 2me.postConnClosed()returncase msg := <- me.sendChan:atomic.AddInt32(&me.pendingSend, -1)_,e := writer.Write([]byte(msg.Encode()))if e != nil {me.closeConn()break} else {me.sendLogs = append(me.sendLogs, msg)}case <- time.After(time.Duration(10) * time.Second):me.postRecvTimeout()break}}}func (me *tChatClient) postRecvTimeout() {fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", me.name, me.serverFlag)me.closeConn()}func (me *tChatClient) beginRead() {reader := bufio.NewReader(me.conn)for {line, err := reader.ReadString('\n')if err != nil {me.closeConn()break}ok, msg := MsgDecoder.Decode(line)if ok {fn := me.recvHandlerif fn != nil {fn(me, msg)}me.recvLogs = append(me.recvLogs, msg)}}}func (me *tChatClient) closeConn() {if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {return}me.closeChan <- true}func (me *tChatClient) postConnClosed() {fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", me.name, me.serverFlag)handler := me.closeHandlerif handler != nil {handler(me)}me.closeHandler = nilme.recvHandler = nil}
IChatServer.go
定义聊天服务器接口, 为方便测试, 提供日志采集方法
package chat_servertype IChatServer interface {Open(port int) errorBroadcast(msg IMsg)Close()GetLogs() []string}
tChatServer.go
实现聊天服务器IChatServer
package chat_serverimport ("errors""fmt""net""sync""sync/atomic")type tChatServer struct {openFlag int32closeFlag int32clients []IChatClientclientCount intclientLock *sync.RWMutexlistener net.ListenerrecvLogs []IMsglogs []string}func NewChatServer() IChatServer {it := &tChatServer{openFlag: 0,closeFlag: 0,clients: []IChatClient{},clientCount: 0,clientLock: new(sync.RWMutex),listener: nil,recvLogs: []IMsg{},}return it}func (me *tChatServer) Open(port int) error {if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {return errors.New("server already opened")}listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))if err != nil {return err}me.listener = listenergo me.beginListening()return nil}func (me *tChatServer) logf(f string, args... interface{}) {msg := fmt.Sprintf(f, args...)me.logs = append(me.logs, msg)fmt.Println(msg)}func (me *tChatServer) GetLogs() []string {return me.logs}func (me *tChatServer) isClosed() bool {return me.closeFlag != 0}func (me *tChatServer) isNotClosed() bool {return !me.isClosed()}func (me *tChatServer) beginListening() {for !me.isClosed() {conn, err := me.listener.Accept()if err != nil {me.Close()break}me.handleIncomingConn(conn)}}func (me *tChatServer) Close() {if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {return}_ = me.listener.Close()me.closeAllClients()}func (me *tChatServer) closeAllClients() {me.clientLock.Lock()defer me.clientLock.Unlock()for i,it := range me.clients {if it != nil {it.Close()me.clients[i] = nil}}me.clientCount = 0}func (me *tChatServer) handleIncomingConn(conn net.Conn) {// init clientclient := openChatClient(conn, true)client.RecvHandler(me.handleClientMsg)client.CloseHandler(me.handleClientClosed)// lock me.clientsme.clientLock.Lock()defer me.clientLock.Unlock()// append to me.clientsif len(me.clients) > me.clientCount {me.clients[me.clientCount] = client} else {me.clients = append(me.clients, client)}me.clientCount++me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)}func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {me.recvLogs = append(me.recvLogs, msg)if nameMsg,ok := msg.(*NameMsg);ok {client.SetName(nameMsg.Name)} else if _, ok := msg.(*ChatMsg);ok {me.Broadcast(msg)}}func (me *tChatServer) handleClientClosed(client IChatClient) {me.logf("tChatServer.handleClientClosed, %s", client.GetName())me.clientLock.Lock()defer me.clientLock.Unlock()if me.clientCount <= 0 {return}lastI := me.clientCount - 1for i,it := range me.clients {if it == client {if i == lastI {me.clients[i] = nil} else {me.clients[i], me.clients[lastI] = me.clients[lastI], nil}me.clientCount--break}}me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)}func (me *tChatServer) Broadcast(msg IMsg) {me.clientLock.RLock()defer me.clientLock.RUnlock()for _,it := range me.clients {if it != nil {it.Send(msg)}}}
(未完待续)
