Redis介绍

Redis是一个开源的内存数据库,Redis提供了多种不同类型的数据结构,很多业务场景下的问题都可以很自然地映射到这些数据结构上。除此之外,通过复制、持久化和客户端分片等特性,我们可以很方便地将Redis扩展成一个能够包含数百GB数据、每秒处理上百万次请求的系统。

Redis支持的数据结构

Redis支持诸如字符串(string)、哈希(hashe)、列表(list)、集合(set)、带范围查询的排序集合(sorted set)、bitmap、hyperloglog、带半径查询的地理空间索引(geospatial index)和流(stream)等数据结构。

Redis应用场景

  • 缓存系统,减轻主数据库(MySQL)的压力。
  • 计数场景,比如微博、抖音中的关注数和粉丝数。
  • 热门排行榜,需要排序的场景特别适合使用ZSET。
  • 利用 LIST 可以实现队列的功能。
  • 利用 HyperLogLog 统计UV、PV等数据。
  • 使用 geospatial index 进行地理位置相关查询。

    准备Redis环境

    读者可以选择在本机安装 redis 或使用云数据库,这里直接使用Docker启动一个 redis 环境,方便学习使用。
    使用下面的命令启动一个名为 redis507 的 5.0.7 版本的 redis server环境。
    docker run --name redis507 -p 6379:6379 -d redis:5.0.7
    注意:此处的版本、容器名和端口号可以根据自己需要设置。
    启动一个 redis-cli 连接上面的 redis server。
    docker run -it --network host --rm redis:5.0.7 redis-cli

    go-redis库

    安装

    Go 社区中目前有很多成熟的 redis client 库,比如[https://github.com/gomodule/redigohttps://github.com/go-redis/redis,读者可以自行选择适合自己的库。本书使用 go-redis 这个库来操作 Redis 数据库。
    使用以下命令下安装 go-redis 库。
    go get github.com/go-redis/redis/v8

    连接

    普通连接模式

    go-redis 库中使用 redis.NewClient 函数连接 Redis 服务器。
    1. rdb := redis.NewClient(&redis.Options{
    2. Addr: "localhost:6379",
    3. Password: "", // 密码
    4. DB: 0, // 数据库
    5. PoolSize: 20, // 连接池大小
    6. })
    除此之外,还可以使用 redis.ParseURL 函数从表示数据源的字符串中解析得到 Redis 服务器的配置信息。 ```go opt, err := redis.ParseURL(“redis://:@localhost:6379/“) if err != nil { panic(err) }

rdb := redis.NewClient(opt)

  1. <a name="zGWPM"></a>
  2. #### TLS连接模式
  3. 如果使用的是 TLS 连接方式,则需要使用 tls.Config 配置。
  4. ```go
  5. rdb := redis.NewClient(&redis.Options{
  6. TLSConfig: &tls.Config{
  7. MinVersion: tls.VersionTLS12,
  8. // Certificates: []tls.Certificate{cert},
  9. // ServerName: "your.domain.com",
  10. },
  11. })

Redis Sentinel模式

使用下面的命令连接到由 Redis Sentinel 管理的 Redis 服务器。

  1. rdb := redis.NewFailoverClient(&redis.FailoverOptions{
  2. MasterName: "master-name",
  3. SentinelAddrs: []string{":9126", ":9127", ":9128"},
  4. })

Redis Cluster模式

使用下面的命令连接到 Redis Cluster,go-redis 支持按延迟或随机路由命令。

  1. rdb := redis.NewClusterClient(&redis.ClusterOptions{
  2. Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
  3. // 若要根据延迟或随机路由命令,请启用以下命令之一
  4. // RouteByLatency: true,
  5. // RouteRandomly: true,
  6. })

基本使用

执行命令

下面的示例代码演示了 go-redis 库的基本使用。

  1. // doCommand go-redis基本使用示例
  2. func doCommand() {
  3. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  4. defer cancel()
  5. // 执行命令获取结果
  6. val, err := rdb.Get(ctx, "key").Result()
  7. fmt.Println(val, err)
  8. // 先获取到命令对象
  9. cmder := rdb.Get(ctx, "key")
  10. fmt.Println(cmder.Val()) // 获取值
  11. fmt.Println(cmder.Err()) // 获取错误
  12. // 直接执行命令获取错误
  13. err = rdb.Set(ctx, "key", 10, time.Hour).Err()
  14. // 直接执行命令获取值
  15. value := rdb.Get(ctx, "key").Val()
  16. fmt.Println(value)
  17. }

执行任意命令

go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行。具体使用方法如下。

  1. // doDemo rdb.Do 方法使用示例
  2. func doDemo() {
  3. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  4. defer cancel()
  5. // 直接执行命令获取错误
  6. err := rdb.Do(ctx, "set", "key", 10, "EX", 3600).Err()
  7. fmt.Println(err)
  8. // 执行命令获取结果
  9. val, err := rdb.Do(ctx, "get", "key").Result()
  10. fmt.Println(val, err)
  11. }

redis.Nil

go-redis 库提供了一个 redis.Nil 错误来表示 Key 不存在的错误。因此在使用 go-redis 时需要注意对返回错误的判断。在某些场景下我们应该区别处理 redis.Nil 和其他不为 nil 的错误。

  1. // getValueFromRedis redis.Nil判断
  2. func getValueFromRedis(key, defaultValue string) (string, error) {
  3. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  4. defer cancel()
  5. val, err := rdb.Get(ctx, key).Result()
  6. if err != nil {
  7. // 如果返回的错误是key不存在
  8. if errors.Is(err, redis.Nil) {
  9. return defaultValue, nil
  10. }
  11. // 出其他错了
  12. return "", err
  13. }
  14. return val, nil
  15. }

其他示例

zset示例

下面的示例代码演示了如何使用 go-redis 库操作 zset。

  1. // zsetDemo 操作zset示例
  2. func zsetDemo() {
  3. // key
  4. zsetKey := "language_rank"
  5. // value
  6. languages := []*redis.Z{
  7. {Score: 90.0, Member: "Golang"},
  8. {Score: 98.0, Member: "Java"},
  9. {Score: 95.0, Member: "Python"},
  10. {Score: 97.0, Member: "JavaScript"},
  11. {Score: 99.0, Member: "C/C++"},
  12. }
  13. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  14. defer cancel()
  15. // ZADD
  16. err := rdb.ZAdd(ctx, zsetKey, languages...).Err()
  17. if err != nil {
  18. fmt.Printf("zadd failed, err:%v\n", err)
  19. return
  20. }
  21. fmt.Println("zadd success")
  22. // 把Golang的分数加10
  23. newScore, err := rdb.ZIncrBy(ctx, zsetKey, 10.0, "Golang").Result()
  24. if err != nil {
  25. fmt.Printf("zincrby failed, err:%v\n", err)
  26. return
  27. }
  28. fmt.Printf("Golang's score is %f now.\n", newScore)
  29. // 取分数最高的3个
  30. ret := rdb.ZRevRangeWithScores(ctx, zsetKey, 0, 2).Val()
  31. for _, z := range ret {
  32. fmt.Println(z.Member, z.Score)
  33. }
  34. // 取95~100分的
  35. op := &redis.ZRangeBy{
  36. Min: "95",
  37. Max: "100",
  38. }
  39. ret, err = rdb.ZRangeByScoreWithScores(ctx, zsetKey, op).Result()
  40. if err != nil {
  41. fmt.Printf("zrangebyscore failed, err:%v\n", err)
  42. return
  43. }
  44. for _, z := range ret {
  45. fmt.Println(z.Member, z.Score)
  46. }
  47. }

执行上面的函数将得到如下输出结果。

  1. zadd success
  2. Golang's score is 100.000000 now.
  3. Golang 100
  4. C/C++ 99
  5. Java 98
  6. Python 95
  7. JavaScript 97
  8. Java 98
  9. C/C++ 99
  10. Golang 100

扫描或遍历所有key

你可以使用KEYS prefix: 命令按前缀获取所有 key。
`vals, err := rdb.Keys(ctx, “prefix
“).Result()`
但是如果需要扫描数百万的 key ,那速度就会比较慢。这种场景下你可以使用Scan 命令来遍历所有符合要求的 key。

  1. // scanKeysDemo1 按前缀查找所有key示例
  2. func scanKeysDemo1() {
  3. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  4. defer cancel()
  5. var cursor uint64
  6. for {
  7. var keys []string
  8. var err error
  9. // 按前缀扫描key
  10. keys, cursor, err = rdb.Scan(ctx, cursor, "prefix:*", 0).Result()
  11. if err != nil {
  12. panic(err)
  13. }
  14. for _, key := range keys {
  15. fmt.Println("key", key)
  16. }
  17. if cursor == 0 { // no more keys
  18. break
  19. }
  20. }
  21. }

Go-redis 允许将上面的代码简化为如下示例。

  1. // scanKeysDemo2 按前缀扫描key示例
  2. func scanKeysDemo2() {
  3. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  4. defer cancel()
  5. // 按前缀扫描key
  6. iter := rdb.Scan(ctx, 0, "prefix:*", 0).Iterator()
  7. for iter.Next(ctx) {
  8. fmt.Println("keys", iter.Val())
  9. }
  10. if err := iter.Err(); err != nil {
  11. panic(err)
  12. }
  13. }

例如,我们可以写出一个将所有匹配指定模式的 key 删除的示例。

  1. // delKeysByMatch 按match格式扫描所有key并删除
  2. func delKeysByMatch(match string, timeout time.Duration) {
  3. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  4. defer cancel()
  5. iter := rdb.Scan(ctx, 0, match, 0).Iterator()
  6. for iter.Next(ctx) {
  7. err := rdb.Del(ctx, iter.Val()).Err()
  8. if err != nil {
  9. panic(err)
  10. }
  11. }
  12. if err := iter.Err(); err != nil {
  13. panic(err)
  14. }
  15. }

此外,对于 Redis 中的 set、hash、zset 数据类型,go-redis 也支持类似的遍历方法。

  1. iter := rdb.SScan(ctx, "set-key", 0, "prefix:*", 0).Iterator()
  2. iter := rdb.HScan(ctx, "hash-key", 0, "prefix:*", 0).Iterator()
  3. iter := rdb.ZScan(ctx, "sorted-hash-key", 0, "prefix:*", 0).Iterator(

Pipeline

Redis Pipeline 允许通过使用单个 client-server-client 往返执行多个命令来提高性能。区别于一个接一个地执行100个命令,你可以将这些命令放入 pipeline 中,然后使用1次读写操作像执行单个命令一样执行它们。这样做的好处是节省了执行命令的网络往返时间(RTT)。
y在下面的示例代码中演示了使用 pipeline 通过一个 write + read 操作来执行多个命令。

  1. pipe := rdb.Pipeline()
  2. incr := pipe.Incr(ctx, "pipeline_counter")
  3. pipe.Expire(ctx, "pipeline_counter", time.Hour)
  4. cmds, err := pipe.Exec(ctx)
  5. if err != nil {
  6. panic(err)
  7. }
  8. // 在执行pipe.Exec之后才能获取到结果
  9. fmt.Println(incr.Val())

上面的代码相当于将以下两个命令一次发给 Redis Server 端执行,与不使用 Pipeline 相比能减少一次RTT。
INCR pipeline_counter
EXPIRE pipeline_counts 3600
或者,你也可以使用Pipelined 方法,它会在函数退出时调用 Exec。

  1. var incr *redis.IntCmd
  2. cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
  3. incr = pipe.Incr(ctx, "pipelined_counter")
  4. pipe.Expire(ctx, "pipelined_counter", time.Hour)
  5. return nil
  6. })
  7. if err != nil {
  8. panic(err)
  9. }
  10. // 在pipeline执行后获取到结果
  11. fmt.Println(incr.Val())

