1.godis/tcp

(1)实现简单的echo服务器

  1. //echo_easy.go
  2. package main
  3. import (
  4. "fmt"
  5. "net"
  6. "io"
  7. "log"
  8. "bufio"
  9. )
  10. //为了实现优雅关闭,先关闭listener组织新的连接,再遍历所有现有的连接并逐个进行关闭
  11. func ListenAndserve(address string) {
  12. //绑定监听地址
  13. listener, err := net.Listen("tcp", address)
  14. if err != nil {
  15. log.Fatal(fmt.Sprintf("listen err: %v", err))
  16. }
  17. defer listener.Close()
  18. log.Println(fmt.Sprintf("bind: %s, start listening...", address))
  19. for {
  20. //Accept会一直阻塞直到有新的连接建立或者listen中断才会被返回
  21. conn, err := listener.Accept()
  22. if err != nil {
  23. //由于listener被关闭无法继续监听导致的错误
  24. log.Fatal(fmt.Sprintf("accept err: %v", err))
  25. }
  26. //开启新的goroutine,去处理这一个连接
  27. go Handle(conn)
  28. }
  29. }
  30. func Handle(conn net.Conn) {
  31. //使用bufio标准库提供的缓冲区功能去实现
  32. reader := bufio.NewReader(conn)
  33. for {
  34. //ReadString会一直阻塞直到遇到分隔符'\n'
  35. //遇到分隔符后会返回上次遇到分隔符或者连接建立收到的所有数据,包括分隔符本身
  36. //若在遇到分隔符之前遇到异常,ReadString会返回已收到的数据以及错误信息
  37. msg, err := reader.ReadString('\n')
  38. if err != nil {
  39. //通常遇到的错误是连接中断或者关闭,用io.EOF表示
  40. if err == io.EOF {
  41. log.Println("connection close")
  42. } else {
  43. log.Println(err)
  44. }
  45. return
  46. }
  47. b := []byte(msg)
  48. //将收到的信息发送给客户端
  49. fmt.Println(b)
  50. conn.Write(b)
  51. }
  52. }
  53. func main() {
  54. ListenAndserve(":8000")
  55. }

运行后使用 telnet 127.0.0.1 8000 进行连接测试,会返回输入的数值
image.png

(2)粘包以及拆包问题

我们常说的 TCP 服务器并非「实现 TCP 协议的服务器」而是「基于TCP协议的应用层服务器」
TCP 是面向字节流的协议,而应用层协议大多是面向消息的,比如 HTTP 协议的请求/响应,Redis 协议的指令/回复都是以消息为单位进行通信的。,作为应用层服务器我们有责任从 TCP 提供的字节流中正确地解析出应用层消息,会遇到「拆包/粘包」问题。
socket 允许我们通过 read 函数读取新收到的一段数据(当然这段数据并不对应一个 TCP 包)。
在上文的 Echo 服务器示例中我们用\n表示消息结束,从 read 函数读取的数据可能存在下列几种情况:

  1. 收到两段数据: “abc”, “def\n” 它们属于一条消息 “abcdef\n” 这是拆包的情况
  2. 收到一段数据: “abc\ndef\n” 它们属于两条消息 “abc\n”, “def\n” 这是粘包的情况

