为什么需要集群?

  1. 性能
    1. Redis 本身的 QPS 已经很高了,但是如果在一些并发量非常高的情况下,性能还是会受到影响。这个时候我们希望有更多的 Redis 服务来完成工作。
  2. 扩展
    1. 出于存储的考虑。因为 Redis 所有的数据都放在内存中,如果数据量大,很容易受到硬件的限制。升级硬件收效和成本比太低,所以我们需要有一种横向扩展的方法。
  3. 可用性
    1. 可用性和安全的问题。如果只有一个 Redis 服务,一旦服务宕机,那么所有的客户端都无法访问,会对业务造成很大的影响。另一个,如果硬件发生故障,而单机的数据无法恢复的话,带来的影响也是灾难性的。

可用性、数据安全、性能都可以通过搭建多个 Reids 服务实现。其中有一个是主节点(master),可以有多个从节点(slave)。主从之间通过数据同步,存储完全相同的数据。如果主节点发生故障,则把某个从节点改成主节点,访问新的主节点。

一,主从配置

我们使用三台云服务器搭建主从集群,通过docker来运行redis实例,在每个 slave 节点的 redis.conf 配置文件增加一行

  1. slaveof ip port

在主从切换的时候,这个配置会被重写成:

replicaof ip port

或者在启动服务时通过参数指定 master 节点:

./redis-server --slaveof ip port

或在客户端直接执行 slaveof xx xx,使该 Redis 实例成为从节点。

启动后,查看集群状态:

redis> info replication

默认从节点不能写入数据(只读),只能从 master 节点同步数据。get 成功,set 失败。

断开复制

redis> slaveof no one

此时从节点会变成自己的主节点,不再复制数据。

image.png 注意这行配置,如果集群已经搭建成功,但是没有数据的同步,很有可能是redis实例不在同一台机器上,但是默认bind设置会导致redis只能在本地被访问,注释掉这行代码,并把protected-mode no设置好。 �


二,主从原理

1.连接阶段

  • slave node 启动时(执行 slaveof 命令),会在自己本地保存 master node 的信息,包括 master node 的 host 和 ip。
  • slave node 内部有个定时任务 replicationCron(源码 replication.c),每隔 1秒钟检查是否有新的 master node 要连接和复制,如果发现,就跟 master node 建立socket 网络连接,如果连接成功,从节点为该 socket 建立一个专门处理复制工作的文件事件处理器,负责后续的复制工作,如接收 RDB 文件、接收命令传播等。
  • 当从节点变成了主节点的一个客户端之后,会给主节点发送 ping 请求。

2.初次全量同步

当一个redis服务器初次向主服务器发送salveof命令时,redis从服务器会进行一次全量同步。

  1. slave服务器向master发送psync命令(此时发送的是psync ? -1),告诉master需要同步数据了。
  2. master接收到psync命令后会进行BGSAVE命令生成RDB文件快照。
  3. 生成完后,会将RDB文件发送给slave。
  4. slave接收到文件会载入RDB快照,并且将数据库状态变更为master在执行BGSAVE时的状态一致。
  5. master会发送保存在缓冲区里的所有写命令,告诉slave可以进行同步了
  6. slave执行这些写命令。

3.命令传播

slave已经同步过master了,那么如果后续master进行了写操作,比如说一个简单的set aaa redis,那么master执行过当前命令后,会将当前命令异步发送给slave执行一遍,达成数据一致性。延迟是不可避免的,只能通过优化网络。

repl-disable-tcp-nodelay no

当设置为 yes 时,TCP 会对包进行合并从而减少带宽,但是发送的频率会降低,从节点数据延迟增加,一致性变差;具体发送频率与 Linux 内核的配置有关,默认配置为40ms。当设置为 no 时,TCP 会立马将主节点的数据发送给从节点,带宽增加但延迟变小。

一般来说,只有当应用对 Redis 数据不一致的容忍度较高,且主从节点之间网络状况不好时,才会设置为 yes;多数情况使用默认值 no。

如果从节点有一段时间断开了与主节点的连接是不是要重新全量复制一遍?如果可以增量复制,怎么知道上次复制到哪里?通过 master_repl_offset 记录的偏移量。

redis> info replication

4.重新复制

当slave断开重连之后会进行重新同步,重新同步分完全同步和部分同步。

  1. 当slave断开重连后,会发送psync 命令给master。
  2. master收到psync后会返回+continue回复,表示slave可以执行部分同步了。
  3. master发送断线后的写命令给slave
  4. slave执行写命令。