我们可以遍历 pipeline 命令的返回值依次获取每个命令的结果。下方的示例代码中使用pipiline一次执行了100个 Get 命令,在pipeline 执行后遍历取出100个命令的执行结果。

  1. cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
  2. for i := 0; i < 100; i++ {
  3. pipe.Get(ctx, fmt.Sprintf("key%d", i))
  4. }
  5. return nil
  6. })
  7. if err != nil {
  8. panic(err)
  9. }
  10. for _, cmd := range cmds {
  11. fmt.Println(cmd.(*redis.StringCmd).Val())
  12. }

在那些我们需要一次性执行多个命令的场景下,就可以考虑使用 pipeline 来优化。

事务

Redis 是单线程执行命令的,因此单个命令始终是原子的,但是来自不同客户端的两个给定命令可以依次执行,例如在它们之间交替执行。但是,Multi/exec能够确保在multi/exec两个语句之间的命令之间没有其他客户端正在执行命令。
在这种场景我们需要使用 TxPipeline 或 TxPipelined 方法将 pipeline 命令使用 MULTI 和EXEC包裹起来。

  1. // TxPipeline demo
  2. pipe := rdb.TxPipeline()
  3. incr := pipe.Incr(ctx, "tx_pipeline_counter")
  4. pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
  5. _, err := pipe.Exec(ctx)
  6. fmt.Println(incr.Val(), err)
  7. // TxPipelined demo
  8. var incr2 *redis.IntCmd
  9. _, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
  10. incr2 = pipe.Incr(ctx, "tx_pipeline_counter")
  11. pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
  12. return nil
  13. })
  14. fmt.Println(incr2.Val(), err)