应用层协议通常采用下列几种思路之一来定义消息,以保证完整地进行读取:

  • 定长消息
  • 在消息尾部添加特殊分隔符,如示例中的Echo协议和FTP控制协议。bufio 标准库会缓存收到的数据直到遇到分隔符才会返回,它可以帮助我们正确地分割字节流。
  • 将消息分为 header 和 body, 并在 header 中提供 body 总长度,这种分包方式被称为 LTV(length,type,value) 包。这是应用最广泛的策略,如HTTP协议。当从 header 中获得 body 长度后, io.ReadFull 函数会读取指定长度字节流,从而解析应用层消息

    (3)优雅关闭

    在生产环境下需要保证TCP服务器关闭前完成必要的清理工作,包括将完成正在进行的数据传输,关闭TCP连接等。这种关闭模式称为优雅关闭,可以避免资源泄露以及客户端未收到完整数据导致故障。
    TCP 服务器的优雅关闭模式通常为: 先关闭listener阻止新连接进入,然后遍历所有连接逐个进行关闭。

    (4)echo.go实现

    ```go //echo.go

package tcp

/**

  • A echo server to test whether the server is functioning normally */

import ( “bufio” “context” “github.com/hdt3213/godis/lib/logger” “github.com/hdt3213/godis/lib/sync/atomic” “github.com/hdt3213/godis/lib/sync/wait” “io” “net” “sync” “time” )

// EchoHandler echos received line to client, using for test type EchoHandler struct {

  1. // 保存所有工作状态client的集合(把map当set用)
  2. // 需使用并发安全的容器
  3. activeConn sync.Map
  4. // 关闭状态标识位
  5. closing atomic.Boolean

}

// MakeEchoHandler creates EchoHandler func MakeEchoHandler() *EchoHandler { return &EchoHandler{} }

// EchoClient is client for EchoHandler, using for test type EchoClient struct { Conn net.Conn Waiting wait.Wait }

// Close close connection // 关闭客户端连接 func (c EchoClient) Close() error { // 等待数据发送完成或超时 c.Waiting.WaitWithTimeout(10 time.Second) c.Conn.Close() return nil }

// Handle echos received line to client func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection // 关闭中的 handler 不会处理新连接 _ = conn.Close() return }

  1. client := &EchoClient{
  2. Conn: conn,
  3. }
  4. h.activeConn.Store(client, struct{}{}) // 记住仍然存活的连接
  5. reader := bufio.NewReader(conn)
  6. for {
  7. // may occurs: client EOF, client timeout, server early close
  8. msg, err := reader.ReadString('\n')
  9. if err != nil {
  10. if err == io.EOF {
  11. logger.Info("connection close")
  12. h.activeConn.Delete(client)
  13. } else {
  14. logger.Warn(err)
  15. }
  16. return
  17. }
  18. // 发送数据前先置为waiting状态,阻止连接被关闭
  19. client.Waiting.Add(1)
  20. // 模拟关闭时未完成发送的情况
  21. //logger.Info("sleeping")
  22. //time.Sleep(10 * time.Second)
  23. b := []byte(msg)
  24. _, _ = conn.Write(b)
  25. // 发送完毕, 结束waiting
  26. client.Waiting.Done()
  27. }

}

// Close stops echo handler // 关闭服务器 func (h EchoHandler) Close() error { logger.Info(“handler shutting down…”) h.closing.Set(true) // 逐个关闭连接 h.activeConn.Range(func(key interface{}, val interface{}) bool { client := key.(EchoClient) _ = client.Close() return true }) return nil }

  1. <a name="Y037O"></a>
  2. ### (5)server.go实现
  3. ```go
  4. //server.go
  5. package tcp
  6. /**
  7. * A tcp server
  8. */
  9. import (
  10. "context"
  11. "fmt"
  12. "github.com/hdt3213/godis/interface/tcp"
  13. "github.com/hdt3213/godis/lib/logger"
  14. "net"
  15. "os"
  16. "os/signal"
  17. "sync"
  18. "syscall"
  19. "time"
  20. )
  21. // Config stores tcp server properties
  22. type Config struct {
  23. Address string `yaml:"address"`
  24. MaxConnect uint32 `yaml:"max-connect"`
  25. Timeout time.Duration `yaml:"timeout"`
  26. }
  27. // ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal
  28. // 监听中断信号并通过 closeChan 通知服务器关闭
  29. func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
  30. closeChan := make(chan struct{})
  31. sigCh := make(chan os.Signal)
  32. signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  33. go func() {
  34. sig := <-sigCh
  35. switch sig {
  36. case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  37. closeChan <- struct{}{}
  38. }
  39. }()
  40. listener, err := net.Listen("tcp", cfg.Address)
  41. if err != nil {
  42. return err
  43. }
  44. //cfg.Address = listener.Addr().String()
  45. logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
  46. ListenAndServe(listener, handler, closeChan)
  47. return nil
  48. }
  49. // ListenAndServe binds port and handle requests, blocking until close
  50. // 监听并提供服务,并在收到 closeChan 发来的关闭通知后关闭
  51. func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
  52. // listen signal
  53. // 监听关闭通知
  54. go func() {
  55. <-closeChan
  56. logger.Info("shutting down...")
  57. // 停止监听,listener.Accept()会立即返回 io.EOF
  58. _ = listener.Close() // listener.Accept() will return err immediately
  59. // 关闭应用层服务器
  60. _ = handler.Close() // close connections
  61. }()
  62. // listen port
  63. // 在异常退出后释放资源
  64. defer func() {
  65. // close during unexpected error
  66. _ = listener.Close()
  67. _ = handler.Close()
  68. }()
  69. ctx := context.Background()
  70. var waitDone sync.WaitGroup
  71. for {
  72. // 监听端口, 阻塞直到收到新连接或者出现错误
  73. conn, err := listener.Accept()
  74. if err != nil {
  75. break
  76. }
  77. // handle
  78. // 开启 goroutine 来处理新连接
  79. logger.Info("accept link")
  80. waitDone.Add(1)
  81. go func() {
  82. defer func() {
  83. waitDone.Done()
  84. }()
  85. handler.Handle(ctx, conn)
  86. }()
  87. }
  88. waitDone.Wait()
  89. }