5.三要素

事实上当slave发送psync命令给master之后,master还需要根据以下三点判断是否进行部分同步。

  1. 服务器运行ID

    1. 每个redis服务器开启后会生成运行ID。
    2. 当进行初次同步时,master会将自己的ID告诉slave,slave会记录下来。
    3. 当slave断线重连后,发现ID是这个master的就会尝试进行部分重同步。
    4. 当ID与现在连接的master不一样时会进行完整重同步。
  2. 复制偏移量

    1. 复制偏移量包括master复制偏移量和slave复制偏移量。
    2. 当初次同步过后两个数据库的复制偏移量相同。
    3. 之后master执行一次写命令,那么master的偏移量+1。
    4. master将写命令给slave,slave执行一次,slave偏移量+1,这样版本就能一致。
  3. 复制积压缓冲区

    1. 当slave断开重连后,会发送psync 命令给master。
    2. master首先会对服务器运行id进行判断,如果与自己相同就进行判断偏移量
    3. master会判断自己的偏移量与slave的偏移量是否一致。
    4. 如果不一致,master会去缓冲区中判断slave的偏移量之后的数据是否存在。
    5. 如果存在就会返回+continue回复,表示slave可以执行部分同步了。
    6. master发送断线后的写命令给slave
    7. slave执行写命令。

三,主从复制分析

主从模式解决了数据备份和性能(通过读写分离)的问题,但是还是存在一些不足:

  1. RDB 文件过大的情况下,同步非常耗时。

  2. 在一主一从或者一主多从的情况下,如果主服务器挂了,对外提供的服务就不可用了,单点问题没有得到解决。如果每次都是手动把之前的从服务器切换成主服务器,这个比较费时费力,还会造成一定时间的服务不可用。


四,Master源码分析

Redis[十九]主从集群 - 图2

1. psync命令处理流程

