Redis 速度快,很大一部分原因是因为它所有的数据都存储在内存中。如果断电或者宕机,都会导致内存中的数据丢失。为了实现重启后数据不丢失,Redis 提供了两种持久化的方案,一种是 RDB 快照(Redis DataBase),一种是 AOF(Append Only File)。本节我们基于Redis6.0来分析两种持久化机制。

一,RDB

RDB 是 Redis 默认的持久化方案。当满足一定条件的时候,会把当前内存中的数据写入磁盘,生成一个快照文件 dump.rdb。Redis 重启会通过加载 dump.rdb 文件恢复数据。

1.RDB的触发规则

1.1 自动触发

在redis.conf配置文件提供了rdb触发的配置规则,其中定义了触发把数据保存到磁盘的触发频率。

  1. save 900 1 # 900 秒内至少有一个 key 被修改(包括添加)
  2. save 300 10 # 400 秒内至少有 10 个 key 被修改
  3. save 60 10000 # 60 秒内至少有 10000 个 key 被修改

上面的配置是不冲突的,只要满足任意一个都会触发。

另外我们还可以配置rdb文件的目录和文件位置:

# 文件路径,
dir ./
# 文件名称
dbfilename dump.rdb
# 是否是 LZF 压缩 rdb 文件
rdbcompression yes
# 开启数据校验
rdbchecksum yes

配置说明:

参数 说明
dir rdb 文件默认在启动目录下(相对路径)config get dir 获取
dbfilename 文件名称
rdbcompression 开启压缩可以节省存储空间,但是会消耗一些 CPU 的计算时间,默认开启
rdbchecksum 使用 CRC64 算法来进行数据校验,但是这样做会增加大约 10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能。

为什么停止Redis服务的手没有save,但是重启以后数据还在? 这是因为rdb还有两种触发方式:

  1. shutdown触发,保证服务器正常关闭
  2. flushall,rdb文件是空的,没有什么意义

1.2 手动触发

如果我们需要重启服务或者迁移数据,这个时候就需要手动触 RDB 快照保存。Redis提供了两条命令:

  1. save
    1. save 在生成快照的时候会阻塞当前 Redis 服务器, Redis 不能处理其他命令。如果内存中的数据比较多,会造成 Redis 长时间的阻塞。生产环境不建议使用这个命令。为了解决这个问题,Redis 提供了第二种方式。
  2. bgsave
    1. 执行 bgsave 时,Redis 会在后台异步进行快照操作,快照同时还可以响应客户端请求。
    2. Redis 进程执行 fork 操作创建子进程(copy-on-write),RDB 持久化过程由子进程负责,完成后自动结束。它不会记录 fork 之后的后续命令。阻塞只发生在fork 阶段,一般时间很短。

用 lastsave 命令可以查看最近一次成功生成快照的时间。

2.RDB数据的恢复

2.1 shutdown持久化

首先添加键值用来测试:

redis> set k1 1
redis> set k2 2
redis> set k3 3
redis> set k4 4
redis> set k5 5

执行shutdown命令停止服务器,触发 save操作。执行备份 dump.rdb 文件的命令cp dump.rdb dump.rdb.bak。再次启动服务器,执行keys *命令,发现五个key都在。

2.2 模拟丢失数据

执行flushall命令模拟数据丢失,触发 save,执行shutdown命令停止服务器,再次启动服务器,执行keys *命令,发现什么都没有了。

2.3 通过备份文件恢复数据

删除dump.rdb文件,将第一步中备份的dump.rdb.bak文件重命名为dump.rdb,启动服务器,执行keys *命令,发现五个key通过dump文件恢复回来了。

3.RDB文件的优缺点

3.1 优点

  1. RDB 是一个非常紧凑(compact)的文件,它保存了 redis 在某个时间点上的数据集。这种文件非常适合用于进行备份和灾难恢复。

  2. 生成 RDB 文件的时候,redis 主进程会 fork()一个子进程来处理所有保存工作,主进程不需要进行任何磁盘 IO 操作。

  3. RDB 在恢复大数据集时的速度比 AOF 的恢复速度要快。

