在分析事务的源码之前,还是先来聊聊事务的使用。

1.事务的使用

1.1 为什么要用事务

Redis 的单个命令是原子性的(比如 get set mget mset),如果涉及到多个命令的时候,需要把多个命令作为一个不可分割的处理序列,就需要用到事务。

用 setnx 实现分布式锁,我们先 set,然后设置对 key 设置 expire,防止 del 发生异常的时候锁不会被释放,业务处理完了以后再 del,这三个动作我们就希望它们作为一组命令执行。

Redis 的事务有两个特点:

  1. 按进入队列的顺序执行。

  2. 不会受到其他客户端的请求的影响。

Redis 的事务涉及到四个命令:multi(开启事务),exec(执行事务),discard(取消事务),watch(监视)

1.2 事务的用法

案例场景:tom 和 mic 各有 1000 元,tom 需要向 mic 转账 100 元。tom 的账户余额减少 100 元,mic 的账户余额增加 100 元。

  1. set tom 1000
  2. set mic 1000
  3. multi
  4. decrby tom 100
  5. incrby mic 100
  6. exec

通过 multi 的命令开启事务。事务不能嵌套,多个 multi 命令效果一样。

multi 执行后,客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 exec 命令被调用时, 所有队列中的命令才会被执行。

通过 exec 的命令执行事务。如果没有执行 exec,所有的命令都不会被执行。

如果中途不想执行事务了,可以调用 discard 可以清空事务队列,放弃执行。

1.3 watch命令

在 Redis 中还提供了一个 watch 命令。

它可以为 Redis 事务提供 CAS 乐观锁行为(Check and Set / Compare and Swap),也就是多个线程更新变量的时候,会跟原值做比较,只有它没有被其他线程修改的情况下,才更新成新的值。

我们可以用 watch 监视一个或者多个 key,如果开启事务之后,至少有一个被监视key 键在 exec 执行之前被修改了, 那么整个事务都会被取消(key 提前过期除外)。可以用 unwatch 取消。

1.4 事务发生错误

1.4.1 exec之前发生错误

比如:入队的命令存在语法错误,包括参数数量,参数名等等(编译器错误)。

在这种情况下事务会被拒绝执行,也就是队列中所有的命令都不会得到执行。

1.4.2 exec之后发生错误

比如,类型错误,比如对 String 使用了 Hash 的命令,这是一种运行时错误。

在这种发生了运行时异常的情况下,只有错误的命令没有被执行,但是其他命令没有受到影响。

这个显然不符合我们对原子性的定义,也就是我们没办法用 Redis 的这种事务机制来实现原子性,保证数据的一致。

为什么在一个事物中存在错误,Redis不回滚?由于不需要回滚,这使的Redis内部更加简单,而且运行速度更快。


2.事务源码分析

2.1 开启事务

事务相关的源码都在multi.c文件中。MULTI命令标志着事务的开始,我们来看一下大概流程。

  1. 判断是否是嵌套事务,如果是嵌套事务,直接返回
  2. 将客户端从非事务状态切换到事务状态,在客户端状态的flags属性中,打开标识REDIS_MULTI(1<<3)

    1. void multiCommand(client *c) {
    2. //禁止嵌套事务
    3. if (c->flags & CLIENT_MULTI) {
    4. addReplyError(c, "MULTI calls can not be nested");
    5. return;
    6. }
    7. //打开事务flag
    8. c->flags |= CLIENT_MULTI;
    9. addReply(c, shared.ok);
    10. }

我们来看一下redis的客户端定义,在server.h文件的结构体client中,有一个属性mstate,这个属性表示事务的状态。

//事务状态
multiState mstate;      /* MULTI/EXEC state */

我们再来看一下事务状态的结构:这里面主要包含存储命令的事务队列和对入队命令的一个计数。