指令psync & sync 是同一个处理函数,就是位于replication.c文件的syncCommand函数,我们来看一下这个函数的主要逻辑:

  1. 参数校验:如果是从机或者sentinel节点接收到这个命令,直接返回。如果当前节点确实是主机,但是与这个从机的连接出了问题,这个时候返回一个错误。如果有其他客户端的命令没有处理完,直接返回,之所以返回是因为需要缓冲区来记录复制偏移量。
  2. 部分同步:如果传入的指令是psync,主机会进行部分重同步【masterTryPartialResynchronization】
  3. 全量同步:如果上一步执行失败了或者传入的指令是sync

    1. 将从机的状态修改为等待主机bgsave开始
    2. 判断如果是初次全量同步,修改主机的repid,清除无效的repid,创建主节点的命令积压缓冲区。
    3. 如果已经有线程在进行bgsave操作,并且会生成rdb文件,校验rdb是否可以复用,如果可以复用,复制rdb文件数据到当前从节点的命令积压缓冲区,主机发送给从机 +FULLRESYNC命令,轮训到从机连接可以写入数据的时候就会把RDB文件发送出去。
    4. 如果没有保存RDB数据的进程在运行,并且不能通过socket传输RDB文件,如果当前没有子线程的运行,那就开始bgsave存盘操作【startBgsaveForReplication】。

      /* SYNC and PSYNC command implementation. */
      void syncCommand(client *c) {
      /* 如果是从机或者监视器的话就忽略这个命令 */
      if (c->flags & CLIENT_SLAVE) return;
      
      /* 当前节点确实是主机,但是从机和主机没连接上直接返回 */
      if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
        addReplyError(c, "-NOMASTERLINK Can't SYNC while not connected with my master");
        return;
      }
      
      //如果有客户端的命令没处理完,直接返回,
      //需要一个新的复制缓冲区来记录 BGSAVE 和当前数据集之间的差异,后续我们要复制到其他从机
      if (clientHasPendingReplies(c)) {
        addReplyError(c, "SYNC and PSYNC are invalid with pending output");
        return;
      }
      
      //如果当前命令是psync指令
      //strcasecmp函数会比较传入的两个字符串是否相同,相同返回 0
      if (!strcasecmp(c->argv[0]->ptr, "psync")) {
        //主机尝试部分重同步  【 TODO 重点分析】
        if (masterTryPartialResynchronization(c) == C_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            char *master_replid = c->argv[1]->ptr;
            if (master_replid[0] != '?') server.stat_sync_partial_err++;
        }
      } else {
        /* 如果从机使用 SYNC,我们正在处理复制协议的旧实现(如 redis-cli --slave)。标记客户端,以便我们不希望收到 REPLCONF ACK 反馈。 */
        c->flags |= CLIENT_PRE_PSYNC;
      }
      //==============下面是全量同步的逻辑 什么情况下走这里?主机收到的不是psync或者部分同步失败了===============//
      /* 全量同步次数++ */
      server.stat_sync_full++;
      
      /*将slave设置为等待BGSAVE启动的slave。*/
      c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
      if (server.repl_disable_tcp_nodelay)
        connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
      c->repldbfd = -1;
      c->flags |= CLIENT_SLAVE;
      //将从机尾插入队列
      listAddNodeTail(server.slaves, c);
      
      //条件成立:说明此时是第一次进行全量同步,需要创建命令积压缓冲区
      if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
        //改变复制id
        changeReplicationId();
        //清除(无效)辅助复制 ID。例如,在完全重新同步之后,当我们开始新的复制历史时,就会发生这种情况。
        clearReplicationId2();
        //创建主节点的命令积压缓冲区
        createReplicationBacklog();
      }
      
      /* CASE 1: 已经有保存 rdb 数据的 BGSAVE 进程在运行,并且该进程会把 rdb 数据保存为 rdb 文件 */
      if (server.rdb_child_pid != -1 &&
        server.rdb_child_type == RDB_CHILD_TYPE_DISK) {
        client *slave;
        listNode *ln;
        listIter li;
      
        listRewind(server.slaves, &li);
        while ((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
        }
        /* 判断RDB文件能否复用 ,条件成立说明可以复用。*/
        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
            /* 服务器已经在为另一个从机注册差异。设置正确的状态,并复制缓冲区。 */
            copyClientOutputBuffer(c, slave); //复制rdb文件数据到当前从节点的命令积压缓冲区
            //主机发送给从机 +FULLRESYNC 命令 ,从节点轮训到从机连接可以写入数据的时候就会把RDB文件发送过去
            replicationSetupSlaveForFullResync(c, slave->psync_initial_offset);
            serverLog(LL_NOTICE, "Waiting for end of BGSAVE for SYNC");
        } else {
            /* 不能复用,我们要等待下一次bgsave操作 */
        }
      
        /* CASE 2: 已经有保存 rdb 数据的进程在运行,但是该进程生成的 rdb 数据会直接通过 socket 发送到对端 */
      } else if (server.rdb_child_pid != -1 &&
               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
        /* 这个时候说明RDB文件不可用,我们等待下一次BGSAVE操作。 */
        /* CASE 3: 没有保存 rdb 数据的进程在运行 */
      } else {
        //条件成立,说明可以进行socket 传输 rdb
        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) &&
            server.repl_diskless_sync_delay) {
            /* Diskless replication RDB child is created inside
             * replicationCron() since we want to delay its start a
             * few seconds to wait for more slaves to arrive. */
        } else { // 不能通过socket传输rdb
            /* 当前没有子进程 */
            if (!hasActiveChildProcess()) {
                //开始bgsave存盘  进行rdb的保存操作,根据从机来选择文件传输方式 TODO
                startBgsaveForReplication(c->slave_capa);
            } else {
            }
        }
      }
      return;
      }
      

      1.1 执行部分重同步

部分重同步的具体逻辑在masterTryPartialResynchronization函数,我们来分析一下:

首先检验replid,偏移量等参数,不满足条件的直接返回上层去进行全量同步。满足条件的就要进行部分同步:首先将从机的状态设置为SLAVE_STATE_ONLINE,将从机尾插到主节点保存的从节点列表,然后给从机发送+CONTINUE命令告诉他能进行部分同步了,最后将主节点命令积压缓冲区指定偏移量的数据添加到从节点的缓冲区,等待其可以写的时候发送过去。

