缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

案例需求(聊天服务器)

  • 用户可以连接到服务器。
  • 用户可以设定自己的用户名。
  • 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。

目标

  • 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
  • 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以防止死锁
  • 测试多个客户端的连入, 收发和断开, 并诊断服务端日志

设计

  • IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
  • IMsgDecoder: 定义消息解码器及其实现
  • IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
  • tChatClient: 聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
  • IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
  • tChatServer: 实现聊天服务器IChatServer

单元测试

ChatServer_test.go

  1. package chat_server
  2. import (
  3. "fmt"
  4. cs "learning/gooop/chat_server"
  5. "strings"
  6. "testing"
  7. "time"
  8. )
  9. func Test_ChatServer(t *testing.T) {
  10. fnAssertTrue := func(b bool, msg string) {
  11. if !b {
  12. t.Fatal(msg)
  13. }
  14. }
  15. port := 3333
  16. server := cs.NewChatServer()
  17. err := server.Open(port)
  18. if err != nil {
  19. t.Fatal(err)
  20. }
  21. clientCount := 3
  22. address := fmt.Sprintf("localhost:%v", port)
  23. for i := 0;i < clientCount;i++ {
  24. err, client := cs.DialChatClient(address)
  25. if err != nil {
  26. t.Fatal(err)
  27. }
  28. id := fmt.Sprintf("c%02d", i)
  29. client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {
  30. t.Logf("%v recv: %v\n", id, msg)
  31. })
  32. go func() {
  33. client.SetName(id)
  34. client.Send(&cs.NameMsg{id })
  35. n := 0
  36. for range time.Tick(time.Duration(1) * time.Second) {
  37. client.Send(&cs.ChatMsg{id, fmt.Sprintf("msg %02d from %v", n, id) })
  38. n++
  39. if n >= 3 {
  40. break
  41. }
  42. }
  43. client.Close()
  44. }()
  45. }
  46. passedSeconds := 0
  47. for range time.Tick(time.Second) {
  48. passedSeconds++
  49. t.Logf("%v seconds passed", passedSeconds)
  50. if passedSeconds >= 5 {
  51. break
  52. }
  53. }
  54. server.Close()
  55. logs := server.GetLogs()
  56. fnHasLog := func(log string) bool {
  57. for _,it := range logs {
  58. if strings.Contains(it, log) {
  59. return true
  60. }
  61. }
  62. return false
  63. }
  64. for i := 0;i < clientCount;i++ {
  65. msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i + 1)
  66. fnAssertTrue(fnHasLog(msg), "expecting log: " + msg)
  67. msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
  68. fnAssertTrue(fnHasLog(msg), "expecting log: " + msg)
  69. }
  70. }

测试输出

  1. $ go test -v ChatServer_test.go
  2. === RUN Test_ChatServer
  3. tChatServer.handleIncomingConn, clientCount=1
  4. tChatServer.handleIncomingConn, clientCount=2
  5. tChatServer.handleIncomingConn, clientCount=3
  6. ChatServer_test.go:59: 1 seconds passed
  7. ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}
  8. ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}
  9. ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}
  10. ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}
  11. ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}
  12. ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}
  13. ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}
  14. ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}
  15. ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}
  16. ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}
  17. ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}
  18. ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}
  19. ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}
  20. ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}
  21. ChatServer_test.go:59: 2 seconds passed
  22. ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}
  23. ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}
  24. ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}
  25. tChatClient.postConnClosed, c00, serverFlag=false
  26. tChatClient.postConnClosed, c02, serverFlag=false
  27. tChatClient.postConnClosed, c01, serverFlag=false
  28. tChatClient.postConnClosed, c02, serverFlag=true
  29. tChatServer.handleClientClosed, c02
  30. tChatServer.handleClientClosed, c02, clientCount=2
  31. tChatClient.postConnClosed, c01, serverFlag=true
  32. tChatServer.handleClientClosed, c01
  33. tChatServer.handleClientClosed, c01, clientCount=1
  34. ChatServer_test.go:59: 3 seconds passed
  35. tChatClient.postConnClosed, c00, serverFlag=true
  36. tChatServer.handleClientClosed, c00
  37. tChatServer.handleClientClosed, c00, clientCount=0
  38. ChatServer_test.go:59: 4 seconds passed
  39. ChatServer_test.go:59: 5 seconds passed
  40. --- PASS: Test_ChatServer (5.00s)
  41. PASS
  42. ok command-line-arguments 5.003s