typedef struct multiState {
    // 事务队列,FIFO 顺序
    multiCmd *commands;     /* Array of MULTI commands */
    // 已入队命令计数
    int count;              /* Total number of MULTI commands */
    int cmd_flags;          /* The accumulated command flags OR-ed together.
                               So if at least a command has a given flag, it
                               will be set in this field. */
    int cmd_inv_flags;      /* Same as cmd_flags, OR-ing the ~flags. so that it
                               is possible to know if all the commands have a
                               certain flag. */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

事务队列以先进先出(FIFO)方式进行,是一个multiCmd类型的数组,即事务命令,该类型定义为:

typedef struct multiCmd {
    // 参数
    robj **argv;
    // 参数数量
    int argc;
    // 命令指针
    struct redisCommand *cmd;
} multiCmd;

当客户端处于事务状态下,当执行除了EXEC、DISCARD、WATCH、MULTI(立刻执行)这四个命令以外的其他命令,服务器将不会立即执行该命令,而且放入事务队列中,向客户端返回QUEUED回复。
image.png

2.2 添加命令到事务队列

开启事务之后,就可以添加命令到redis的事务队列里面了,主要的流程就是:

  1. 判断是不是有意义的添加,不是的话直接返回
  2. 为新的命令分配内存空间,并让指针指向新元素
  3. 设置事务的命令,命令参数的数量,以及命令的参数等等
  4. 把事务命令的计数器+1

image.png

void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;
    /* 如果事务已经中止,浪费内存是没有意义的,这在客户端通过管道发送这些消息,
     * 或者懒得阅读之前的响应而没有注意到multi已经中止的情况下很有用。*/
    if (c->flags & CLIENT_DIRTY_EXEC)
        return;
    // 为新数组元素分配空间
    c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd) * (c->mstate.count + 1));
    // 指向新元素
    mc = c->mstate.commands + c->mstate.count;
    // 设置事务的命令、命令参数数量,以及命令的参数
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj *) * c->argc);
    memcpy(mc->argv, c->argv, sizeof(robj *) * c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    // 事务命令数量计数器增一
    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
    c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}

2.3 执行事务

执行EXEC命令,服务器遍历这个客户端的事务队列,执行所有命令,最后将执行结果全部返回给客户端。

  1. 判断如果客户端没有执行事务,直接返回
  2. 判断事务是不是应该被终止(监视的key被修改,命令在入队的时候发生错误),如果是的话,走取消事务的逻辑
  3. 如果事务应该正常提交,取消客户端对所有key的监视
  4. 备份事务队列的命令和参数,为事务传播赋能
  5. 遍历执行事务队列中的命令
    1. 将事务队列中的命令、命令参数等设置给客户端
    2. 更新事务队列中的命令和参数,确保附属节点和 AOF 的数据一致性
  6. 还原命令、命令参数&清理事务状态
  7. 事务执行的命令要往其他节点传播,确保服务器和 AOF 文件以及附属节点的数据一致性
  8. 修改事务提交状态

image.png