int masterTryPartialResynchronization(client *c) {
    ...
    //校验replid是否相等 【master_replid 从机传递过来的】  ,【server.replid 主机保存的】
    if (strcasecmp(master_replid, server.replid) &&
        (strcasecmp(master_replid, server.replid2) ||
         //从节点带来的偏移量:psync_offset  大于 主节点复制缓冲区的起点
         psync_offset > server.second_replid_offset)) {
        /* 如果从机传过来的replid是一个 ? ,那就代表进行一次全同步。 */
        if (master_replid[0] != '?') {
            //校验replid是否相等 master_replid 从机传递过来的  ,server.replid 主机保存的
            if (strcasecmp(master_replid, server.replid) &&
                strcasecmp(master_replid, server.replid2)) {
            } else {

            }
        } else {

        }
        //说明此时需要全同步
        goto need_full_resync;
    }

    /* 继续进行判断,不满足条件就直接返回就去全同步 */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) {

        if (psync_offset > server.master_repl_offset) {

        }
        goto need_full_resync;
    }

    // ===================走到这里,说明可以进行部分同步了。==================//

    c->flags |= CLIENT_SLAVE;
    //将从机的状态设置为SLAVE_STATE_ONLINE
    c->replstate = SLAVE_STATE_ONLINE;
    c->repl_ack_time = server.unixtime;
    c->repl_put_online_on_ack = 0;
    //将从机尾插添加到主节点保存的的从节点列表
    listAddNodeTail(server.slaves, c);

    if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
        buflen = snprintf(buf, sizeof(buf), "+CONTINUE %s\r\n", server.replid);
    } else {
        buflen = snprintf(buf, sizeof(buf), "+CONTINUE\r\n");
    }
    //给从机发送 +CONTINUE 命令 告诉他能进行部分复制了
    if (connWrite(c->conn, buf, buflen) != buflen) {
        freeClientAsync(c);
        return C_OK;
    }
    //将主节点命令积压缓冲区指定偏移量的数据添加到 从节点的缓冲区,等待其可以写的时候发送过去
    psync_len = addReplyReplicationBacklog(c, psync_offset);

    refreshGoodSlavesCount();

    moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                          REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
                          NULL);

    return C_OK; 
    need_full_resync:
      return C_ERR;
}

1.2 全量同步

全量同步的函数是startBgsaveForReplication,我们来看一下他的逻辑:

  1. 首先判断从机可以接收哪种同步方式:通过socket传输rdb 【rdbSaveToSlavesSockets】或者将rdb保存到文件中【rdbSaveBackground】。
  2. 如果BGSAVE失败,从从机列表中删除等待完全重新同步的从机,告诉他们出错了,尽快关闭连接。
  3. 如果是通过RDB文件进行传输的,从主机保存的从机列表取出所有SLAVE_STATE_WAIT_BGSAVE_START状态的客户端,逐个发送+FULLRESYNC 命令,通知可以进行全量同步了。

    int startBgsaveForReplication(int mincapa) {
     ...
     rdbSaveInfo rsi, *rsiptr;
     rsiptr = rdbPopulateSaveInfo(&rsi);
     if (rsiptr) {
         if (socket_target)//通过socket传输rdb
             retval = rdbSaveToSlavesSockets(rsiptr);
         else//将rdb数据保存到rdb文件
             retval = rdbSaveBackground(server.rdb_filename, rsiptr);
     } else {
         retval = C_ERR;
     }
     //省略一些不必要的逻辑
    
     /* 如果是通过rdb文件进行传输的*/
     if (!socket_target) {
         listRewind(server.slaves, &li);
         while ((ln = listNext(&li))) {
             client *slave = ln->value;
             //从主机保存的从机列表取出所有SLAVE_STATE_WAIT_BGSAVE_START状态的客户端
             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                 //给从机发 + FULLRESYNC   全量复制
                 replicationSetupSlaveForFullResync(slave,
                                                    getPsyncInitialOffset());
             }
         }
     }
     if (retval == C_OK) replicationScriptCacheFlush();
     return retval;
    }
    

    2.rdb相关的流程

2.1 通过socket发送rdb的数据