上面代码相当于在一个RTT下执行了下面的redis命令:

  1. MULTI
  2. INCR pipeline_counter
  3. EXPIRE pipeline_counts 3600
  4. EXEC

Watch

我们通常搭配 WATCH命令来执行事务操作。从使用WATCH命令监视某个 key 开始,直到执行EXEC命令的这段时间里,如果有其他用户抢先对被监视的 key 进行了替换、更新、删除等操作,那么当用户尝试执行EXEC的时候,事务将失败并返回一个错误,用户可以根据这个错误选择重试事务或者放弃事务。
Watch方法接收一个函数和一个或多个key作为参数。
Watch(fn func(*Tx) error, keys ...string) error
下面的代码片段演示了 Watch 方法搭配 TxPipelined 的使用示例。

  1. // watchDemo 在key值不变的情况下将其值+1
  2. func watchDemo(ctx context.Context, key string) error {
  3. return rdb.Watch(ctx, func(tx *redis.Tx) error {
  4. n, err := tx.Get(ctx, key).Int()
  5. if err != nil && err != redis.Nil {
  6. return err
  7. }
  8. // 假设操作耗时5秒
  9. // 5秒内我们通过其他的客户端修改key,当前事务就会失败
  10. time.Sleep(5 * time.Second)
  11. _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
  12. pipe.Set(ctx, key, n+1, time.Hour)
  13. return nil
  14. })
  15. return err
  16. }, key)
  17. }