3.2 缺点

  1. RDB 方式数据没办法做到实时持久化/秒级持久化。因为 bgsave 每次运行都要执行 fork 操作创建子进程,频繁执行成本过高。

  2. 在一定间隔时间做一次备份,所以如果 redis 意外 down 掉的话,就会丢失最后一次快照之后的所有修改(数据有丢失)。

通常如果数据相对来说比较重要,希望将损失降到最小,则可以使用 AOF 方式进行持久化。


4.RDB持久化源码分析

Redis的整个持久化流程大致如下:

RDB.jpg

rdb的自动间隔保存由serverCron函数触发,满足配置的自动保存条件则调用rdbSaveBackground函数开启新进程进行dump操作。

我们看一下rdbSaveBackground函数的主要逻辑:

  1. 开启父子进程通信管道
  2. 调用redisFork函数开启子进程,如果开启成功
    1. 调用rdbSave函数执行持久化操作
    2. 持久化成功,通过管道向父进程发送持久化的统计数据
    3. 执行子进程退出逻辑
  3. 如果开启子进程失败
    1. 如果子进程已经存在,关闭子进程,返回错误
    2. 记录持久化进程信息
  4. 当前时间事件到此结束,aemain会开启下一个事件(文件事件或事件事件)

    int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
     pid_t childpid;
    
     if (hasActiveChildProcess()) return C_ERR;
    
     server.dirty_before_bgsave = server.dirty;
     server.lastbgsave_try = time(NULL);
     // 开启父子进程通信管道
     openChildInfoPipe();
     // 子进程执行持久化逻辑
     if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
         int retval;
    
         /* Child */
         redisSetProcTitle("redis-rdb-bgsave");
         redisSetCpuAffinity(server.bgsave_cpulist);
         // rdb持久化入口
         retval = rdbSave(filename,rsi);
         if (retval == C_OK) {
             // 通过管道向父进程发送持久化的统计数据
             sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
         }
         // 子进程退出
         exitFromChild((retval == C_OK) ? 0 : 1);
     } else {
         /* Parent */
    
         if (childpid == -1) {
             //关闭子进程
             closeChildInfoPipe();
             server.lastbgsave_status = C_ERR;
             serverLog(LL_WARNING,"Can't save in background: fork: %s",
                 strerror(errno));
             return C_ERR;
         }
         serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid);
         // 记录持久化进程信息
         server.rdb_save_time_start = time(NULL);
         server.rdb_child_pid = childpid;
         server.rdb_child_type = RDB_CHILD_TYPE_DISK;
         updateDictResizePolicy();
         return C_OK;
         // 当前时间事件到此结束,aemain会开启下一个事件(文件事件或事件事件)
     }
     return C_OK; /* unreached */
    }
    

    这里有两个核心的函数,确切的说是系统调用,fork和exit。fork会创建一个子进程,而父进程运行到wait函数之后会阻塞自己直到子进程调用exit结束自己的生命周期。

接下来我们再来看一下rdbSave函数,首先创建保存数据库的临时文件,接下来初始化redis自己封装的io工具,调用rdbSaveRio函数同步数据,对临时文件改名,保存完成后,修改没有同步到磁盘的写操作数为0。rdbSaveRio函数的逻辑比较简单,就是遍历数据库,然后拿到所有节点,把所有键值对写入磁盘,生成一个合乎rdb文件规范的dump。

int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp = NULL;
    rio rdb;
    int error = 0;
    // 创建保存数据库的临时文件
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }
    // 初始化redis自己封装的io工具
    rioInitWithFile(&rdb,fp);
    startSaving(RDBFLAGS_NONE);
    //刷盘
    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
    // 同步数据
    if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp)) goto werr;
    if (fsync(fileno(fp))) goto werr;
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    // 对临时文件改名,扶正,rename函数确保原子性
    if (rename(tmpfile,filename) == -1) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }

    serverLog(LL_NOTICE,"DB saved on disk");
    // 保存好后,没有同步到磁盘的写操作数则为0
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    stopSaving(1);
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    if (fp) fclose(fp);
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;
}

