在Server和Client通讯中,由于网络等原因很可能会发生数据丢包的现象。如果数据缺失,服务端接收的信息不完整,就会造成混乱。

我们需要在Server和Client之间建立一个通讯协议,通过协议中的规则,判断当前接收到的信息是否完整。根据信息的完整情况,采取不同的处理方式。

通讯协议protocol的核心就是设计一个头部。如果传来的信息不包含这个头部,就说明当前信息和之前的信息是同一条。那么就把当前信息和之前的那条信息合并成一条。

而协议主要包含的功能是封装(Enpack)和解析(Depack)。Enpack是客户端对信息进行数据封装。封装之后可以传递给服务器。Depack是服务器对信息进行数据解析。

其中有个Const部分,用于定义头部、头部长度、客户端传入信息长度。

在代码中,我们这样定义:

  1. const (
  2. ConstHeader = "Headers"
  3. ConstHeaderLength = 7
  4. ConstMLength = 4
  5. )

头部的内容为”Headers”,长度为7。所以ConstHeaderLenth=7.
而信息传递过程中,我们会把int类型转换成byte类型。一个int的长度等于4个byte的长度。因此,我们设置ConstMLength=4.代表客户端的传来的信息大小。

自定义协议protocal的代码示例如下:

  1. /**
  2. * protocol
  3. * @Author: Jian Junbo
  4. * @Email: junbojian@qq.com
  5. * @Create: 2017/9/14 11:49
  6. *
  7. * Description: 通讯协议处理
  8. */
  9. package protocol
  10. import (
  11. "bytes"
  12. "encoding/binary"
  13. )
  14. const (
  15. ConstHeader = "Headers"
  16. ConstHeaderLength = 7
  17. ConstMLength = 4
  18. )
  19. //封包
  20. func Enpack(message []byte) []byte {
  21. return append(append([]byte(ConstHeader), IntToBytes(len(message))...), message...)
  22. }
  23. //解包
  24. func Depack(buffer []byte) []byte {
  25. length := len(buffer)
  26. var i int
  27. data := make([]byte, 32)
  28. for i = 0; i < length; i++ {
  29. if length < i + ConstHeaderLength + ConstMLength{
  30. break
  31. }
  32. if string(buffer[i:i+ConstHeaderLength]) == ConstHeader {
  33. messageLength := ByteToInt(buffer[i+ConstHeaderLength : i+ConstHeaderLength+ConstMLength])
  34. if length < i+ConstHeaderLength+ConstMLength+messageLength {
  35. break
  36. }
  37. data = buffer[i+ConstHeaderLength+ConstMLength : i+ConstHeaderLength+ConstMLength+messageLength]
  38. }
  39. }
  40. if i == length {
  41. return make([]byte, 0)
  42. }
  43. return data
  44. }
  45. //字节转换成整形
  46. func ByteToInt(n []byte) int {
  47. bytesbuffer := bytes.NewBuffer(n)
  48. var x int32
  49. binary.Read(bytesbuffer, binary.BigEndian, &x)
  50. return int(x)
  51. }
  52. //整数转换成字节
  53. func IntToBytes(n int) []byte {
  54. x := int32(n)
  55. bytesBuffer := bytes.NewBuffer([]byte{})
  56. binary.Write(bytesBuffer, binary.BigEndian, x)
  57. return bytesBuffer.Bytes()
  58. }

Server端主要通过协议来解析客户端发送来的信息。建立一个函数,用来完成连接对接收信息的处理。其中建立了通道readerChannel,并把接收来的信息放在通道里。在放入通道之前,使用protocol和Depack对信息进行解析。

  1. //连接处理
  2. func handleConnection(conn net.Conn) {
  3. //缓冲区,存储被截断的数据
  4. tmpBuffer := make([]byte, 0)
  5. //接收解包
  6. readerChannel := make(chan []byte, 10000)
  7. go reader(readerChannel)
  8. buffer := make([]byte, 1024)
  9. for{
  10. n, err := conn.Read(buffer)
  11. if err != nil{
  12. Log(conn.RemoteAddr().String(), "connection error: ", err)
  13. return
  14. }
  15. tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...))
  16. readerChannel <- tmpBuffer //接收的信息写入通道
  17. }
  18. defer conn.Close()
  19. }

如果信息读取发生错误(包括读取到信息结束符EOF),都会打印错误信息,并跳出循环。

  1. Log(conn.RemoteAddr().String(), "connection error: ", err)
  2. return