通过socket阐传输RDB的逻辑在rdbSaveToSlavesSockets函数中,我们来看一下:

  1. 首先判断子线程在运行或者管道数据尚未清空,会返回失败
  2. 创建父子线程的两个管道
  3. 遍历主机保存的从机列表,如果从机SLAVE_STATE_WAIT_BGSAVE_START,通知从机进行全量复制
  4. 如果现在在子进程中
    1. 将父进程管道的 rdb_pipe_write 与文件指针rdb进行绑定
    2. 生成rdb数据并写入管道
    3. 将生成rdb数据期间 通过写时复制生成的内存碎片页传输到父进程
  5. 如果现在在父进程中,并且是创建子进程没出错,注册处理管道读的一端 server.rdb_pipe_read 的处理函数 rdbPipeReadHandler 读取数据

    int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
     listNode *ln;
     listIter li;
     pid_t childpid;
     int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;
     //如果有子线程在运行,返回失败
     if (hasActiveChildProcess()) return C_ERR;
    
     /* 在子线程退出后,但是管道尚未清空,返回失败 */
     if (server.rdb_pipe_conns) return C_ERR;
    
     /* 在fork之前创建一个管道,创建失败返回 */
     if (pipe(pipefds) == -1) return C_ERR;
     server.rdb_pipe_read = pipefds[0]; /* read end */
     rdb_pipe_write = pipefds[1]; /* write end */
     anetNonBlock(NULL, server.rdb_pipe_read);
    
     /*  创建另一个管道,给父线程用来唤醒子线程去退出。 */
     if (pipe(pipefds) == -1) {
         close(rdb_pipe_write);
         close(server.rdb_pipe_read);
         return C_ERR;
     }
     safe_to_exit_pipe = pipefds[0]; /* read end */
     server.rdb_child_exit_pipe = pipefds[1]; /* write end */
     server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
     server.rdb_pipe_numconns = 0;
     server.rdb_pipe_numconns_writing = 0;
     listRewind(server.slaves,&li);
     //遍历master保存的slaver列表
     while((ln = listNext(&li))) {
         client *slave = ln->value;
         //如果slaver状态是 SLAVE_STATE_WAIT_BGSAVE_START
         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
             server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
             //通知slaver进行全量复制
             replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
         }
     }
    
     /* Create the child process. */
     openChildInfoPipe();
     //创建子进程成功
     if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
         /* Child */
         int retval, dummy;
         rio rdb;
         //将父进程管道的 rdb_pipe_write 与文件指针rdb进行绑定
         rioInitWithFd(&rdb,rdb_pipe_write);
    
         redisSetProcTitle("redis-rdb-to-slaves");
         redisSetCpuAffinity(server.bgsave_cpulist);
         //生成rdb数据并写入管道
         retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
         if (retval == C_OK && rioFlush(&rdb) == 0)
             retval = C_ERR;
    
         if (retval == C_OK) {
             //将生成rdb数据期间 通过写时复制生成的内存碎片页传输到父进程
             sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
         }
    
         rioFreeFd(&rdb);
         close(rdb_pipe_write);
         close(server.rdb_child_exit_pipe);
         dummy = read(safe_to_exit_pipe, pipefds, 1);
         UNUSED(dummy);
         exitFromChild((retval == C_OK) ? 0 : 1);
     }else {
         /* Parent */
         close(safe_to_exit_pipe);
         //创建子进程出错了
         if (childpid == -1) {
             serverLog(LL_WARNING,"Can't save in background: fork: %s",
                 strerror(errno));
             listRewind(server.slaves,&li);
             while((ln = listNext(&li))) {
                 client *slave = ln->value;
                 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
                     slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
                 }
             }
             close(rdb_pipe_write);
             close(server.rdb_pipe_read);
             zfree(server.rdb_pipe_conns);
             server.rdb_pipe_conns = NULL;
             server.rdb_pipe_numconns = 0;
             server.rdb_pipe_numconns_writing = 0;
             closeChildInfoPipe();
         } else {//此时说明创建子进程没出错
    
             serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
                 (long) childpid);
             server.rdb_save_time_start = time(NULL);
             server.rdb_child_pid = childpid;
             server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
             updateDictResizePolicy();
             close(rdb_pipe_write);
             // 注册处理管道读的一端 server.rdb_pipe_read 的处理函数 rdbPipeReadHandler  读取数据
             if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
                 serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
             }
         }
         return (childpid == -1) ? C_ERR : C_OK;
     }
     return C_OK; /* Unreached. */
    }
    

    2.1.1 父进程读取管道中的数据

    遍历等待rdb数据的从节点列表,将rdb数据读取到 server.rdb_pipe_buff 中,在从节点连接上设置写处理函数rdbPipeWriteHandler,等待其可写时发送rdb数据。

    void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
     UNUSED(mask);
     UNUSED(clientData);
     UNUSED(eventLoop);
     int i;
     if (!server.rdb_pipe_buff)
         server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
     serverAssert(server.rdb_pipe_numconns_writing == 0);
    
     while (1) {
         server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
         if (server.rdb_pipe_bufflen < 0) {
             if (errno == EAGAIN || errno == EWOULDBLOCK)
                 return;
             for (i = 0; i < server.rdb_pipe_numconns; i++) {
                 connection *conn = server.rdb_pipe_conns[i];
                 if (!conn)
                     continue;
                 client *slave = connGetPrivateData(conn);
                 freeClient(slave);
                 server.rdb_pipe_conns[i] = NULL;
             }
             killRDBChild();
             return;
         }
    
         if (server.rdb_pipe_bufflen == 0) {
             /* EOF - write end was closed. */
             int stillUp = 0;
             aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
             for (i = 0; i < server.rdb_pipe_numconns; i++) {
                 connection *conn = server.rdb_pipe_conns[i];
                 if (!conn)
                     continue;
                 stillUp++;
             }
             close(server.rdb_child_exit_pipe);
             server.rdb_child_exit_pipe = -1;
             return;
         }
    
         int stillAlive = 0;
         //遍历等待rdb数据的从节点列表
         for (i = 0; i < server.rdb_pipe_numconns; i++) {
             int nwritten;
             connection *conn = server.rdb_pipe_conns[i];
             if (!conn)
                 continue;
    
             client *slave = connGetPrivateData(conn);
             //将 rdb 数据读取到 server.rdb_pipe_buff 中
             if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
                 if (connGetState(conn) != CONN_STATE_CONNECTED) {
                     freeClient(slave);
                     server.rdb_pipe_conns[i] = NULL;
                     continue;
                 }
                 slave->repldboff = 0;
             } else {
                 slave->repldboff = nwritten;
                 atomicIncr(server.stat_net_output_bytes, nwritten);
             }
             if (nwritten != server.rdb_pipe_bufflen) {
                 server.rdb_pipe_numconns_writing++;
                 //在从节点连接上设置写处理函数rdbPipeWriteHandler,等待其可写时发送 rdb 数据
                 connSetWriteHandler(conn, rdbPipeWriteHandler);
             }
             stillAlive++;
         }
    
         if (stillAlive == 0) {       
             killRDBChild();
         }
         if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
             aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
             break;
         }
     }
    }
    

    2.2 发送rdb文件

    将 server.rdb_pipe_buff 中读取到的数据发到从节点上。

    void rdbPipeWriteHandler(struct connection *conn) {
     serverAssert(server.rdb_pipe_bufflen > 0);
     client *slave = connGetPrivateData(conn);
     int nwritten;
     //将 server.rdb_pipe_buff 中读取到的数据发到从节点上
     if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
                               server.rdb_pipe_bufflen - slave->repldboff)) == -1) {
         if (connGetState(conn) == CONN_STATE_CONNECTED)
             return; /* equivalent to EAGAIN */
         serverLog(LL_WARNING, "Write error sending DB to replica: %s",
                   connGetLastError(conn));
         freeClient(slave);
         return;
     } else {
         slave->repldboff += nwritten;
         atomicIncr(server.stat_net_output_bytes, nwritten);
         if (slave->repldboff < server.rdb_pipe_bufflen)
             return; /* more data to write.. */
     }
     rdbPipeWriteHandlerConnRemoved(conn);
    }
    