将上面的函数执行并打印其返回值,如果我们在程序运行后的5秒内修改了被 watch 的 key 的值,那么该事务操作失败,返回redis: transaction failed错误。
最后我们来看一个 go-redis 官方文档中使用 GET 、SET和WATCH命令实现一个 INCR 命令的完整示例。

  1. const routineCount = 100
  2. increment := func(key string) error {
  3. txf := func(tx *redis.Tx) error {
  4. // 获得当前值或零值
  5. n, err := tx.Get(key).Int()
  6. if err != nil && err != redis.Nil {
  7. return err
  8. }
  9. // 实际操作(乐观锁定中的本地操作)
  10. n++
  11. // 仅在监视的Key保持不变的情况下运行
  12. _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
  13. // pipe 处理错误情况
  14. pipe.Set(key, n, 0)
  15. return nil
  16. })
  17. return err
  18. }
  19. for retries := routineCount; retries > 0; retries-- {
  20. err := rdb.Watch(txf, key)
  21. if err != redis.TxFailedErr {
  22. return err
  23. }
  24. // 乐观锁丢失
  25. }
  26. return errors.New("increment reached maximum number of retries")
  27. }
  28. var wg sync.WaitGroup
  29. wg.Add(routineCount)
  30. for i := 0; i < routineCount; i++ {
  31. go func() {
  32. defer wg.Done()
  33. if err := increment("counter3"); err != nil {
  34. fmt.Println("increment error:", err)
  35. }
  36. }()
  37. }
  38. wg.Wait()
  39. n, err := rdb.Get("counter3").Int()
  40. fmt.Println("ended with", n, err)

redis官方文档:https://pkg.go.dev/github.com/go-redis/redis