由于通道内的数据是[]byte型的。需要转换成string。这个工作有专门的获取通道数据的reader(readerChannel chan []byte)来完成。

  1. //获取通道数据
  2. func reader(readerchannel chan []byte) {
  3. for{
  4. select {
  5. case data := <-readerchannel:
  6. Log(string(data)) //打印通道内的信息
  7. }
  8. }
  9. }

查看Server端代码示例:

  1. /**
  2. * MySocketProtocalServer
  3. * @Author: Jian Junbo
  4. * @Email: junbojian@qq.com
  5. * @Create: 2017/9/14 13:54
  6. * Copyright (c) 2017 Jian Junbo All rights reserved.
  7. *
  8. * Description: 服务端,接收客户端传来的信息
  9. */
  10. package main
  11. import (
  12. "net"
  13. "fmt"
  14. "os"
  15. "log"
  16. "protocol"
  17. )
  18. func main() {
  19. netListen, err := net.Listen("tcp", "localhost:7373")
  20. CheckErr(err)
  21. defer netListen.Close()
  22. Log("Waiting for client ...") //启动后,等待客户端访问。
  23. for{
  24. conn, err := netListen.Accept() //监听客户端
  25. if err != nil {
  26. Log(conn.RemoteAddr().String(), "发了了错误:", err)
  27. continue
  28. }
  29. Log(conn.RemoteAddr().String(), "tcp connection success")
  30. go handleConnection(conn)
  31. }
  32. }
  33. //连接处理
  34. func handleConnection(conn net.Conn) {
  35. //缓冲区,存储被截断的数据
  36. tmpBuffer := make([]byte, 0)
  37. //接收解包
  38. readerChannel := make(chan []byte, 10000)
  39. go reader(readerChannel)
  40. buffer := make([]byte, 1024)
  41. for{
  42. n, err := conn.Read(buffer)
  43. if err != nil{
  44. Log(conn.RemoteAddr().String(), "connection error: ", err)
  45. return
  46. }
  47. tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...))
  48. readerChannel <- tmpBuffer //接收的信息写入通道
  49. }
  50. defer conn.Close()
  51. }
  52. //获取通道数据
  53. func reader(readerchannel chan []byte) {
  54. for{
  55. select {
  56. case data := <-readerchannel:
  57. Log(string(data)) //打印通道内的信息
  58. }
  59. }
  60. }
  61. //日志处理
  62. func Log(v ...interface{}) {
  63. log.Println(v...)
  64. }
  65. //错误处理
  66. func CheckErr(err error) {
  67. if err != nil {
  68. fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
  69. os.Exit(1)
  70. }
  71. }

客户端使用Enpack封装要发送到服务端的信息后,写入连接conn中。

  1. /**
  2. * MySocketProtocalClient
  3. * @Author: Jian Junbo
  4. * @Email: junbojian@qq.com
  5. * @Create: 2017/9/14 15:23
  6. * Copyright (c) 2017 Jian Junbo All rights reserved.
  7. *
  8. * Description:
  9. */
  10. package main
  11. import (
  12. "net"
  13. "time"
  14. "strconv"
  15. "protocol"
  16. "fmt"
  17. "os"
  18. )
  19. //发送100次请求
  20. func send(conn net.Conn) {
  21. for i := 0; i < 100; i++ {
  22. session := GetSession()
  23. words := "{\"ID\":\""+strconv.Itoa(i)+"\",\"Session\":\""+session+"20170914165908\",\"Meta\":\"golang\",\"Content\":\"message\"}"
  24. conn.Write(protocol.Enpack([]byte(words)))
  25. fmt.Println(words) //打印发送出去的信息
  26. }
  27. fmt.Println("send over")
  28. defer conn.Close()
  29. }
  30. //用当前时间做识别。当前时间的十进制整数
  31. func GetSession() string {
  32. gs1 := time.Now().Unix()
  33. gs2 := strconv.FormatInt(gs1, 10)
  34. return gs2
  35. }
  36. func main() {
  37. server := "localhost:7373"
  38. tcpAddr, err := net.ResolveTCPAddr("tcp4", server)
  39. if err != nil{
  40. fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
  41. os.Exit(1)
  42. }
  43. conn, err := net.DialTCP("tcp", nil, tcpAddr)
  44. if err != nil{
  45. fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
  46. os.Exit(1)
  47. }
  48. fmt.Println("connect success")
  49. send(conn)
  50. }

Golang Socket Server自定义协议的简单实现 - 图1