1.godis/tcp
(1)实现简单的echo服务器
//echo_easy.go
package main
import (
"fmt"
"net"
"io"
"log"
"bufio"
)
//为了实现优雅关闭,先关闭listener组织新的连接,再遍历所有现有的连接并逐个进行关闭
func ListenAndserve(address string) {
//绑定监听地址
listener, err := net.Listen("tcp", address)
if err != nil {
log.Fatal(fmt.Sprintf("listen err: %v", err))
}
defer listener.Close()
log.Println(fmt.Sprintf("bind: %s, start listening...", address))
for {
//Accept会一直阻塞直到有新的连接建立或者listen中断才会被返回
conn, err := listener.Accept()
if err != nil {
//由于listener被关闭无法继续监听导致的错误
log.Fatal(fmt.Sprintf("accept err: %v", err))
}
//开启新的goroutine,去处理这一个连接
go Handle(conn)
}
}
func Handle(conn net.Conn) {
//使用bufio标准库提供的缓冲区功能去实现
reader := bufio.NewReader(conn)
for {
//ReadString会一直阻塞直到遇到分隔符'\n'
//遇到分隔符后会返回上次遇到分隔符或者连接建立收到的所有数据,包括分隔符本身
//若在遇到分隔符之前遇到异常,ReadString会返回已收到的数据以及错误信息
msg, err := reader.ReadString('\n')
if err != nil {
//通常遇到的错误是连接中断或者关闭,用io.EOF表示
if err == io.EOF {
log.Println("connection close")
} else {
log.Println(err)
}
return
}
b := []byte(msg)
//将收到的信息发送给客户端
fmt.Println(b)
conn.Write(b)
}
}
func main() {
ListenAndserve(":8000")
}
运行后使用 telnet 127.0.0.1 8000 进行连接测试,会返回输入的数值
(2)粘包以及拆包问题
我们常说的 TCP 服务器并非「实现 TCP 协议的服务器」而是「基于TCP协议的应用层服务器」。
TCP 是面向字节流的协议,而应用层协议大多是面向消息的,比如 HTTP 协议的请求/响应,Redis 协议的指令/回复都是以消息为单位进行通信的。,作为应用层服务器我们有责任从 TCP 提供的字节流中正确地解析出应用层消息,会遇到「拆包/粘包」问题。
socket 允许我们通过 read 函数读取新收到的一段数据(当然这段数据并不对应一个 TCP 包)。
在上文的 Echo 服务器示例中我们用\n表示消息结束,从 read 函数读取的数据可能存在下列几种情况:
- 收到两段数据: “abc”, “def\n” 它们属于一条消息 “abcdef\n” 这是拆包的情况
- 收到一段数据: “abc\ndef\n” 它们属于两条消息 “abc\n”, “def\n” 这是粘包的情况
应用层协议通常采用下列几种思路之一来定义消息,以保证完整地进行读取:
- 定长消息
- 在消息尾部添加特殊分隔符,如示例中的Echo协议和FTP控制协议。bufio 标准库会缓存收到的数据直到遇到分隔符才会返回,它可以帮助我们正确地分割字节流。
- 将消息分为 header 和 body, 并在 header 中提供 body 总长度,这种分包方式被称为 LTV(length,type,value) 包。这是应用最广泛的策略,如HTTP协议。当从 header 中获得 body 长度后, io.ReadFull 函数会读取指定长度字节流,从而解析应用层消息
(3)优雅关闭
在生产环境下需要保证TCP服务器关闭前完成必要的清理工作,包括将完成正在进行的数据传输,关闭TCP连接等。这种关闭模式称为优雅关闭,可以避免资源泄露以及客户端未收到完整数据导致故障。
TCP 服务器的优雅关闭模式通常为: 先关闭listener阻止新连接进入,然后遍历所有连接逐个进行关闭。(4)echo.go实现
```go //echo.go
package tcp
/**
- A echo server to test whether the server is functioning normally */
import ( “bufio” “context” “github.com/hdt3213/godis/lib/logger” “github.com/hdt3213/godis/lib/sync/atomic” “github.com/hdt3213/godis/lib/sync/wait” “io” “net” “sync” “time” )
// EchoHandler echos received line to client, using for test type EchoHandler struct {
// 保存所有工作状态client的集合(把map当set用)
// 需使用并发安全的容器
activeConn sync.Map
// 关闭状态标识位
closing atomic.Boolean
}
// MakeEchoHandler creates EchoHandler func MakeEchoHandler() *EchoHandler { return &EchoHandler{} }
// EchoClient is client for EchoHandler, using for test type EchoClient struct { Conn net.Conn Waiting wait.Wait }
// Close close connection // 关闭客户端连接 func (c EchoClient) Close() error { // 等待数据发送完成或超时 c.Waiting.WaitWithTimeout(10 time.Second) c.Conn.Close() return nil }
// Handle echos received line to client func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection // 关闭中的 handler 不会处理新连接 _ = conn.Close() return }
client := &EchoClient{
Conn: conn,
}
h.activeConn.Store(client, struct{}{}) // 记住仍然存活的连接
reader := bufio.NewReader(conn)
for {
// may occurs: client EOF, client timeout, server early close
msg, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
logger.Info("connection close")
h.activeConn.Delete(client)
} else {
logger.Warn(err)
}
return
}
// 发送数据前先置为waiting状态,阻止连接被关闭
client.Waiting.Add(1)
// 模拟关闭时未完成发送的情况
//logger.Info("sleeping")
//time.Sleep(10 * time.Second)
b := []byte(msg)
_, _ = conn.Write(b)
// 发送完毕, 结束waiting
client.Waiting.Done()
}
}
// Close stops echo handler // 关闭服务器 func (h EchoHandler) Close() error { logger.Info(“handler shutting down…”) h.closing.Set(true) // 逐个关闭连接 h.activeConn.Range(func(key interface{}, val interface{}) bool { client := key.(EchoClient) _ = client.Close() return true }) return nil }
<a name="Y037O"></a>
### (5)server.go实现
```go
//server.go
package tcp
/**
* A tcp server
*/
import (
"context"
"fmt"
"github.com/hdt3213/godis/interface/tcp"
"github.com/hdt3213/godis/lib/logger"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// Config stores tcp server properties
type Config struct {
Address string `yaml:"address"`
MaxConnect uint32 `yaml:"max-connect"`
Timeout time.Duration `yaml:"timeout"`
}
// ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal
// 监听中断信号并通过 closeChan 通知服务器关闭
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
closeChan := make(chan struct{})
sigCh := make(chan os.Signal)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
go func() {
sig := <-sigCh
switch sig {
case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
closeChan <- struct{}{}
}
}()
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return err
}
//cfg.Address = listener.Addr().String()
logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
ListenAndServe(listener, handler, closeChan)
return nil
}
// ListenAndServe binds port and handle requests, blocking until close
// 监听并提供服务,并在收到 closeChan 发来的关闭通知后关闭
func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
// listen signal
// 监听关闭通知
go func() {
<-closeChan
logger.Info("shutting down...")
// 停止监听,listener.Accept()会立即返回 io.EOF
_ = listener.Close() // listener.Accept() will return err immediately
// 关闭应用层服务器
_ = handler.Close() // close connections
}()
// listen port
// 在异常退出后释放资源
defer func() {
// close during unexpected error
_ = listener.Close()
_ = handler.Close()
}()
ctx := context.Background()
var waitDone sync.WaitGroup
for {
// 监听端口, 阻塞直到收到新连接或者出现错误
conn, err := listener.Accept()
if err != nil {
break
}
// handle
// 开启 goroutine 来处理新连接
logger.Info("accept link")
waitDone.Add(1)
go func() {
defer func() {
waitDone.Done()
}()
handler.Handle(ctx, conn)
}()
}
waitDone.Wait()
}
(6)echo_test.go实现
//echo_test.go
package tcp
import (
"bufio"
"math/rand"
"net"
"strconv"
"testing"
"time"
)
func TestListenAndServe(t *testing.T) {
var err error
closeChan := make(chan struct{})
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Error(err)
return
}
addr := listener.Addr().String()
go ListenAndServe(listener, MakeEchoHandler(), closeChan)
conn, err := net.Dial("tcp", addr)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
val := strconv.Itoa(rand.Int())
_, err = conn.Write([]byte(val + "\n"))
if err != nil {
t.Error(err)
return
}
bufReader := bufio.NewReader(conn)
line, _, err := bufReader.ReadLine()
if err != nil {
t.Error(err)
return
}
if string(line) != val {
t.Error("get wrong response")
return
}
}
_ = conn.Close()
for i := 0; i < 5; i++ {
// create idle connection
_, _ = net.Dial("tcp", addr)
}
closeChan <- struct{}{}
time.Sleep(time.Second)
}
2.godis/redis/parser
(1)Redis通信协议
Redis自2.0开始采用的是RESP协议,这个协议比较容易去实现,它是一个二进制安全的文本协议,并且工作在TCP协议之上,它以行为单位,并且客户端以及服务端均以\r\n(CRLF)来作为换行符号。
二进制安全是指允许协议中出现任意字符而不会导致故障。比如 C 语言的字符串以 \0 作为结尾不允许字符串中间出现\0, 而 Go 语言的 string 则允许出现 \0,我们说 Go 语言的 string 是二进制安全的,而 C 语言字符串不是二进制安全的。
RESP 的二进制安全性允许我们在 key 或者 value 中包含 \r 或者 \n 这样的特殊字符。在使用 redis 存储 protobuf、msgpack 等二进制数据时,二进制安全性尤为重要。
RESP 定义了5种格式:
- 简单字符串(Simple String): 服务器用来返回简单的结果,比如”OK”。非二进制安全,且不允许换行。
- 错误信息(Error): 服务器用来返回简单的错误信息,比如”ERR Invalid Synatx”。非二进制安全,且不允许换行。
- 整数(Integer): llen、scard 等命令的返回值, 64位有符号整数
- 字符串(Bulk String): 二进制安全字符串, 比如 get 等命令的返回值
- 数组(Array, 又称 Multi Bulk Strings): Bulk String 数组,客户端发送指令以及 lrange 等命令响应的格式
RESP 通过第一个字符来表示格式:
- 简单字符串:以”+” 开始, 如:”+OK\r\n”
- 错误:以”-“ 开始,如:”-ERR Invalid Synatx\r\n”
- 整数:以”:”开始,如:”:1\r\n”
- 字符串:以 $ 开始
- 数组:以 * 开始
Bulk String有两行,第一行为 $+正文长度,第二行为实际内容
$3\r\nSET\r\n
Bulk String 是二进制安全的可以包含任意字节,就是说可以在 Bulk String 内部包含 “\r\n” 字符(行尾的CRLF被隐藏):
$4 这里省略了\r\n
a\r\nb 这里省略了\r\n
$-1 表示 nil, 比如使用 get 命令查询一个不存在的key时,响应即为$-1。
Array 格式第一行为 “*”+数组长度,其后是相应数量的 Bulk String。如, [“foo”, “bar”]的报文:
*2
$3
foo
$3
bar
同时,客户端也使用 Array 格式向服务端发送指令。命令本身将作为第一个参数,如 SET key value指令的RESP报文:
*3
$3
SET
$3
key
$5
value
将换行符打印出来就是
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
(2)协议解析器
我们在 实现TCP服务器 一文中已经介绍过TCP服务器的实现,协议解析器将实现其 Handler 接口充当应用层服务器。
协议解析器将接收 Socket 传来的数据,并将其数据还原为 [][]byte 格式,比如 “*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n” 将被还原为 [‘SET’, ‘key’, ‘value’]
来自客户端的请求均为数组格式,它在第一行中标记报文的总行数并使用CRLF作为分行符。
bufio 标准库可以将从 reader 读到的数据缓存到 buffer 中,直至遇到分隔符或读取完毕后返回,所以我们使用 reader.ReadBytes(‘\n’) 来保证每次读取到完整的一行。
需要注意的是RESP是二进制安全的协议,它允许在正文中使用CRLF字符。举例来说 Redis 可以正确接收并执行SET “a\r\nb” 1指令, 这条指令的正确报文是这样的:
*3
$3
SET
$4
a\r\nb
$7
myvalue
当 ReadBytes 读取到第五行 “a\r\nb\r\n”时会将其误认为两行:
*3
$3
SET
$4
a // 错误的分行
b // 错误的分行
$7
myvalue
因此当读取到第四行$4后, 不应该继续使用 ReadBytes(‘\n’) 读取下一行, 应使用 io.ReadFull(reader, msg) 方法来读取指定长度的内容。
msg = make([]byte, 4 + 2) // 正文长度4 + 换行符长度2
_, err = io.ReadFull(reader, msg)
首先我们来定义解析器的接口:
// Payload stores redis.Reply or error
type Payload struct {
Data redis.Reply
Err error
}
// ParseStream 通过 io.Reader 读取数据并将结果通过 channel 将结果返回给调用者
// 流式处理的接口适合供客户端/服务端使用
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parse0(reader, ch)
return ch
}
// ParseOne 解析 []byte 并返回 redis.Reply
func ParseOne(data []byte) (redis.Reply, error) {
ch := make(chan *Payload)
reader := bytes.NewReader(data)
go parse0(reader, ch)
payload := <-ch // parse0 will close the channel
if payload == nil {
return nil, errors.New("no reply")
}
return payload.Data, payload.Err
}
接下来我们可以看一下解析器核心流程的伪代码
func parse0(reader io.Reader, ch chan<- *Payload) {
// 初始化读取状态
readingMultiLine := false
expectedArgsCount := 0
var args [][]byte
var bulkLen int64
for {
// 上文中我们提到 RESP 是以行为单位的
// 因为行分为简单字符串和二进制安全的BulkString,我们需要封装一个 readLine 函数来兼容
line, err = readLine(reader, bulkLen)
if err != nil {
// 处理错误
return
}
// 接下来我们对刚刚读取的行进行解析
// 我们简单的将 Reply 分为两类:
// 单行: StatusReply, IntReply, ErrorReply
// 多行: BulkReply, MultiBulkReply
if !readingMultiLine {
if isMulitBulkHeader(line) {
// 我们收到了 MulitBulkReply 的第一行
// 获得 MulitBulkReply 中 BulkString 的个数
expectedArgsCount = parseMulitBulkHeader(line)
// 等待 MulitBulkReply 后续行
readingMultiLine = true
} else if isBulkHeader(line) {
// 我们收到了 BulkReply 的第一行
// 获得 BulkReply 第二行的长度, 通过 bulkLen 告诉 readLine 函数下一行 BulkString 的长度
bulkLen = parseBulkHeader()
// 这个 Reply 中一共有 1 个 BulkString
expectedArgsCount = 1
// 等待 BulkReply 后续行
readingMultiLine = true
} else {
// 处理 StatusReply, IntReply, ErrorReply 等单行 Reply
reply := parseSingleLineReply(line)
// 通过 ch 返回结果
emitReply(ch)
}
} else {
// 进入此分支说明我们正在等待 MulitBulkReply 或 BulkReply 的后续行
// MulitBulkReply 的后续行有两种,BulkHeader 或者 BulkString
if isBulkHeader(line) {
bulkLen = parseBulkHeader()
} else {
// 我们正在读取一个 BulkString, 它可能是 MulitBulkReply 或 BulkReply
args = append(args, line)
}
if len(args) == expectedArgsCount { // 我们已经读取了所有后续行
// 通过 ch 返回结果
emitReply(ch)
// 重置状态, 准备解析下一条 Reply
readingMultiLine = false
expectedArgsCount = 0
args = nil
bulkLen = 0
}
}
}
}
工具函数的实现:
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
var msg []byte
var err error
if state.bulkLen == 0 { // read simple line
msg, err = bufReader.ReadBytes('\n')
if err != nil {
return nil, true, err
}
if len(msg) == 0 || msg[len(msg)-2] != '\r' {
return nil, false, errors.New("protocol error: " + string(msg))
}
} else { // read bulk line (binary safe)
msg = make([]byte, state.bulkLen+2)
_, err = io.ReadFull(bufReader, msg)
if err != nil {
return nil, true, err
}
if len(msg) == 0 ||
msg[len(msg)-2] != '\r' ||
msg[len(msg)-1] != '\n' {
return nil, false, errors.New("protocol error: " + string(msg))
}
state.bulkLen = 0
}
return msg, false, nil
}
func parseMultiBulkHeader(msg []byte, state *readState) error {
var err error
var expectedLine uint64
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if expectedLine == 0 {
state.expectedArgsCount = 0
return nil
} else if expectedLine > 0 {
// first line of multi bulk reply
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = int(expectedLine)
state.args = make([][]byte, 0, expectedLine)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
func parseBulkHeader(msg []byte, state *readState) error {
var err error
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if state.bulkLen == -1 { // null bulk
return nil
} else if state.bulkLen > 0 {
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = 1
state.args = make([][]byte, 0, 1)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
func parseSingleLineReply(msg []byte) (redis.Reply, error) {
str := strings.TrimSuffix(string(msg), "\n")
str = strings.TrimSuffix(str, "\r")
var result redis.Reply
switch msg[0] {
case '+': // status reply
result = reply.MakeStatusReply(str[1:])
case '-': // err reply
result = reply.MakeErrReply(str[1:])
case ':': // int reply
val, err := strconv.ParseInt(str[1:], 10, 64)
if err != nil {
return nil, errors.New("protocol error: " + string(msg))
}
result = reply.MakeIntReply(val)
default:
// parse as text protocol
strs := strings.Split(str, " ")
args := make([][]byte, len(strs))
for i, s := range strs {
args[i] = []byte(s)
}
result = reply.MakeMultiBulkReply(args)
}
return result, nil
}