(6)echo_test.go实现

//echo_test.go
package tcp

import (
    "bufio"
    "math/rand"
    "net"
    "strconv"
    "testing"
    "time"
)

func TestListenAndServe(t *testing.T) {
    var err error
    closeChan := make(chan struct{})
    listener, err := net.Listen("tcp", ":0")
    if err != nil {
        t.Error(err)
        return
    }
    addr := listener.Addr().String()
    go ListenAndServe(listener, MakeEchoHandler(), closeChan)

    conn, err := net.Dial("tcp", addr)
    if err != nil {
        t.Error(err)
        return
    }
    for i := 0; i < 10; i++ {
        val := strconv.Itoa(rand.Int())
        _, err = conn.Write([]byte(val + "\n"))
        if err != nil {
            t.Error(err)
            return
        }
        bufReader := bufio.NewReader(conn)
        line, _, err := bufReader.ReadLine()
        if err != nil {
            t.Error(err)
            return
        }
        if string(line) != val {
            t.Error("get wrong response")
            return
        }
    }
    _ = conn.Close()
    for i := 0; i < 5; i++ {
        // create idle connection
        _, _ = net.Dial("tcp", addr)
    }
    closeChan <- struct{}{}
    time.Sleep(time.Second)
}

2.godis/redis/parser

(1)Redis通信协议

Redis自2.0开始采用的是RESP协议,这个协议比较容易去实现,它是一个二进制安全的文本协议,并且工作在TCP协议之上,它以行为单位,并且客户端以及服务端均以\r\n(CRLF)来作为换行符号。
二进制安全是指允许协议中出现任意字符而不会导致故障。比如 C 语言的字符串以 \0 作为结尾不允许字符串中间出现\0, 而 Go 语言的 string 则允许出现 \0,我们说 Go 语言的 string 是二进制安全的,而 C 语言字符串不是二进制安全的。
RESP 的二进制安全性允许我们在 key 或者 value 中包含 \r 或者 \n 这样的特殊字符。在使用 redis 存储 protobuf、msgpack 等二进制数据时,二进制安全性尤为重要。
RESP 定义了5种格式:

  • 简单字符串(Simple String): 服务器用来返回简单的结果,比如”OK”。非二进制安全,且不允许换行。
  • 错误信息(Error): 服务器用来返回简单的错误信息,比如”ERR Invalid Synatx”。非二进制安全,且不允许换行。
  • 整数(Integer): llen、scard 等命令的返回值, 64位有符号整数
  • 字符串(Bulk String): 二进制安全字符串, 比如 get 等命令的返回值
  • 数组(Array, 又称 Multi Bulk Strings): Bulk String 数组客户端发送指令以及 lrange 等命令响应的格式

RESP 通过第一个字符来表示格式:

  • 简单字符串:以”+” 开始, 如:”+OK\r\n”
  • 错误:以”-“ 开始,如:”-ERR Invalid Synatx\r\n”
  • 整数:以”:”开始,如:”:1\r\n”
  • 字符串:以 $ 开始
  • 数组:以 * 开始

Bulk String有两行,第一行为 $+正文长度,第二行为实际内容