IMsg.go

定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码

  1. package chat_server
  2. import (
  3. "encoding/base64"
  4. "fmt"
  5. )
  // IMsg is a chat message that can render itself as the text written to a
  // connection.
  type IMsg interface {
  	// Encode returns the text form of the message.
  	Encode() string
  }
  // NameMsg carries a user name.
  type NameMsg struct {
  	Name string
  }

  // Encode renders the message as "NAME <base64 of Name>" followed by a
  // newline.
  func (me *NameMsg) Encode() string {
  	encodedName := base64.StdEncoding.EncodeToString([]byte(me.Name))
  	return fmt.Sprintf("NAME %s\n", encodedName)
  }
  // ChatMsg carries a line of chat text together with a user name.
  type ChatMsg struct {
  	Name  string
  	Words string
  }

  // Encode renders the message as "CHAT <base64 of Name> <base64 of Words>"
  // followed by a newline. Base64 keeps spaces and newlines inside either
  // field from colliding with the separators.
  func (me *ChatMsg) Encode() string {
  	encodedName := base64.StdEncoding.EncodeToString([]byte(me.Name))
  	encodedWords := base64.StdEncoding.EncodeToString([]byte(me.Words))
  	return fmt.Sprintf("CHAT %s %s\n", encodedName, encodedWords)
  }

IMsgDecoder.go

定义消息解码器及其实现

  1. package chat_server
  2. import (
  3. "encoding/base64"
  4. "strings"
  5. )
  6. type IMsgDecoder interface {
  7. Decode(line string) (bool, IMsg)
  8. }
  9. type tMsgDecoder struct {
  10. }
  11. func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {
  12. items := strings.Split(line, " ")
  13. size := len(items)
  14. if items[0] == "NAME" && size == 2 {
  15. name, err := base64.StdEncoding.DecodeString(items[1])
  16. if err != nil {
  17. return false, nil
  18. }
  19. return true, &NameMsg{
  20. Name: string(name),
  21. }
  22. }
  23. if items[0] == "CHAT" && size == 3 {
  24. name, err := base64.StdEncoding.DecodeString(items[1])
  25. if err != nil {
  26. return false, nil
  27. }
  28. words, err := base64.StdEncoding.DecodeString(items[2])
  29. if err != nil {
  30. return false, nil
  31. }
  32. return true, &ChatMsg{
  33. Name: string(name),
  34. Words: string(words),
  35. }
  36. }
  37. return false, nil
  38. }
  39. var MsgDecoder = &tMsgDecoder{}

IChatClient.go

定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.

  1. package chat_server
  2. type IChatClient interface {
  3. GetName() string
  4. SetName(name string)
  5. Send(msg IMsg)
  6. RecvHandler(handler ClientRecvFunc)
  7. CloseHandler(handler ClientCloseFunc)
  8. Close()
  9. }
  10. type ClientRecvFunc func(client IChatClient, msg IMsg)
  11. type ClientCloseFunc func(client IChatClient)

tChatClient.go

聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.

  1. package chat_server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync/atomic"
  8. "time"
  9. )
  10. type tChatClient struct {
  11. conn net.Conn
  12. name string
  13. openFlag int32
  14. closeFlag int32
  15. serverFlag bool
  16. closeChan chan bool
  17. sendChan chan IMsg
  18. sendLogs []IMsg
  19. dropLogs []IMsg
  20. recvLogs []IMsg
  21. pendingSend int32
  22. recvHandler ClientRecvFunc
  23. closeHandler ClientCloseFunc
  24. }
  25. var gMaxPendingSend int32 = 100
  26. func DialChatClient(address string) (error, IChatClient) {
  27. conn, err := net.Dial("tcp", address)
  28. if err != nil {
  29. return err, nil
  30. }
  31. return nil, openChatClient(conn, false)
  32. }
  33. func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
  34. it := &tChatClient{
  35. conn: conn,
  36. openFlag: 0,
  37. closeFlag: 0,
  38. serverFlag: serverFlag,
  39. closeChan: make(chan bool),
  40. sendChan: make(chan IMsg, gMaxPendingSend),
  41. name: "anonymous",
  42. sendLogs: []IMsg{},
  43. dropLogs: []IMsg{},
  44. recvLogs: []IMsg{},
  45. }
  46. it.open()
  47. return it
  48. }
  49. func (me *tChatClient) GetName() string {
  50. return me.name
  51. }
  52. func (me *tChatClient) SetName(name string) {
  53. me.name = name
  54. }
  55. func (me *tChatClient) open(){
  56. if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
  57. return
  58. }
  59. go me.beginWrite()
  60. go me.beginRead()
  61. }
  62. func (me *tChatClient) isClosed() bool {
  63. return me.closeFlag != 0
  64. }
  65. func (me *tChatClient) isNotClosed() bool {
  66. return !me.isClosed()
  67. }
  68. func (me *tChatClient) Send(msg IMsg) {
  69. if me.isClosed() {
  70. return
  71. }
  72. if me.pendingSend < gMaxPendingSend {
  73. atomic.AddInt32(&me.pendingSend, 1)
  74. me.sendChan <- msg
  75. } else {
  76. me.dropLogs = append(me.dropLogs, msg)
  77. }
  78. }
  79. func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {
  80. if me.isNotClosed() {
  81. me.recvHandler = handler
  82. }
  83. }
  84. func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {
  85. if me.isNotClosed() {
  86. me.closeHandler = handler
  87. }
  88. }
  89. func (me *tChatClient) Close() {
  90. if me.isNotClosed() {
  91. me.closeConn()
  92. }
  93. }
  94. func (me *tChatClient) beginWrite() {
  95. writer := io.Writer(me.conn)
  96. for {
  97. select {
  98. case <- me.closeChan:
  99. _ = me.conn.Close()
  100. me.closeFlag = 2
  101. me.postConnClosed()
  102. return
  103. case msg := <- me.sendChan:
  104. atomic.AddInt32(&me.pendingSend, -1)
  105. _,e := writer.Write([]byte(msg.Encode()))
  106. if e != nil {
  107. me.closeConn()
  108. break
  109. } else {
  110. me.sendLogs = append(me.sendLogs, msg)
  111. }
  112. case <- time.After(time.Duration(10) * time.Second):
  113. me.postRecvTimeout()
  114. break
  115. }
  116. }
  117. }
  118. func (me *tChatClient) postRecvTimeout() {
  119. fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", me.name, me.serverFlag)
  120. me.closeConn()
  121. }
  122. func (me *tChatClient) beginRead() {
  123. reader := bufio.NewReader(me.conn)
  124. for {
  125. line, err := reader.ReadString('\n')
  126. if err != nil {
  127. me.closeConn()
  128. break
  129. }
  130. ok, msg := MsgDecoder.Decode(line)
  131. if ok {
  132. fn := me.recvHandler
  133. if fn != nil {
  134. fn(me, msg)
  135. }
  136. me.recvLogs = append(me.recvLogs, msg)
  137. }
  138. }
  139. }
  140. func (me *tChatClient) closeConn() {
  141. if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
  142. return
  143. }
  144. me.closeChan <- true
  145. }
  146. func (me *tChatClient) postConnClosed() {
  147. fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", me.name, me.serverFlag)
  148. handler := me.closeHandler
  149. if handler != nil {
  150. handler(me)
  151. }
  152. me.closeHandler = nil
  153. me.recvHandler = nil
  154. }

IChatServer.go

定义聊天服务器接口, 为方便测试, 提供日志采集方法

  1. package chat_server
  2. type IChatServer interface {
  3. Open(port int) error
  4. Broadcast(msg IMsg)
  5. Close()
  6. GetLogs() []string
  7. }

tChatServer.go