void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int was_master = server.masterhost == NULL;
    // 客户端没有执行事务
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /*
     * 检查是否需要阻止事务执行,因为:
     *
     * 1) Some WATCHed key was touched.
     *    有被监视的键已经被修改了
     *
     * 2) There was a previous error while queueing commands.
     *    命令在入队时发生错误
     *    (注意这个行为是 2.6.4 以后才修改的,之前是静默处理入队出错命令)
     * 第一种情况返回多个批量回复的空对象
     * 而第二种情况则返回一个 EXECABORT 错误
     */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                   shared.nullarray[c->resp]);
        // 取消事务
        discardTransaction(c);
        goto handle_monitor;
    }

    uint64_t old_flags = c->flags;

    /* we do not want to allow blocking commands inside multi */
    c->flags |= CLIENT_DENY_BLOCKING;

    // 已经可以保证安全性了,取消客户端对所有键的监视
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */

    server.in_exec = 1;
    /*因为事务中的命令在执行时可能会修改命令和命令的参数,所以为了正确地传播命令,需要现备份这些命令和参数。*/
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    // 执行事务中的命令
    for (j = 0; j < c->mstate.count; j++) {
        /*因为 Redis 的命令必须在客户端的上下文中执行,
         * 所以要将事务队列中的命令、命令参数等设置给客户端*/
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* ACL permissions are also checked at the time of execution in case
         * they were changed after the commands were ququed. */
        int acl_errpos;
        int acl_retval = ACLCheckCommandPerm(c,&acl_errpos);
        if (acl_retval == ACL_OK && c->cmd->proc == publishCommand)
            acl_retval = ACLCheckPubsubPerm(c,1,1,0,&acl_errpos);
        if (acl_retval != ACL_OK) {
            char *reason;
            switch (acl_retval) {
            case ACL_DENIED_CMD:
                reason = "no permission to execute the command or subcommand";
                break;
            case ACL_DENIED_KEY:
                reason = "no permission to touch the specified keys";
                break;
            case ACL_DENIED_CHANNEL:
                reason = "no permission to publish to the specified channel";
                break;
            default:
                reason = "no permission";
                break;
            }
            addACLLogEntry(c,acl_retval,acl_errpos,NULL);
            addReplyErrorFormat(c,
                "-NOPERM ACLs rules changed between the moment the "
                "transaction was accumulated and the EXEC call. "
                "This command is no longer allowed for the "
                "following reason: %s", reason);
        } else {
            call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
            serverAssert((c->flags & CLIENT_BLOCKED) == 0);
        }

        /* 因为执行后命令、命令参数可能会被改变
         * 所以这里需要更新事务队列中的命令和参数
         * 确保附属节点和 AOF 的数据一致性*/
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }

    // restore old DENY_BLOCKING value
    if (!(old_flags & CLIENT_DENY_BLOCKING))
        c->flags &= ~CLIENT_DENY_BLOCKING;
    // 还原命令、命令参数
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    // 清理事务状态
    discardTransaction(c);

    /* 事务执行的命令要往其他节点传播,确保服务器和 AOF 文件以及附属节点的数据一致性。 */
    if (server.propagate_in_transaction) {
        int is_master = server.masterhost == NULL;
        // 将服务器设为脏,确保 EXEC 命令也会被传播
        server.dirty++;
        //这里会走下面方法的else逻辑,传播 exec 命令
        beforePropagateMultiOrExec(0);
        /* 如果在MULTI/EXEC块中,该实例突然从主实例切换到从实例(使用SLAVEOF命令),则初始MULTI会传播到复制积压中,
         * 但其余的不会。我们需要确保至少在最终执行时终止积压。 */
        if (server.repl_backlog && was_master && !is_master) {
            char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
            feedReplicationBacklog(execcmd,strlen(execcmd));
        }
    }
    //修改事务提交状态
    server.in_exec = 0;

handle_monitor:
    ...
}

2.4 监视与取消监视命令

Watch必须在事务开始之前执行,乐观锁机制,可以实现对任意数量的数据库键的监视,在EXEC命令执行时,会检查被监视的键是否至少有一个已经被修改过了,如果是则拒绝执行事务,并返回空回复nil。

void watchCommand(client *c) {
    int j;
    // 不能在事务开始后执行
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c, "WATCH inside MULTI is not allowed");
        return;
    }
    // 监视输入的任意个键
    for (j = 1; j < c->argc; j++)
        watchForKey(c, c->argv[j]);
    addReply(c, shared.ok);
}
void unwatchCommand(client *c) {
    // 取消客户端对所有键的监视
    unwatchAllKeys(c);
    c->flags &= (~CLIENT_DIRTY_CAS);
    // 重置状态
    addReply(c, shared.ok);
}

①被监视的key

一个被监视的键的结构体定义包含被监视的键和该键所在的数据库。

typedef struct watchedKey {
    // 被监视的键
    robj *key;
    // 键所在的数据库
    redisDb *db;
} watchedKey;

每个数据库redisDb都保存着一个watched_keys字典,字典的键是被WATCH命令所监视的某个键,值是一个链表,该链表记录着所有监视该键的客户端。

typedef struct redisDb {
    dict *dict;                 /* 存储数据的字典 */
    dict *expires;              /* 存储有过期时间的 key */
    dict *blocking_keys;        /* 客户端等待数据的密钥(BLPOP)*/
    dict *ready_keys;           /* 收到推送的被阻止的密钥 */
    dict *watched_keys;         /* cas的方式监视事务的键 */
    int id;                     /* 数据库id */
    long long avg_ttl;          /* 用于统计的平均ttl */
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later;         /* 一个接一个,逐渐整理的key列表。 */
} redisDb;

不仅仅是server端,client端也存放着被该客户端监视的键的链表集合。

typedef struct client {
    // 被监视的键
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
} //如果是老版本的话,这个结构体叫redisClient

②客户端监视key