$3\r\nSET\r\n

Bulk String 是二进制安全的可以包含任意字节,就是说可以在 Bulk String 内部包含 “\r\n” 字符(行尾的CRLF被隐藏):

$4  这里省略了\r\n
a\r\nb  这里省略了\r\n

$-1 表示 nil, 比如使用 get 命令查询一个不存在的key时,响应即为$-1

Array 格式第一行为 “*”+数组长度,其后是相应数量的 Bulk String。如, [“foo”, “bar”]的报文:

*2
$3
foo
$3
bar

同时,客户端也使用 Array 格式服务端发送指令。命令本身将作为第一个参数,如 SET key value指令的RESP报文:

*3
$3
SET
$3
key
$5
value

将换行符打印出来就是
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

(2)协议解析器

我们在 实现TCP服务器 一文中已经介绍过TCP服务器的实现协议解析器将实现其 Handler 接口充当应用层服务器
协议解析器将接收 Socket 传来的数据,并将其数据还原为 [][]byte 格式,比如 “*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n” 将被还原为 [‘SET’, ‘key’, ‘value’]
来自客户端的请求均为数组格式它在第一行中标记报文的总行数并使用CRLF作为分行符。
bufio 标准库可以将从 reader 读到的数据缓存到 buffer 中直至遇到分隔符或读取完毕后返回所以我们使用 reader.ReadBytes(‘\n’) 来保证每次读取到完整的一行。
需要注意的是RESP是二进制安全的协议,它允许在正文中使用CRLF字符。举例来说 Redis 可以正确接收并执行SET “a\r\nb” 1指令, 这条指令的正确报文是这样的:

*3  
$3
SET
$4
a\r\nb 
$7
myvalue

当 ReadBytes 读取到第五行 “a\r\nb\r\n”时会将其误认为两行:

*3  
$3
SET
$4
a  // 错误的分行
b // 错误的分行
$7
myvalue

因此当读取到第四行$4后, 不应该继续使用 ReadBytes(‘\n’) 读取下一行, 应使用 io.ReadFull(reader, msg) 方法来读取指定长度的内容

msg = make([]byte, 4 + 2) // 正文长度4 + 换行符长度2
_, err = io.ReadFull(reader, msg)

首先我们来定义解析器的接口:

// Payload stores redis.Reply or error
type Payload struct {
    Data redis.Reply
    Err  error
}

// ParseStream 通过 io.Reader 读取数据并将结果通过 channel 将结果返回给调用者
// 流式处理的接口适合供客户端/服务端使用
func ParseStream(reader io.Reader) <-chan *Payload {
    ch := make(chan *Payload)
    go parse0(reader, ch)
    return ch
}

// ParseOne 解析 []byte 并返回 redis.Reply 
func ParseOne(data []byte) (redis.Reply, error) {
    ch := make(chan *Payload)
    reader := bytes.NewReader(data)
    go parse0(reader, ch)
    payload := <-ch // parse0 will close the channel
    if payload == nil {
        return nil, errors.New("no reply")
    }
    return payload.Data, payload.Err
}

接下来我们可以看一下解析器核心流程的伪代码

