Background

Recently reading <> (刘金亮, 2021.1).
This series of notes works through its ideas with exercises in golang.

Case requirements (chat server)

  • Users can connect to the server.
  • Users can set their own username.
  • Users can send messages to the server, and the server broadcasts each message to the other users.

Goal (Day 4)

  • Diagnose and fix a memory leak

Diagnosis

  • Starting from the day 3 code, inspect the heap profile with go tool pprof:

```
$ go tool pprof ~/chat_server_mem.profile
File: chat_server.test
Type: inuse_space
Time: Mar 10, 2021 at 7:35am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 9495.99kB, 100% of 9495.99kB total
Showing top 10 nodes out of 12
      flat  flat%   sum%        cum   cum%
 7287.48kB 76.74% 76.74%  7287.48kB 76.74%  time.startTimer
 1184.27kB 12.47% 89.21%  1184.27kB 12.47%  runtime/pprof.StartCPUProfile
  512.19kB  5.39% 94.61%   512.19kB  5.39%  runtime.malg
  512.05kB  5.39%   100%  7799.53kB 82.13%  learning/gooop/chat_server.(*tChatClient).beginWrite
         0     0%   100%  1184.27kB 12.47%  command-line-arguments.Test_ChatServer
         0     0%   100%   512.19kB  5.39%  runtime.mstart
         0     0%   100%   512.19kB  5.39%  runtime.newproc.func1
         0     0%   100%   512.19kB  5.39%  runtime.newproc1
         0     0%   100%   512.19kB  5.39%  runtime.systemstack
         0     0%   100%  1184.27kB 12.47%  testing.tRunner
(pprof)
```
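For context, a heap profile like this can be written from inside the test itself. The series does not show its collection code, so the sketch below is an assumption (the helper name and file path are mine); runtime/pprof.WriteHeapProfile is the standard API.

```go
// Hypothetical sketch of how the heap profile might be captured from a test.
package chat_server

import (
    "os"
    "runtime"
    "runtime/pprof"
    "testing"
)

func writeHeapProfile(t *testing.T, path string) {
    f, err := os.Create(path)
    if err != nil {
        t.Fatal(err)
    }
    defer f.Close()

    runtime.GC() // settle the heap so inuse_space reflects live objects
    if err := pprof.WriteHeapProfile(f); err != nil {
        t.Fatal(err)
    }
}
```

Alternatively, `go test -memprofile <file>` produces an equivalent profile without any in-test code.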
  • There appear to be two leak points: time.startTimer and (*tChatClient).beginWrite.

  • (*tChatClient).beginWrite is the only business code in the list, and its cumulative size (7799.53kB, cum% 82.13%) is larger than and includes time.startTimer's 7287.48kB, i.e. the timers are being allocated from inside beginWrite.
  • So a reasonable suspicion is:
    • (*tChatClient).beginWrite is the root of the leak;
    • the main cause is that it starts far too many timers (time.startTimer).
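A minimal standalone demonstration of the mechanism (not from the series): on the Go toolchains current when this was written, each time.After call allocates a runtime timer that is not reclaimed until it fires, even when the select takes a different case.

```go
// Standalone sketch: time.After in a hot loop keeps allocating timers.
// Each iteration creates a 5s timer; the fast channel always wins the
// select, so ~100k timers stay live until they individually expire.
package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ {
            ch <- i
        }
    }()

    for i := 0; i < 100_000; i++ {
        select {
        case <-ch:
            // always ready, but the timer below was still created
        case <-time.After(5 * time.Second):
        }
    }

    var ms runtime.MemStats
    runtime.ReadMemStats(&ms)
    fmt.Printf("heap in use: %d KiB\n", ms.HeapInuse/1024)
}
```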

Code review

  • Reviewing the code of tChatClient.beginWrite (listed below), there are two likely spots that keep allocating memory:

    • Logging.Logf keeps appending log entries without bound.
    • Fix: rework Logging to cap the number of entries (use a bounded queue).
    • time.After is called on every iteration of the for loop, creating a large number of timers.
    • Fix: drop time.After and detect read timeouts with a dedicated goroutine and timer instead.

```go
func (me *tChatClient) beginWrite() {
    Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
    writer := io.Writer(me.conn)
    for {
        select {
        case <-me.closeChan:
            Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
            _ = me.conn.Close()
            me.closeFlag = 2
            me.postConnClosed()
            return
        case msg := <-me.sendChan:
            atomic.AddInt32(&me.pendingSend, -1)
            _, e := writer.Write([]byte(msg.Encode()))
            if e != nil {
                Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
                me.closeConn()
            } else {
                me.sendLogs = append(me.sendLogs, msg)
            }
        case <-time.After(time.Duration(5) * time.Second): // leaks: a new timer on every iteration
            me.postRecvTimeout()
        }
    }
}
```

Reworking Logging

The main change is turning the log array into a queue with a capacity cap, so that collecting diagnostic logs can no longer grow memory without limit.

```go
package chat_server

import (
    "fmt"
    "sync"
)

type ILoggingService interface {
    Logf(f string, args ...interface{})
    AllLogs() []string
}

type tLoggingService struct {
    mutex    *sync.Mutex
    logs     []string
    capacity int
    rindex   int
    windex   int
}

var gMaxLogs = 10_000
var gEmptyString = ""

func newLoggingService() ILoggingService {
    return &tLoggingService{
        mutex: new(sync.Mutex),
        logs:  make([]string, gMaxLogs*2),
        //logs: make([]string, 0),
        capacity: gMaxLogs,
        rindex:   0,
        windex:   0,
    }
}

func (me *tLoggingService) size() int {
    return me.windex - me.rindex
}

func (me *tLoggingService) Logf(f string, args ...interface{}) {
    log := fmt.Sprintf(f, args...)
    me.mutex.Lock()
    //me.logs = append(me.logs, log)
    me.ensureSpace()
    me.logs[me.windex] = log
    me.windex++
    me.mutex.Unlock()
    fmt.Println(log)
}

func (me *tLoggingService) ensureSpace() {
    for me.size() >= me.capacity {
        // dequeue head items
        me.logs[me.rindex] = gEmptyString
        me.rindex++
    }
    if me.rindex >= me.capacity {
        // move data to offset 0
        for i, n := 0, me.size(); i < n; i++ {
            me.logs[i], me.logs[i+me.rindex] = me.logs[i+me.rindex], gEmptyString
        }
        // reset read and write index
        me.windex, me.rindex = me.windex-me.rindex, 0
    }
}

func (me *tLoggingService) AllLogs() []string {
    return me.logs
}

var Logging = newLoggingService()
```
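A quick sanity check of the bound (my own hypothetical test, not part of the series; note that Logf also prints each entry, so running it is noisy):

```go
package chat_server

import "testing"

// Hypothetical test: after writing far more entries than gMaxLogs,
// the queue must not hold more than gMaxLogs live entries.
func Test_LoggingBounded(t *testing.T) {
    svc := newLoggingService().(*tLoggingService)
    for i := 0; i < gMaxLogs*3; i++ {
        svc.Logf("entry %d", i)
    }
    if got := svc.size(); got > gMaxLogs {
        t.Fatalf("queue grew past capacity: size=%d, capacity=%d", got, gMaxLogs)
    }
}
```

One caveat worth noting: AllLogs returns the whole backing array (including empty slots) without taking the mutex; returning a copy of logs[rindex:windex] under the lock would be a safer read path.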

Reworking tChatClient

  • Remove the time.After call from the write loop.
  • Use a dedicated goroutine and a read counter to detect read timeouts:

```go
func (me *tChatClient) open() {
    if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
        return
    }

    go me.beginWrite()
    go me.beginRead()

    // watch for read timeouts
    go me.beginWatchRecvTimeout()
}

func (me *tChatClient) beginWatchRecvTimeout() {
    duration := time.Duration(5)
    for range time.Tick(duration * time.Second) {
        if me.isClosed() {
            break
        }
        // the counter is reset to 0 by beginRead whenever a line arrives
        me.timeoutCounter++
        if me.timeoutCounter >= 3 {
            me.postRecvTimeout()
        }
    }
}

func (me *tChatClient) beginWrite() {
    Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
    writer := io.Writer(me.conn)
    for {
        select {
        case <-me.closeChan:
            Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
            _ = me.conn.Close()
            me.closeFlag = 2
            me.postConnClosed()
            return
        case msg := <-me.sendChan:
            atomic.AddInt32(&me.pendingSend, -1)
            _, e := writer.Write([]byte(msg.Encode()))
            if e != nil {
                Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
                me.closeConn()
            } else {
                me.sendLogs = append(me.sendLogs, msg)
            }
            //case <-time.After(time.Duration(5) * time.Second):
            //	me.postRecvTimeout()
        }
    }
}

func (me *tChatClient) beginRead() {
    reader := bufio.NewReader(me.conn)
    for {
        line, err := reader.ReadString('\n')
        if err != nil {
            Logging.Logf("tChatClient.beginRead, read error, %v, serverFlag=%v", me.name, me.serverFlag)
            me.closeConn()
            break
        }

        // reset the read-timeout counter
        me.timeoutCounter = 0

        ok, msg := MsgDecoder.Decode(line)
        if ok {
            fn := me.recvHandler
            if fn != nil {
                fn(me, msg)
            }
            me.recvLogs = append(me.recvLogs, msg)
        }
    }
}
```
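As an aside, there is also a well-known pattern that keeps the timeout inside the select loop without leaking: reuse a single time.Timer and Reset it each iteration. This is not what the series does; the sketch below only illustrates the alternative, with hypothetical channel and callback parameters standing in for the fields of tChatClient.

```go
// Alternative sketch (not the series' fix): one reusable timer instead of
// allocating a fresh one with time.After on every iteration.
package main

import (
    "fmt"
    "time"
)

func writeLoop(closeChan <-chan struct{}, sendChan <-chan string, onTimeout func()) {
    timer := time.NewTimer(5 * time.Second)
    defer timer.Stop()
    for {
        select {
        case <-closeChan:
            return
        case msg := <-sendChan:
            fmt.Println("send:", msg) // stand-in for the real write
        case <-timer.C:
            onTimeout()
        }
        // stop and drain before Reset, per the time.Timer documentation
        if !timer.Stop() {
            select {
            case <-timer.C:
            default:
            }
        }
        timer.Reset(5 * time.Second)
    }
}

func main() {
    closeChan := make(chan struct{})
    sendChan := make(chan string, 1)
    sendChan <- "hello"
    go func() {
        time.Sleep(100 * time.Millisecond)
        close(closeChan)
    }()
    writeLoop(closeChan, sendChan, func() { fmt.Println("recv timeout") })
}
```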

Retest

  • Re-run the tests and check pprof: memory is much cleaner now. No leak point in business code is visible any more, so the fix works.

```
$ go tool pprof ~/chat_server_mem.profile
File: chat_server.test
Type: inuse_space
Time: Mar 10, 2021 at 7:55am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 2.66MB, 100% of 2.66MB total
      flat  flat%   sum%        cum   cum%
    1.50MB 56.47% 56.47%     1.50MB 56.47%  runtime.malg
    1.16MB 43.53%   100%     1.16MB 43.53%  runtime/pprof.StartCPUProfile
         0     0%   100%     1.16MB 43.53%  command-line-arguments.Test_ChatServer
         0     0%   100%     1.50MB 56.47%  runtime.mstart
         0     0%   100%     1.50MB 56.47%  runtime.newproc.func1
         0     0%   100%     1.50MB 56.47%  runtime.newproc1
         0     0%   100%     1.50MB 56.47%  runtime.systemstack
         0     0%   100%     1.16MB 43.53%  testing.tRunner
(pprof)
```
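Incidentally (not something the series does), pprof can also diff two profiles directly via its -base flag, which makes before/after comparisons like this one mechanical; the file names below are hypothetical:

```
$ go tool pprof -base before_mem.profile after_mem.profile
(pprof) top
```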


(end)