至此,主从复制集群的master流程就分析完了,接下来,我们来分析一下slaver的源码。


五,Slaver源码分析


Slaver节点的主要就是两部分:主从连接的建立和主从数据同步。

Redis[十九]主从集群 - 图3

1.主从连接的建立

Redis节点接收到slaveof 或者 replicaof 命令的时候就会开始建立主从关系,我们在server.c找一下该命令的处理函数是replication.c文件的replicaofCommand,我们先来看下这个函数的逻辑:

  1. 检查当前节点的状态,如果当前是集群模式的话,不能使用slaveof,replicaof 命令。
  2. 先判断是不是要断开主从连接,比如 slaveof no one ,最终会调用 replicaoftionUnsetMaster函数处理。
  3. 如果不是解除主从连接的命令,调用replicationSetMaster函数处理。

接下来看replicationSetMaster函数的逻辑主要就是设置主节点的ip,端口等等各种变量,然后如果当前节点有从节点,就断开当前节点的从节点连接,因为当前节点本身变成了从节点,他的数据可能要发生很大变化,所以当前节点要和他的从节点重新同步数据。最后把server.repl.state 变量(同步状态)设置为 CONNECT

replication.c文件里面有一个定时任务:replicationCron函数,当server.repl.state 变量(同步状态)为 CONNECT时,就会触发定时任务相关的逻辑:

  1. 先检查主从同步是否超时,如果超时了就走超时处理逻辑。
  2. 当server.repl.state 变量(同步状态)为 CONNECT时,调用connectWithMaster函数建立主从关系。
  3. 如果当前节点已经是在和主节点进行数据同步了,那就通过replicationSendAck函数给主节点发送ACK。
  4. 如果当前节点本身还有从节点,那么这个时候需要定期给当前节点的从节点也发送PING命令。

