基于TCP的聊天室

1、服务端

  • 新用户到来,生成一个User的实例,代表该用户。 ```java type User struct{ ID int // 用户的唯一标识,通过GenUserID 函数生成 Addr string // 用户的IP地址和端口 EnterAt time.Time // 用户进入的时间 MessageChannel chan string // 当前用户发送消息的通道 }
  1. - 新开一个goroutine用于给用户发送消息
  2. ```java
  3. func sendMessage(conn net.Conn, ch <- chan string){
  4. for msg := range ch{
  5. fmt.Fprintln(conn, msg)
  6. }
  7. }

结合User结构体的MessageChannel,很容易知道,需要给某个用户发送消息,只需要往该用户的MessageChannel中写入消息即可。这里需要特别提醒下,因为sendMessage在一个新的goroutine中,如果函数的ch不关闭,该goroutine是不会退出的,因此需要注意不关闭ch导致goroutine泄露问题。

  • 给当前用户发送欢迎信息,同时给聊天室所有的用户发送有新用户到来的提醒 ``java user.MessageChannel <- "Welcome" + user.String() msg := Message{ OwnerID: user.ID, Content: "user:“ + strconv.Itoa(user.ID) + “` has enter”, } messageChannel <- msg
  1. - 将该新用户写入全局用户列表,也就是聊天室用户列表。同时控制用户超时退出,超过5分钟没有任何响应,则提出
  2. ```java
  3. enteringChannel <- user
  4. // 控制超时用户踢出
  5. var userActive = make(chan struct{})
  6. go func() {
  7. d := 5 * time.Minute
  8. timer := time.NewTimer(d)
  9. for{
  10. select {
  11. case <- timer.C:
  12. conn.Close()
  13. case <- userActive:
  14. timer.Reset(d)
  15. }
  16. }
  17. }()
  • 读取用户的输入,并将用户信息发送给其他用户。在bufio包中有多重方式获取文本输入,ReadBytes、ReadString和独特的ReadLine,对于简单的目的这些都有些复杂。在Go1,1中添加了一个新类型,Scabber,以便更容易的处理如按行读取输入序列或空格分隔单词等这类简单任务。它终结了如输入一个很长的有问题的行这样的输入错误,并且提供了简单的默认行为:基于行的输入,每行都提出了分隔标识。 ```java // 循环读取用户的输入 input := bufio.NewScanner(conn) for input.Scan(){ msg.Content = strconv.Itoa(user.ID) + “;” + input.Text() messageChannel <- msg

    // 用户活跃 userActive <- struct{}{} }

if err := input.Err();err != nil { log.Println(“读取错误:”, err) }

  1. - 用户离开,需要做登记,并给连天使其他用户发通知
  2. ```java
  3. leavingChannel <- user
  4. msg.Content = "user: `" + strconv.Itoa(user.ID) + "` has left"
  5. messageChannel <- msg

完整代码
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "log"
  6. "net"
  7. "strconv"
  8. "sync"
  9. "time"
  10. )
  11. type User struct{
  12. ID int // 用户的唯一标识,通过GenUserID 函数生成
  13. Addr string // 用户的IP地址和端口
  14. EnterAt time.Time // 用户进入的时间
  15. MessageChannel chan string // 当前用户发送消息的通道
  16. }
  17. // 给用户发送信息
  18. type Message struct{
  19. OwnerID int
  20. Content string
  21. }
  22. var (
  23. // 新用户到来,通过该channel进行登记
  24. enteringChannel = make(chan *User)
  25. // 用户离开,通过该channel进行登记
  26. leavingChannel = make(chan *User)
  27. // 广播专用的用户普通消息channel, 缓冲是尽可能避免出现异常情况阻塞
  28. messageChannel = make(chan Message, 9)
  29. )
  30. func (u *User) String() string{
  31. return u.Addr + ",UID:" + strconv.Itoa(u.ID) + ", Enter At:" + u.EnterAt.Format("2006-01-02 15:04:05+8000")
  32. }
  33. func main() {
  34. listener, err := net.Listen("tcp",":2020")
  35. if err != nil {
  36. panic(err)
  37. }
  38. go broadcaster()
  39. for {
  40. conn, err := listener.Accept()
  41. if err != nil {
  42. log.Println(err)
  43. continue
  44. }
  45. go handleConn(conn)
  46. }
  47. }
  48. // broadcaster 用于记录聊天室用户,并进行消息广播
  49. // 1. 新用户进来; 2.用户普通消息; 3.用户离开
  50. func broadcaster(){
  51. users := make(map[*User]struct{})
  52. for {
  53. select{
  54. case user := <- enteringChannel:
  55. // 新用户进入
  56. users[user] = struct{}{}
  57. case user := <- leavingChannel:
  58. // 用户离开
  59. delete(users, user)
  60. // 避免goroutine泄露
  61. close(user.MessageChannel)
  62. case msg := <-messageChannel:
  63. // 给所有在线用户发送消息
  64. for user := range users {
  65. if user.ID == msg.OwnerID{
  66. continue
  67. }
  68. user.MessageChannel <- msg.Content
  69. }
  70. }
  71. }
  72. }
  73. func handleConn(conn net.Conn){
  74. defer conn.Close()
  75. // 1. 新用户进来,构建该用户实例
  76. user := &User{
  77. ID: GenUserID(),
  78. Addr: conn.RemoteAddr().String(),
  79. EnterAt: time.Now(),
  80. MessageChannel: make(chan string,8),
  81. }
  82. // 2. 当前在一个新的goroutine 中,用来进行读写操作,因此需要开一个goroutine用于读写操作
  83. // 读写goroutine 之间通过channel 进行通信
  84. go sendMessage(conn, user.MessageChannel)
  85. // 3. 给当前用户发送欢迎信息;给所有用户告知新用户列表
  86. user.MessageChannel <- "Welcome" + user.String()
  87. msg := Message{
  88. OwnerID: user.ID,
  89. Content: "user:`" + strconv.Itoa(user.ID) + "` has enter",
  90. }
  91. messageChannel <- msg
  92. // 4. 将该记录到全局的用户列表中,避免用锁
  93. enteringChannel <- user
  94. // 控制超时用户踢出
  95. var userActive = make(chan struct{})
  96. go func() {
  97. d := 5 * time.Minute
  98. timer := time.NewTimer(d)
  99. for{
  100. select {
  101. case <- timer.C:
  102. conn.Close()
  103. case <- userActive:
  104. timer.Reset(d)
  105. }
  106. }
  107. }()
  108. // 5. 循环读取用户的输入
  109. input := bufio.NewScanner(conn)
  110. for input.Scan(){
  111. msg.Content = strconv.Itoa(user.ID) + ";" + input.Text()
  112. messageChannel <- msg
  113. // 用户活跃
  114. userActive <- struct{}{}
  115. }
  116. if err := input.Err();err != nil {
  117. log.Println("读取错误:", err)
  118. }
  119. // 6. 用户离开
  120. leavingChannel <- user
  121. msg.Content = "user: `" + strconv.Itoa(user.ID) + "` has left"
  122. messageChannel <- msg
  123. }
  124. func sendMessage(conn net.Conn, ch <- chan string){
  125. for msg := range ch{
  126. fmt.Fprintln(conn, msg)
  127. }
  128. }
  129. // 生成用户id
  130. var (
  131. globalID int
  132. idocker sync.Mutex
  133. )
  134. func GenUserID() int {
  135. idocker.Lock()
  136. defer idocker.Unlock()
  137. globalID ++
  138. return globalID
  139. }

2、客户端

客户端的实现直接采用 《The Go Programming Language》一书对应的示例源码:ch8/netcat3/netcat.go 。

  1. func main() {
  2. conn, err := net.Dial("tcp", ":2020")
  3. if err != nil {
  4. panic(err)
  5. }
  6. done := make(chan struct{})
  7. go func() {
  8. io.Copy(os.Stdout, conn) // NOTE: ignoring errors
  9. log.Println("done")
  10. done <- struct{}{} // signal the main goroutine
  11. }()
  12. mustCopy(conn, os.Stdin)
  13. conn.Close()
  14. <-done
  15. }
  16. func mustCopy(dst io.Writer, src io.Reader) {
  17. if _, err := io.Copy(dst, src); err != nil {
  18. log.Fatal(err)
  19. }
  20. }
  • 新开了一个 goroutine 用于接收消息;
  • 通过 io.Copy 来操作 IO,包括从标准输入读取数据写入 TCP 连接中,以及从 TCP 连接中读取数据写入标准输出;
  • 新开的 goroutine 通过一个 channel 来和 main goroutine 通讯;