func parse0(reader io.Reader, ch chan<- *Payload) {
    // 初始化读取状态
    readingMultiLine := false
    expectedArgsCount := 0
    var args [][]byte
    var bulkLen int64
    for {
        // 上文中我们提到 RESP 是以行为单位的
        // 因为行分为简单字符串和二进制安全的BulkString,我们需要封装一个 readLine 函数来兼容
        line, err = readLine(reader, bulkLen)
        if err != nil { 
            // 处理错误
            return
        }
        // 接下来我们对刚刚读取的行进行解析
        // 我们简单的将 Reply 分为两类:
        // 单行: StatusReply, IntReply, ErrorReply
        // 多行: BulkReply, MultiBulkReply

        if !readingMultiLine {
            if isMulitBulkHeader(line) {
                // 我们收到了 MulitBulkReply 的第一行
                // 获得 MulitBulkReply 中 BulkString 的个数
                expectedArgsCount = parseMulitBulkHeader(line)
                // 等待 MulitBulkReply 后续行
                readingMultiLine = true
            } else if isBulkHeader(line) {
                // 我们收到了 BulkReply 的第一行
                // 获得 BulkReply 第二行的长度, 通过 bulkLen 告诉 readLine 函数下一行 BulkString 的长度
                bulkLen = parseBulkHeader()
                // 这个 Reply 中一共有 1 个 BulkString
                expectedArgsCount = 1 
                // 等待 BulkReply 后续行
                readingMultiLine = true
            } else {
                // 处理 StatusReply, IntReply, ErrorReply 等单行 Reply
                reply := parseSingleLineReply(line)
                // 通过 ch 返回结果
                emitReply(ch)
            }
        } else {
            // 进入此分支说明我们正在等待 MulitBulkReply 或 BulkReply 的后续行
            // MulitBulkReply 的后续行有两种,BulkHeader 或者 BulkString
            if isBulkHeader(line) {
                bulkLen = parseBulkHeader()
            } else {
                // 我们正在读取一个 BulkString, 它可能是 MulitBulkReply 或 BulkReply 
                args = append(args, line)
            }
            if len(args) == expectedArgsCount { // 我们已经读取了所有后续行
                // 通过 ch 返回结果
                emitReply(ch)
                // 重置状态, 准备解析下一条 Reply
                readingMultiLine = false
                expectedArgsCount = 0
                args = nil
                bulkLen = 0
            }
        }
    }
}

工具函数的实现:

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
    var msg []byte
    var err error
    if state.bulkLen == 0 { // read simple line
        msg, err = bufReader.ReadBytes('\n')
        if err != nil {
            return nil, true, err
        }
        if len(msg) == 0 || msg[len(msg)-2] != '\r' {
            return nil, false, errors.New("protocol error: " + string(msg))
        }
    } else { // read bulk line (binary safe)
        msg = make([]byte, state.bulkLen+2)
        _, err = io.ReadFull(bufReader, msg)
        if err != nil {
            return nil, true, err
        }
        if len(msg) == 0 ||
            msg[len(msg)-2] != '\r' ||
            msg[len(msg)-1] != '\n' {
            return nil, false, errors.New("protocol error: " + string(msg))
        }
        state.bulkLen = 0
    }
    return msg, false, nil
}

func parseMultiBulkHeader(msg []byte, state *readState) error {
    var err error
    var expectedLine uint64
    expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
    if err != nil {
        return errors.New("protocol error: " + string(msg))
    }
    if expectedLine == 0 {
        state.expectedArgsCount = 0
        return nil
    } else if expectedLine > 0 {
        // first line of multi bulk reply
        state.msgType = msg[0]
        state.readingMultiLine = true
        state.expectedArgsCount = int(expectedLine)
        state.args = make([][]byte, 0, expectedLine)
        return nil
    } else {
        return errors.New("protocol error: " + string(msg))
    }
}

func parseBulkHeader(msg []byte, state *readState) error {
    var err error
    state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
    if err != nil {
        return errors.New("protocol error: " + string(msg))
    }
    if state.bulkLen == -1 { // null bulk
        return nil
    } else if state.bulkLen > 0 {
        state.msgType = msg[0]
        state.readingMultiLine = true
        state.expectedArgsCount = 1
        state.args = make([][]byte, 0, 1)
        return nil
    } else {
        return errors.New("protocol error: " + string(msg))
    }
}

func parseSingleLineReply(msg []byte) (redis.Reply, error) {
    str := strings.TrimSuffix(string(msg), "\n")
    str = strings.TrimSuffix(str, "\r")
    var result redis.Reply
    switch msg[0] {
    case '+': // status reply
        result = reply.MakeStatusReply(str[1:])
    case '-': // err reply
        result = reply.MakeErrReply(str[1:])
    case ':': // int reply
        val, err := strconv.ParseInt(str[1:], 10, 64)
        if err != nil {
            return nil, errors.New("protocol error: " + string(msg))
        }
        result = reply.MakeIntReply(val)
    default:
        // parse as text protocol
        strs := strings.Split(str, " ")
        args := make([][]byte, len(strs))
        for i, s := range strs {
            args[i] = []byte(s)
        }
        result = reply.MakeMultiBulkReply(args)
    }
    return result, nil
}