实现聊天服务器IChatServer

  1. package chat_server
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. )
  9. type tChatServer struct {
  10. openFlag int32
  11. closeFlag int32
  12. clients []IChatClient
  13. clientCount int
  14. clientLock *sync.RWMutex
  15. listener net.Listener
  16. recvLogs []IMsg
  17. logs []string
  18. }
  19. func NewChatServer() IChatServer {
  20. it := &tChatServer{
  21. openFlag: 0,
  22. closeFlag: 0,
  23. clients: []IChatClient{},
  24. clientCount: 0,
  25. clientLock: new(sync.RWMutex),
  26. listener: nil,
  27. recvLogs: []IMsg{},
  28. }
  29. return it
  30. }
  31. func (me *tChatServer) Open(port int) error {
  32. if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
  33. return errors.New("server already opened")
  34. }
  35. listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
  36. if err != nil {
  37. return err
  38. }
  39. me.listener = listener
  40. go me.beginListening()
  41. return nil
  42. }
  43. func (me *tChatServer) logf(f string, args... interface{}) {
  44. msg := fmt.Sprintf(f, args...)
  45. me.logs = append(me.logs, msg)
  46. fmt.Println(msg)
  47. }
  48. func (me *tChatServer) GetLogs() []string {
  49. return me.logs
  50. }
  51. func (me *tChatServer) isClosed() bool {
  52. return me.closeFlag != 0
  53. }
  54. func (me *tChatServer) isNotClosed() bool {
  55. return !me.isClosed()
  56. }
  57. func (me *tChatServer) beginListening() {
  58. for !me.isClosed() {
  59. conn, err := me.listener.Accept()
  60. if err != nil {
  61. me.Close()
  62. break
  63. }
  64. me.handleIncomingConn(conn)
  65. }
  66. }
  67. func (me *tChatServer) Close() {
  68. if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
  69. return
  70. }
  71. _ = me.listener.Close()
  72. me.closeAllClients()
  73. }
  74. func (me *tChatServer) closeAllClients() {
  75. me.clientLock.Lock()
  76. defer me.clientLock.Unlock()
  77. for i,it := range me.clients {
  78. if it != nil {
  79. it.Close()
  80. me.clients[i] = nil
  81. }
  82. }
  83. me.clientCount = 0
  84. }
  85. func (me *tChatServer) handleIncomingConn(conn net.Conn) {
  86. // init client
  87. client := openChatClient(conn, true)
  88. client.RecvHandler(me.handleClientMsg)
  89. client.CloseHandler(me.handleClientClosed)
  90. // lock me.clients
  91. me.clientLock.Lock()
  92. defer me.clientLock.Unlock()
  93. // append to me.clients
  94. if len(me.clients) > me.clientCount {
  95. me.clients[me.clientCount] = client
  96. } else {
  97. me.clients = append(me.clients, client)
  98. }
  99. me.clientCount++
  100. me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)
  101. }
  102. func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {
  103. me.recvLogs = append(me.recvLogs, msg)
  104. if nameMsg,ok := msg.(*NameMsg);ok {
  105. client.SetName(nameMsg.Name)
  106. } else if _, ok := msg.(*ChatMsg);ok {
  107. me.Broadcast(msg)
  108. }
  109. }
  110. func (me *tChatServer) handleClientClosed(client IChatClient) {
  111. me.logf("tChatServer.handleClientClosed, %s", client.GetName())
  112. me.clientLock.Lock()
  113. defer me.clientLock.Unlock()
  114. if me.clientCount <= 0 {
  115. return
  116. }
  117. lastI := me.clientCount - 1
  118. for i,it := range me.clients {
  119. if it == client {
  120. if i == lastI {
  121. me.clients[i] = nil
  122. } else {
  123. me.clients[i], me.clients[lastI] = me.clients[lastI], nil
  124. }
  125. me.clientCount--
  126. break
  127. }
  128. }
  129. me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)
  130. }
  131. func (me *tChatServer) Broadcast(msg IMsg) {
  132. me.clientLock.RLock()
  133. defer me.clientLock.RUnlock()
  134. for _,it := range me.clients {
  135. if it != nil {
  136. it.Send(msg)
  137. }
  138. }
  139. }

(未完待续)