其实主要逻辑就是:

  1. 检查 key 是否已经保存在 watched_keys 链表中,如果是的话,直接返回
  2. 检查 key 是否存在于数据库的 watched_keys 字典中
    1. 如果不存在的话,添加它
  3. 将客户端添加到链表的末尾
  4. 将新 watchedKey 结构添加到客户端 watched_keys 链表的表尾

image.png

/* 让客户端监视给定的键 key */
void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* 检查 key 是否已经保存在 watched_keys 链表中,如果是的话,直接返回。 */
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* 检查 key 是否存在于数据库的 watched_keys 字典中 */
    clients = dictFetchValue(c->db->watched_keys,key);
    // 如果不存在的话,添加它
    if (!clients) {
        // 值为链表
        clients = listCreate();
        // 关联键值对到字典
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 将客户端添加到链表的末尾
    listAddNodeTail(clients,c);
    /* 将新 watchedKey 结构添加到客户端 watched_keys 链表的表尾 */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

③取消对所有key的监视

我们来看一下主要流程:

  1. 如果没有key被监视,直接返回
  2. 遍历链表中所有被客户端监视的key
    1. 从数据库的 watched_keys 字典的 key 键中删除链表里包含的客户端节点
    2. 取出客户端链表
    3. 删除链表中的客户端节点
    4. 判断链表已经被清空,就删除这个键
    5. 从链表中移除 key 节点
  3. 减少引用计数,并释放空间

image.png

void unwatchAllKeys(client *c) {
    listIter li;
    listNode *ln;
    // 没有键被监视,直接返回
    if (listLength(c->watched_keys) == 0) return;
    // 遍历链表中所有被客户端监视的键
    listRewind(c->watched_keys, &li);
    while ((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;
        /* 从数据库的 watched_keys 字典的 key 键中删除链表里包含的客户端节点 */
        wk = listNodeValue(ln);
        // 取出客户端链表
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        serverAssertWithInfo(c, NULL, clients != NULL);
        // 删除链表中的客户端节点
        listDelNode(clients, listSearchKey(clients, c));
        /* 如果链表已经被清空,那么删除这个键*/
        if (listLength(clients) == 0) dictDelete(wk->db->watched_keys, wk->key);
        /* 从链表中移除 key 节点 */
        listDelNode(c->watched_keys, ln);
        decrRefCount(wk->key);
        zfree(wk);
    }
}

2.5 监视机制的触发

所有对客户端执行修改的命令执行之后都会调用相关函数对watched_keys字典进行检查,查看被该客户端监视的键是否被上述命令修改过,如果是,则监视被修改键的客户端的REDIS_DIRTY_CAS标识打开,标识该客户端的事务安全性已经被破坏。我们以string的set命令为例看一下流程:前面我们分析过添加一个键值对的时候,底层会调用到db.c文件的
genericSetKey函数。在这个函数中最后一步,就是执行是否需要唤行被监视的key的。

image.png

void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
    //查找 redis 数据库中是否存在指定的 key,因为 redis 的数据库结构可以看成是 HashMap,故其查找方式与 Java 中 HashMap 实现的方式相同。
    if (lookupKeyWrite(db,key) == NULL) {
        //没有就新增
        dbAdd(db,key,val);
    } else {
        //有就覆盖
        dbOverwrite(db,key,val);
    }
    //将value的引用计数+1
    incrRefCount(val);
    //是否需要移除过期时间
    if (!keepttl) removeExpire(db,key);
    if (signal) signalModifiedKey(c,db,key);
}

我们看一下这个方法的逻辑:

void signalModifiedKey(client *c, redisDb *db, robj *key) {
    touchWatchedKey(db,key);
    trackingInvalidateKey(c,key);
}

void signalFlushedDb(int dbid, int async) {
    touchWatchedKeysOnFlush(dbid);
    trackingInvalidateKeysOnFlush(async);
}

我们继续往下追踪,就回到了multi.c文件,我们来看这两个方法的具体流程:

①对key进行增删改

void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;
    // 字典为空,没有任何键被监视
    if (dictSize(db->watched_keys) == 0) return;
    // 获取所有监视这个键的客户端
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /*遍历所有客户端,打开他们的 REDIS_DIRTY_CAS 标识*/
    listRewind(clients, &li);
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        c->flags |= CLIENT_DIRTY_CAS;
    }
}