注意:子进程在dump结束后,会将未持久化的变量(dirty)置为0。如果父进程在子进程尚未结束的时候就接受IO操作请求,那么必将污染这个变量,同时新增的IO写操作可能部分写入磁盘,部分没写入磁盘。数据不一致,会导致灾难。所以父进程难道会一直等到子进程结束再接收新的请求么?

其实不是,这里面利用到了写时复制技术,父进程在调用了fork执行后台保存rdb后,就会去执行下一个事件。下面我们来唠叨一下Redis对于写时复制技术的应用。

Redis 主进程可以 fork 一个子进程来执行 RDB 持久化,二者共享同一份内存空间。fork 操作之后主进程可以继续对外提供服务,那么必然存在对内存的写操作,如果共享的内存数据因此发生改变,那子进程保存的 RDB 就不能称为数据快照。因此,这其中必然有一种机制保证了主进程修改的内存数据对子进程不可见,其实这就采用了写时复制技术。

fork 操作之后,内核会把父进程中所有的内存页都设为只读权限,然后将子进程的地址空间指向父进程。当其中某个进程写内存时,CPU 检测到内存页是只读的,于是触发页异常中断(page-fault),从而进入内核态。内核会把触发异常的页复制一份分配给写内存的进程,于是这个进程可以在复制的页上进行写操作,而不会更改共享的内存数据。

如果在 fork 后父子进程都还需要继续进行写操作,那么会产生大量的页异常中断(page-fault),造成内核态切换频繁,性能损失较大。Redis 在 rehash 阶段写操作是无法避免的,所以在 fork 出子进程之后会调用 updateDictResizePolicy函数 关闭 rehash,尽量减少写操作,最大限度地节约内存。

void updateDictResizePolicy(void) {
    if (!hasActiveChildProcess())
        dictEnableResize();
    else
        dictDisableResize();
}

二,AOF

AOF全称【Append Only File】,Redis 默认不开启AOF。AOF 采用日志的形式来记录每个写操作,并追加到文件中。开启后,执行更改 Redis 数据的命令时,就会把命令写入到 AOF 文件中。Redis 重启时会根据日志文件的内容把写指令从前到后执行一次以完成数据的恢复工作。

1.AOF的配置

在redis.conf配置文件中为我们提供了一下配置AOF相关的参数:

# 开关
appendonly no
# 文件名
appendfilename "appendonly.aof"
参数 说明
appendonly Redis 默认只开启 RDB 持久化,开启 AOF 需要修改为 yes
appendfilename “appendonly.aof” 路径也是通过 dir 参数配置 config get dir

由于操作系统的缓存机制,AOF 数据并没有真正地写入硬盘,而是进入了系统的硬盘缓存。

参数 说明
appendfsync everysec AOF 持久化策略(硬盘缓存到磁盘),默认 everysec
- no 表示不执行 fsync,由操作系统保证数据同步到磁盘,速度最快,但是不太安全
- always 表示每次写入都执行 fsync,以保证数据同步到磁盘,效率很低
- everysec 表示每秒执行一次 fsync,可能会导致丢失这 1s 数据。通常选择 everysec ,兼顾安全性和效率。

由于 AOF 持久化是 Redis 不断将写命令记录到 AOF 文件中,随着 Redis 不断的进行,AOF 的文件会越来越大,文件越大,占用服务器内存越大以及 AOF 恢复要求时间越长。比方说set k1 v1,执行一千次结果还是一样的,为了解决这个问题,Redis 新增了重写机制,当 AOF 文件的大小超过所设定的阈值时,Redis 就会启动 AOF 文件的内容压缩,只保留可以恢复数据的最小指令集。redis提供了命令 bgrewriteaof 来重写AOF文件。AOF 文件重写并不是对原文件进行重新整理,而是直接读取服务器现有的键值对,然后用一条命令去代替之前记录这个键值对的多条命令,生成一个新的文件后去替换原来的 AOF 文件。