connectWithMaster函数其实没啥玩意儿,就是调用了connCreateSocket函数连接主节点,绑定了连接的处理函数syncWithMaster,设置server.repl.state的值等于CONNECTION

syncWithMaster函数主要就是处理主从之间的关系,他会确定主从节点之间数据通过何种方式进行同步【全量同步,部分同步】和数据的传输方式【socket,rdb文件】,确定下来以后,设置server.repl.state的值等于TRANSFER

  1. 首先如果server.repl.state的值等于NONE,说明用户在客户端执行了slaveof no one 命令,这个时候我们要关闭当前连接了。
  2. 检查socket连接的状态是否有效
  3. 如果节点状态是CONNECTING,那就给主节点发送ping命令,然后设置当前节点状态为RECEIVE_PING_REPLY
  4. 如果当前节点状态是RECEIVE_PING_REPLY,执行receiveSynchronousResponse函数接收主节点的响应,并设置节点状态为SEND_HANDSHAKE
  5. 如果当前节点状态为SEND_HANDSHAKE,设置从机端口,为了主机执行info命令的时候,能够知道从机的端口;在设置ip,sendCommand(conn, "REPLCONF","capa", "eof", "capa", "psync2", NULL)这一行代码其实就是当前节点将自己能够支持的数据同步能力告诉主节点,然后敲定数据同步方式。eof 代表支持直接通过 socket 接收同步数据,psync2 代表支持部分复制。
  6. 接下来就是接收主节点响应,如果节点状态是RECEIVE_CAPA_REPLY,把节点状态该设置为SEND_PSYNC
  7. 当节点状态为SEND_PSYNC的时候,通过slaveTryPartialResynchronization函数确定数据同步方式。
  8. 如果主节点返回不支持psync,那就需要重新发送sync进行全量同步。
  9. 如果最后敲定的数据同步方式为全量复制并且不支持socket,那么当前节点需要创建一个缓存文件用来接收主节点发送过来的rdb文件。
  10. 最后调用connSetReadHandler函数在连接上注册处理主节点发送过来的数据的函数为readSyncBulkPayload

slaveTryPartialResynchronization函数上面已经说了,用来确定主节点与当前节点进行数据同步的方式,主要流程如下:

从节点发送psync给主节点,同步处理主节点的响应,主节点可能会返回4中结果中的一种:

  1. +FULLRESYNC: 主节点判断必须进行全量复制
  2. +CONTINUE: 主节点判断可以进行部分复制
  3. -NOMASTERLINK、-LOADING:主节点忙于其他事务,建议稍后重试
  4. -ERR: 主节点不支持 PSYNC 命令或者发生了一些异常

    2.接收处理rdb文件

接下来我们来看readSyncBulkPayload函数的逻辑,这个函数主要就是当前节点接收到主节点传过来的数据进行处理的过程。

  1. 从与主节点的连接中读取数据,当 rdb 数据的传输方式为传输 rdb 文件时,将从连接上读取到的数据写入到之前创建的临时文件中
  2. 如果当前节点开启了 AOF,调用 stopAppendOnly函数将其关闭
  3. 调用 emptyDb函数清空当前节点的 db 数据
  4. 如果主节点是直接通过 socket 传输 rdb 数据,则从节点调用 rdbLoadRio函数从 socket 中读取 rdb 数据并加载进 db
  5. 如果主节点传输的是 rdb 文件,则将临时文件重命名为 rdb 文件名称,并调用 rdbLoad函数完成加载 rdb 文件到内存
  6. 如果当前节点配置开启了 AOF,则调用 restartAOFAfterSYNC函数重新启动 AOF



rdb.c文件的rdbLoadRio函数就是解析rdb数据并将其中 key -value 加载进节点的db数据库中。这块后面如果有精力会写一篇文章专门研究rdb文件的格式&解析。