②Flushdb

void touchWatchedKeysOnFlush(int dbid) {
    listIter li1, li2;
    listNode *ln;
    //遍历所有客户端,然后遍历客户端监视的键,再让相应的客户端变为 DIRTY
    /* 遍历所有客户端*/
    listRewind(server.clients, &li1);
    while ((ln = listNext(&li1))) {
        client *c = listNodeValue(ln);
        // 遍历客户端监视的键
        listRewind(c->watched_keys, &li2);
        while ((ln = listNext(&li2))) {
            // 取出监视的键和键的数据库
            watchedKey *wk = listNodeValue(ln);
            // 如果数据库号码相同,或者执行的命令为 FLUSHALL,那么将客户端设置为 REDIS_DIRTY_CAS
            if (dbid == -1 || wk->db->id == dbid) {
                if (dictFind(wk->db->dict, wk->key->ptr) != NULL)
                    c->flags |= CLIENT_DIRTY_CAS;
            }
        }
    }
}

当服务接收到一个客户端发来的EXEC命令后,首先会检测这个客户端是否打开了REDIS_DIRTY_CAS标识来决定是否执行事务。如果打开了说明至少有一个键被修改过了,事务不再安全,会拒绝执行事务,反之则执行事务。 至此,整个事务的源码流程我们就分析完了,接下来我们对Lua脚本进行一个简单的介绍。


3.Lua

Lua/ˈluə/是一种轻量级脚本语言,它是用 C 语言编写的,跟数据的存储过程有点类似。 使用 Lua 脚本来执行 Redis 命令的好处:

  1. 一次发送多个命令,减少网络开销。
  2. Redis 会将整个脚本作为一个整体执行,不会被其他请求打断,保持原子性。
  3. 对于复杂的组合命令,我们可以放在文件中,可以实现程序之间的命令集复用。

3.1 案例 :对IP进行限流

需求:在 X 秒内只能访问 Y 次。

设计思路:

  1. 用 key 记录 IP,用 value 记录访问次数。

  2. 拿到 IP 以后,对 IP+1。如果是第一次访问,对 key 设置过期时间(参数 1)。否则判断次数,超过限定的次数(参数 2),返回 0。如果没有超过次数则返回 1。超过时间,key 过期之后,可以再次访问。

  3. KEY[1]是 IP, ARGV[1]是过期时间 X,ARGV[2]是限制访问的次数 Y。

-- ip_limit.lua
-- IP 限流,对某个 IP 频率进行限制 ,6 秒钟访问 10 次
local num=redis.call('incr',KEYS[1])
if tonumber(num)==1 then
    redis.call('expire',KEYS[1],ARGV[1])
    return 1
elseif tonumber(num)>tonumber(ARGV[2]) then
    return 0
else
    return 1
end

6 秒钟内限制访问 10 次,调用测试(连续调用 10 次):

./redis-cli --eval "ip_limit.lua" app:ip:limit:192.168.8.111 , 6 10

app:ip:limit:192.168.8.111 是 key 值 ,后面是参数值,中间要加上一个空格 和一个逗号,再加上一个 空格 。

即:./redis-cli –eval [lua 脚本] [key…]空格,空格[args…]

多个参数之间用一个空格分割 。

3.2 缓存Lua脚本

①为什么要缓存

在脚本比较长的情况下,如果每次调用脚本都需要把整个脚本传给 Redis 服务端,会产生比较大的网络开销。为了解决这个问题,Redis 提供了 EVALSHA 命令,允许开发者通过脚本内容的 SHA1 摘要来执行脚本。

②如何缓存

Redis 在执行 script load 命令时会计算脚本的 SHA1 摘要并记录在脚本缓存中,执行 EVALSHA 命令时 Redis 会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了则执行脚本,否则会返回错误:”NOSCRIPT No matching script. Please use EVAL.”

127.0.0.1:6379> script load "return 'Hello World'"
"470877a599ac74fbfda41caa908de682c5fc7d4b"
127.0.0.1:6379> evalsha "470877a599ac74fbfda41caa908de682c5fc7d4b" 0
"Hello World"

③案例