在redis.conf中提供了有关AOF重写触发机制的参数:

# 重写触发机制
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
参数 说明
auto-aof-rewrite-percentage 默认值为 100。aof 自动重写配置,当目前 aof 文件大小超过上一次重写的 aof 文件大小的
百分之多少进行重写,即当 aof 文件增长到一定大小的时候,Redis 能够调用 bgrewriteaof
对日志文件进行重写。当前 AOF 文件大小是上次日志重写得到 AOF 文件大小的二倍(设
置为 100)时,自动启动新的日志重写过程。
auto-aof-rewrite-min-size 默认 64M。设置允许重写的最小 aof 文件大小,避免了达到约定百分比但尺寸仍然很小的
情况还要重写。

此外,还有两个与AOF重写相关的参数:

参数 说明
no-appendfsync-on-rewrite 在 aof 重写或者写入 rdb 文件的时候,会执行大量 IO,此时对于 everysec 和 always 的 aof模式来说,执行 fsync 会造成阻塞过长时间,no-appendfsync-on-rewrite 字段设置为默认设置为 no。如果对延迟要求很高的应用,这个字段可以设置为 yes,否则还是设置为 no,这样对持久化特性来说这是更安全的选择。设置为 yes 表示 rewrite 期间对新写操作不 fsync,暂时存在内存中,等 rewrite 完成后再写入,默认为 no,建议修改为 yes。Linux 的默认 fsync策略是 30 秒。可能丢失 30 秒数据。
aof-load-truncated aof 文件可能在尾部是不完整的,当 redis 启动的时候,aof 文件的数据被载入内存。重启可能发生在 redis所在的主机操作系统宕机后,尤其在ext4文件系统没有加上data=ordered选项,出现这种现象。redis 宕机或者异常终止不会造成尾部不完整现象,可以选择让 redis退出,或者导入尽可能多的数据。如果选择的是 yes,当截断的 aof 文件被导入的时候,会自动发布一个 log 给客户端然后 load。如果是 no,用户必须手动 redis-check-aof 修复 AOF文件才可以。默认值为 yes。

重启Redis之后就会进行AOF数据的恢复。

2.AOF文件的优缺点

2.1 优点

AOF 持久化的方法提供了多种的同步频率,即使使用默认的同步频率每秒同步一次,Redis 最多也就丢失 1 秒的数据而已。

2.2 缺点

  1. 对于具有相同数据的的 Redis,AOF 文件通常会比 RDF 文件体积更大(RDB存的是数据快照)。

  2. 虽然 AOF 提供了多种同步的频率,默认情况下,每秒同步一次的频率也具有较高的性能。在高并发的情况下,RDB 比 AOF 具好更好的性能保证。

2.3 比较

如果可以忍受一小段时间内数据的丢失,毫无疑问使用 RDB 是最好的,定时生成RDB 快照(snapshot)非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快。否则就使用 AOF 重写。但是一般情况下建议不要单独使用某一种持久化机制,而是应该两种一起用,在这种情况下,当 redis 重启的时候会优先载入 AOF 文件来恢复原始的数据,因为在通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集要完整。

3.AOF持久化源码分析

AOF主要是两块:一块是增量写磁盘,一块儿是文件全量重写。我们先来看简单的增量写磁盘逻辑。

3.1 增量写磁盘