Redis 有 incrby 这样的自增命令,但是没有自乘,比如乘以 3,乘以 5。我们可以写一个自乘的运算,让它乘以后面的参数:

local curVal = redis.call("get", KEYS[1])
if curVal == false then
    curVal = 0
else
    curVal = tonumber(curVal)
end
    curVal = curVal * tonumber(ARGV[1])
redis.call("set", KEYS[1], curVal)
return curVal

把这个脚本变成单行,语句之间使用分号隔开

local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal = tonumber(curVal) end; curVal
= curVal * tonumber(ARGV[1]); redis.call("set", KEYS[1], curVal); return curVal

script load ‘命令’

127.0.0.1:6379> script load 'local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal =
tonumber(curVal) end; curVal = curVal * tonumber(ARGV[1]); redis.call("set", KEYS[1], curVal); return curVal'
"be4f93d8a5379e5e5b768a74e77c8a4eb0434441"

执行

127.0.0.1:6379> set num 2
OK
127.0.0.1:6379> evalsha be4f93d8a5379e5e5b768a74e77c8a4eb0434441 1 num 6
(integer) 12

3.3 脚本超时

Redis 的指令执行本身是单线程的,这个线程还要执行客户端的 Lua 脚本,如果 Lua脚本执行超时或者陷入了死循环,是不是没有办法为客户端提供服务了呢?

为了防止某个脚本执行时间过长导致 Redis 无法提供服务,Redis 提供了lua-time-limit 参数限制脚本的最长运行时间,默认为 5 秒钟。

lua-time-limit 5000(redis.conf 配置文件中)

当脚本运行时间超过这一限制后,Redis 将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。

Redis 提供了一个 script kill 的命令来中止脚本的执行。新开一个客户端:

script kill

如果当前执行的 Lua 脚本对 Redis 的数据进行了修改(SET、DEL 等),那么通过script kill 命令是不能终止脚本运行的。

因为要保证脚本运行的原子性,如果脚本执行了一部分终止,那就违背了脚本原子性的要求。最终要保证脚本要么都执行,要么都不执行。

遇到这种情况,只能通过 shutdown nosave 命令来强行终止 redis。

shutdown nosave 和 shutdown 的区别在于 shutdown nosave 不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。

Redis 不是只有一个线程吗?它已经卡死了,怎么接受 spript kill 指令的?

我们在server.c文件中找到 script 命令对应的执行函数:

{"script",scriptCommand,-2, "no-script @scripting",0, NULL,0, 0,  0, 0, 0, 0}

在这个函数的实现中,判断如果我们输入的是 script kill 命令,会把server文件中的全局变量lua_kill设置为1。

else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) {
        if (server.lua_caller == NULL) {
            addReplyError(c,"-NOTBUSY No scripts in execution right now.");
        } else if (server.lua_caller->flags & CLIENT_MASTER) {
            addReplyError(c,"-UNKILLABLE The busy script was sent by a master instance in the context of replication and cannot be killed.");
        } else if (server.lua_write_dirty) {
            addReplyError(c,"-UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.");
        } else {
            server.lua_kill = 1;
            addReply(c,shared.ok);
        }
    }

注意,在我们通过eval函数执行Lua脚本的时候,scripting.c文件中的evalGenericCommand函数就是eval对应的执行函数,在这个函数里面有一段逻辑:他其实是为Lua脚本的执行设置一个钩子函数。

if (server.lua_time_limit > 0 && ldb.active == 0) {
    lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
    delhook = 1;
}

我们继续看一下钩子函数的实现:这里面有一段逻辑判断如果lua_kill=1,就回去异步干掉当前执行的lua脚本。

void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
    long long elapsed = mstime() - server.lua_time_start;
    UNUSED(ar);
    UNUSED(lua);
    if (elapsed >= server.lua_time_limit && server.lua_timedout == 0) {

        server.lua_timedout = 1;
        blockingOperationStarts();

        protectClient(server.lua_caller);
    }
    if (server.lua_timedout) processEventsWhileBlocked();
    if (server.lua_kill) {
        serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");
        lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");
        lua_error(lua);
    }
}

而Redis的单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程,并不是说Redis内部只有一个线程。如果有一些特殊的需求,可以用 Lua 来实现,但是要注意那些耗时的操作。