① 增量写命令追加到缓冲区

  • redis有个缓冲区,未被写入磁盘的命令首先被存入缓冲区,达到条件后再写入磁盘。这个缓冲区定义在redisServer结构体中。

    struct redisServer {
      sds aof_buf;      /* AOF buffer, written before entering the event loop */
    }
    
  • redis每次执行完写操作后,会调用propagate函数将写操作追加到aof_buf缓冲区。

  • propagate函数会判断如果aof功能是开启的,就调用feedAppendOnlyFile函数把新的命令追加到aof_buf缓冲区。

    void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                 int flags) {
      if (server.in_exec && !server.propagate_in_transaction)
          execCommandPropagateMulti(dbid);
      // aof功能打开的前提下,把新的追加aof_buffer
      if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
          feedAppendOnlyFile(cmd, dbid, argv, argc);
      if (flags & PROPAGATE_REPL)
          replicationFeedSlaves(server.slaves, dbid, argv, argc);
    }
    

    ② 缓冲区数据刷入磁盘

前面说过,aof缓冲区同步到磁盘有三种策略。

每秒钟同步一次是一种折中(compromise)策略。同步需要调用系统函数fsync,涉及到操作系统用户态和核心态的切换,同时真正的磁盘IO发生在这里,比较耗性能。

AOF的入口还是在serverCron函数中,如果设置了aof延迟刷盘,那就在这里调用flushAppendOnlyFile函数去同步刷盘。

flushAppendOnlyFile函数首先调用write函数将aof_buf的数据写入文件的内核缓冲区,然后根据配置的刷盘策略不同,执行不同的操作。

  • 如果配置每次都刷盘直接调用fsync函数将内核缓冲区的数据写入磁盘

  • 如果配置的每秒刷盘,会将刷盘作为一个任务放到队列里面

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) {
        /* Check if we need to do fsync even the aof buffer is empty,
         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
         * called only when aof buffer is not empty, so if users
         * stop write commands before fsync called in one second,
         * the data in page cache cannot be flushed in time. */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_fsync_offset != server.aof_current_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;
        } else {
            return;
        }
    }

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = aofFsyncInProgress();

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

    if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
        usleep(server.aof_flush_sleep);
    }

    latencyStartMonitor(latency);
    // 调用write函数将aof_buf的数据写入文件的内核缓冲区
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (hasActiveChildProcess()) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

try_fsync:
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
        return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        // 直接调用fsync函数将内核缓冲区的数据写入磁盘
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) {
            // 创建后台线程,在后台线程里调用sync函数将内核缓冲区的数据写入磁盘
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
}

后台调度线程会从任务队列中取出任务并执行,相关的逻辑在bioProcessBackgroundJobs函数中,代码很多,但是其实不难,首先从任务队列中取出位于队首的任务,根据任务类型执行任务【执行fsync同步数据】,最后释放存放任务的对象并删除任务队列队首元素。

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

    redisSetCpuAffinity(server.bio_cpulist);

    makeThreadKillable();

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        // 从任务队列中取出位于队首的任务
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        // 根据任务类型执行任务
        if (type == BIO_CLOSE_FILE) {
            // 关闭文件
            close(job->fd);
        } else if (type == BIO_AOF_FSYNC) {
            // 执行fsync同步数据
            redis_fsync(job->fd);
        } else if (type == BIO_LAZY_FREE) {
            job->free_fn(job->free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        // 释放存放任务的对象
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        // 删除任务队列队首元素
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

③ write & fsync

write系统调用不保证数据最终写入磁盘,如果遇到机器掉电,内核缓冲区的数据没有写入磁盘,那么数据也就丢了。所以仅调用write是不够的。

image.png

3.2 全量重写

仍然是在serverCron函数里判断是否满足重写的条件,如果满足,其实调用的就是写rdb的函数rewriteAppendOnlyFileBackground,开启一个子进程,逐个把所有的键值对写入新的AOF文件中。然后主线程继续去处理事件。

int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;

    if (hasActiveChildProcess()) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
        char tmpfile[256];

        /* Child */
        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            sendChildCOWInfo(CHILD_TYPE_AOF, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        if (childpid == -1) {
            closeChildInfoPipe();
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            aofClosePipes();
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %ld",(long) childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    return C_OK; /* unreached */
}

AOF.jpg


至此,Redis的持久化机制我们就分析完了。