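In my humble opinion, each piece of middleware should do the job it was built for. Redis was designed first and foremost as a cache, so if you really need message publish/subscribe, a dedicated message queue should be the first choice. Stream is worth understanding, but no more than that.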

1. Data structure

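Stream is a data type introduced in Redis 5.0. It is a persistent message queue that supports multicast, is used to implement publish/subscribe, and borrows from Kafka's design. Its storage encoding, OBJ_ENCODING_STREAM, is very different from the encodings used by the other types.

Under OBJ_ENCODING_STREAM, Redis stores string elements in a compressed prefix tree (radix tree). The comments in the source file rax.h explain that a radix tree is a compressed version of a trie: it merges the characters held by a run of consecutive nodes that each have only one child into a single node.

How a trie works: each string key is split into single characters, and each character is mapped onto one step of a branch. Walking from the root to the leaf of a branch and concatenating the characters stored along the way yields the key for that branch. For more background, see the data structure notes in my knowledge base.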
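To make the trie idea concrete before moving on to the compressed variant, here is a minimal sketch of an uncompressed trie. It is not Redis code: trie_node, trie_new, trie_insert, trie_contains and trie_free are names I made up for this illustration, and the one-slot-per-byte child array is chosen for simplicity, not for memory efficiency.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* A deliberately naive trie node: one child slot per possible byte. */
typedef struct trie_node {
    struct trie_node *child[256];
    bool is_key;   /* True if the path from the root to here spells a key. */
} trie_node;

static trie_node *trie_new(void) {
    return calloc(1, sizeof(trie_node));
}

/* Insert 'key' one character per level. Returns false if memory runs out. */
static bool trie_insert(trie_node *root, const char *key) {
    assert(root != NULL && key != NULL);
    trie_node *n = root;
    for (const unsigned char *p = (const unsigned char *)key; *p; p++) {
        if (n->child[*p] == NULL) {
            n->child[*p] = trie_new();
            if (n->child[*p] == NULL) return false;
        }
        n = n->child[*p];
    }
    n->is_key = true;
    return true;
}

/* Follow the characters of 'key'; it is present only if the walk ends
 * on a node that was marked as a complete key. */
static bool trie_contains(const trie_node *root, const char *key) {
    const trie_node *n = root;
    for (const unsigned char *p = (const unsigned char *)key; n && *p; p++)
        n = n->child[*p];
    return n != NULL && n->is_key;
}

static void trie_free(trie_node *n) {
    if (n == NULL) return;
    for (int i = 0; i < 256; i++) trie_free(n->child[i]);
    free(n);
}
```

After inserting "foo" and "foobar", the prefix "fo" exists as a path but is not a key. A radix tree would store the run "bar" in one node instead of three, which is the compression described above.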

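The command for adding data to a Stream has the following format, where key is the name of the Stream, ID is the unique identifier of the message and must not repeat, and each field value is a key-value pair.

  XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value]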

[Figure: overall structure diagram of Redis]

Let's look at how Redis defines the stream data structure.

typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;

typedef struct stream {
    rax *rax;           /* Pointer to the radix tree. */
    uint64_t length;    /* Total number of stored elements, counted per message ID. */
    streamID last_id;   /* ID of the last message in the Stream. */
    rax *cgroups;       /* Information about the consumers listening on this Stream. */
} stream;
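Since last_id must always grow, it helps to see how two IDs are ordered and how a fresh ID can be derived from the last one. The sketch below is my own illustration under stated assumptions, not the Redis implementation: stream_id_t, id_compare and id_next are hypothetical names, and the current time is passed in as now_ms so the logic stays deterministic.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical mirror of streamID: milliseconds plus a sequence number. */
typedef struct {
    uint64_t ms;
    uint64_t seq;
} stream_id_t;

/* Order two IDs: first by ms, then by seq. Returns -1, 0 or 1. */
static int id_compare(const stream_id_t *a, const stream_id_t *b) {
    if (a->ms != b->ms) return a->ms < b->ms ? -1 : 1;
    if (a->seq != b->seq) return a->seq < b->seq ? -1 : 1;
    return 0;
}

/* Derive an ID strictly greater than 'last'. 'now_ms' stands in for the
 * wall clock. If the clock moved forward, start a new millisecond with
 * seq 0; otherwise bump the sequence, carrying into ms on overflow. */
static stream_id_t id_next(const stream_id_t *last, uint64_t now_ms) {
    /* The caller must not pass the largest possible ID. */
    assert(!(last->ms == UINT64_MAX && last->seq == UINT64_MAX));
    stream_id_t next;
    if (now_ms > last->ms) {
        next.ms = now_ms;
        next.seq = 0;
    } else if (last->seq == UINT64_MAX) {
        next.ms = last->ms + 1;
        next.seq = 0;
    } else {
        next.ms = last->ms;
        next.seq = last->seq + 1;
    }
    return next;
}
```

With last = 1000-5, a clock reading of 2000 gives 2000-0, while a clock that has not advanced (or has gone backwards) gives 1000-6, so the new ID is greater than the old one either way.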

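A Stream stores its data in a compressed prefix tree (radix tree), whose outermost structure is rax. Here is the code:

typedef struct rax {
    raxNode *head;      /* Head node of the radix tree. */
    uint64_t numele;    /* Number of elements stored in the radix tree; incremented for each inserted ID. */
    uint64_t numnodes;  /* Total number of nodes in the radix tree. */
} rax;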

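Each node of the radix tree is represented by a raxNode:

typedef struct raxNode {
    uint32_t iskey:1;   /* Whether this node represents a complete key; here a key is a message ID. */
    uint32_t isnull:1;  /* Set when the value associated with the key is NULL, so no value pointer is stored.
                           For a Stream the value is the listpack holding the XADD [field value] pairs. */
    uint32_t iscompr:1; /* Whether the node is prefix-compressed. A compressed node has exactly one child;
                           in an uncompressed node every character has its own child. */
    uint32_t size:29;   /* If compressed, the number of key characters stored in this node;
                           otherwise the number of children. */
    /* Byte array holding this node's characters, the pointers to its children and,
     * when the node is a key with a non-NULL value, the value pointer. For a Stream
     * that value pointer is treated as a listpack. */
    unsigned char data[];
} raxNode;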

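2. Radix tree operations

2.1 Inserting data

The radix tree insert function raxInsert() delegates the work to raxGenericInsert(). The implementation of raxGenericInsert() in rax.c is quite involved, so let's walk through it alongside the source:

  1. First call raxLowWalk() with the input string (the message ID) to find the position in the radix tree where the new message should be inserted.
  2. If the walk consumed the whole string, so the tree already contains it, and it did not stop in the middle of a compressed node (i == len && (!h->iscompr || j == 0)):
    1. If the string is not yet stored as a complete key (iskey = 0), reallocate the current node to make room for the value pointer, then update the pointer held by its parent.
    2. If the string is already stored as a complete key (iskey = 1), only the node's value needs updating, and only when overwrite is set.
  3. If the walk stopped in the middle of a compressed node, that node has to be split so the new message can be inserted. The source comments describe five cases.
  4. Any characters of the key that are still left are then appended as new nodes, using a compressed node whenever more than one character remains for a childless node.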
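Which split applies depends only on where the key and the compressed node's string stop agreeing. The following sketch is independent of the real rax code and simply computes that position and names the matching algorithm from the source comments. split_case_t, common_prefix_len and classify_split are names invented here.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Length of the common prefix of a compressed node's characters and the
 * part of the key still to be matched. This plays the role of $SPLITPOS. */
static size_t common_prefix_len(const char *node, size_t nlen,
                                const char *key, size_t klen) {
    size_t j = 0;
    while (j < nlen && j < klen && node[j] == key[j]) j++;
    return j;
}

typedef enum {
    SPLIT_NONE,   /* Whole node matched, or nothing left to match: no split. */
    SPLIT_ALGO1,  /* Mismatching character inside the node (cases 1 to 4). */
    SPLIT_ALGO2   /* Key ended inside the node with no mismatch (case 5). */
} split_case_t;

/* Classify what inserting 'key' means for a compressed node holding 'node'.
 * The split position is written to '*splitpos'. */
static split_case_t classify_split(const char *node, const char *key,
                                   size_t *splitpos) {
    assert(node != NULL && key != NULL && splitpos != NULL);
    size_t nlen = strlen(node), klen = strlen(key);
    size_t j = common_prefix_len(node, nlen, key, klen);
    *splitpos = j;
    if (j == nlen || klen == 0) return SPLIT_NONE;
    if (j == klen) return SPLIT_ALGO2;
    return SPLIT_ALGO1;
}
```

Using the example from the source comments, a node holding "ANNIBALE" and the key "ANNIENTARE" disagree at index 4, so ALGO 1 applies with a split position of 4. The key "ANNI" runs out at index 4 without a mismatch, which is the ALGO 2 case.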


int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {
    return raxGenericInsert(rax,s,len,data,old,1);
}

int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite) {
    size_t i;
    int j = 0;
    raxNode *h, **parentlink;

    debugf("### Insert %.*s with value %p\n", (int)len, s, data);
    i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);

    if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {
        debugf("### Insert: node representing key exists\n");
        /* Make space for the value pointer if needed. */
        if (!h->iskey || (h->isnull && overwrite)) {
            h = raxReallocForData(h,data);
            if (h) memcpy(parentlink,&h,sizeof(h));
        }
        if (h == NULL) {
            errno = ENOMEM;
            return 0;
        }

        /* Update the existing key if there is already one. */
        if (h->iskey) {
            if (old) *old = raxGetData(h);
            if (overwrite) raxSetData(h,data);
            errno = 0;
            return 0; /* Element already exists. */
        }

        /* Otherwise set the node as a key. Note that raxSetData()
         * will set h->iskey. */
        raxSetData(h,data);
        rax->numele++;
        return 1; /* Element inserted. */
    }

    /* If the node we stopped at is a compressed node, we need to
     * split it before to continue.
     *
     * Splitting a compressed node have a few possible cases.
     * Imagine that the node 'h' we are currently at is a compressed
     * node containing the string "ANNIBALE" (it means that it represents
     * nodes A -> N -> N -> I -> B -> A -> L -> E with the only child
     * pointer of this node pointing at the 'E' node, because remember that
     * we have characters at the edges of the graph, not inside the nodes
     * themselves.
     *
     * In order to show a real case imagine our node to also point to
     * another compressed node, that finally points at the node without
     * children, representing 'O':
     *
     *     "ANNIBALE" -> "SCO" -> []
     *
     * When inserting we may face the following cases. Note that all the cases
     * require the insertion of a non compressed node with exactly two
     * children, except for the last case which just requires splitting a
     * compressed node.
     *
     * 1) Inserting "ANNIENTARE"
     *
     *               |B| -> "ALE" -> "SCO" -> []
     *     "ANNI" -> |-|
     *               |E| -> (... continue algo ...) "NTARE" -> []
     *
     * 2) Inserting "ANNIBALI"
     *
     *                  |E| -> "SCO" -> []
     *     "ANNIBAL" -> |-|
     *                  |I| -> (... continue algo ...) []
     *
     * 3) Inserting "AGO" (Like case 1, but set iscompr = 0 into original node)
     *
     *            |N| -> "NIBALE" -> "SCO" -> []
     *     |A| -> |-|
     *            |G| -> (... continue algo ...) |O| -> []
     *
     * 4) Inserting "CIAO"
     *
     *     |A| -> "NNIBALE" -> "SCO" -> []
     *     |-|
     *     |C| -> (... continue algo ...) "IAO" -> []
     *
     * 5) Inserting "ANNI"
     *
     *     "ANNI" -> "BALE" -> "SCO" -> []
     *
     * The final algorithm for insertion covering all the above cases is as
     * follows.
     *
     * ============================= ALGO 1 =============================
     *
     * For the above cases 1 to 4, that is, all cases where we stopped in
     * the middle of a compressed node for a character mismatch, do:
     *
     * Let $SPLITPOS be the zero-based index at which, in the
     * compressed node array of characters, we found the mismatching
     * character. For example if the node contains "ANNIBALE" and we add
     * "ANNIENTARE" the $SPLITPOS is 4, that is, the index at which the
     * mismatching character is found.
     *
     * 1. Save the current compressed node $NEXT pointer (the pointer to the
     *    child element, that is always present in compressed nodes).
     *
     * 2. Create "split node" having as child the non common letter
     *    at the compressed node. The other non common letter (at the key)
     *    will be added later as we continue the normal insertion algorithm
     *    at step "6".
     *
     * 3a. IF $SPLITPOS == 0:
     *     Replace the old node with the split node, by copying the auxiliary
     *     data if any. Fix parent's reference. Free old node eventually
     *     (we still need its data for the next steps of the algorithm).
     *
     * 3b. IF $SPLITPOS != 0:
     *     Trim the compressed node (reallocating it as well) in order to
     *     contain $splitpos characters. Change child pointer in order to link
     *     to the split node. If new compressed node len is just 1, set
     *     iscompr to 0 (layout is the same). Fix parent's reference.
     *
     * 4a. IF the postfix len (the length of the remaining string of the
     *     original compressed node after the split character) is non zero,
     *     create a "postfix node". If the postfix node has just one character
     *     set iscompr to 0, otherwise iscompr to 1. Set the postfix node
     *     child pointer to $NEXT.
     *
     * 4b. IF the postfix len is zero, just use $NEXT as postfix pointer.
     *
     * 5. Set child[0] of split node to postfix node.
     *
     * 6. Set the split node as the current node, set current index at child[1]
     *    and continue insertion algorithm as usually.
     *
     * ============================= ALGO 2 =============================
     *
     * For case 5, that is, if we stopped in the middle of a compressed
     * node but no mismatch was found, do:
     *
     * Let $SPLITPOS be the zero-based index at which, in the
     * compressed node array of characters, we stopped iterating because
     * there were no more keys character to match. So in the example of
     * the node "ANNIBALE", addig the string "ANNI", the $SPLITPOS is 4.
     *
     * 1. Save the current compressed node $NEXT pointer (the pointer to the
     *    child element, that is always present in compressed nodes).
     *
     * 2. Create a "postfix node" containing all the characters from $SPLITPOS
     *    to the end. Use $NEXT as the postfix node child pointer.
     *    If the postfix node length is 1, set iscompr to 0.
     *    Set the node as a key with the associated value of the new
     *    inserted key.
     *
     * 3. Trim the current node to contain the first $SPLITPOS characters.
     *    As usually if the new node length is just 1, set iscompr to 0.
     *    Take the iskey / associated value as it was in the orignal node.
     *    Fix the parent's reference.
     *
     * 4. Set the postfix node as the only child pointer of the trimmed
     *    node created at step 1.
     */

    /* ------------------------- ALGORITHM 1 --------------------------- */
    if (h->iscompr && i != len) {
        debugf("ALGO 1: Stopped at compressed node %.*s (%p)\n",
            h->size, h->data, (void*)h);
        debugf("Still to insert: %.*s\n", (int)(len-i), s+i);
        debugf("Splitting at %d: '%c'\n", j, ((char*)h->data)[j]);
        debugf("Other (key) letter is '%c'\n", s[i]);

        /* 1: Save next pointer. */
        raxNode **childfield = raxNodeLastChildPtr(h);
        raxNode *next;
        memcpy(&next,childfield,sizeof(next));
        debugf("Next is %p\n", (void*)next);
        debugf("iskey %d\n", h->iskey);
        if (h->iskey) {
            debugf("key value is %p\n", raxGetData(h));
        }

        /* Set the length of the additional nodes we will need. */
        size_t trimmedlen = j;
        size_t postfixlen = h->size - j - 1;
        int split_node_is_key = !trimmedlen && h->iskey && !h->isnull;
        size_t nodesize;

        /* 2: Create the split node. Also allocate the other nodes we'll need
         *    ASAP, so that it will be simpler to handle OOM. */
        raxNode *splitnode = raxNewNode(1, split_node_is_key);
        raxNode *trimmed = NULL;
        raxNode *postfix = NULL;

        if (trimmedlen) {
            nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+
                       sizeof(raxNode*);
            if (h->iskey && !h->isnull) nodesize += sizeof(void*);
            trimmed = rax_malloc(nodesize);
        }

        if (postfixlen) {
            nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
                       sizeof(raxNode*);
            postfix = rax_malloc(nodesize);
        }

        /* OOM? Abort now that the tree is untouched. */
        if (splitnode == NULL ||
            (trimmedlen && trimmed == NULL) ||
            (postfixlen && postfix == NULL))
        {
            rax_free(splitnode);
            rax_free(trimmed);
            rax_free(postfix);
            errno = ENOMEM;
            return 0;
        }
        splitnode->data[0] = h->data[j];

        if (j == 0) {
            /* 3a: Replace the old node with the split node. */
            if (h->iskey) {
                void *ndata = raxGetData(h);
                raxSetData(splitnode,ndata);
            }
            memcpy(parentlink,&splitnode,sizeof(splitnode));
        } else {
            /* 3b: Trim the compressed node. */
            trimmed->size = j;
            memcpy(trimmed->data,h->data,j);
            trimmed->iscompr = j > 1 ? 1 : 0;
            trimmed->iskey = h->iskey;
            trimmed->isnull = h->isnull;
            if (h->iskey && !h->isnull) {
                void *ndata = raxGetData(h);
                raxSetData(trimmed,ndata);
            }
            raxNode **cp = raxNodeLastChildPtr(trimmed);
            memcpy(cp,&splitnode,sizeof(splitnode));
            memcpy(parentlink,&trimmed,sizeof(trimmed));
            parentlink = cp; /* Set parentlink to splitnode parent. */
            rax->numnodes++;
        }

        /* 4: Create the postfix node: what remains of the original
         * compressed node after the split. */
        if (postfixlen) {
            /* 4a: create a postfix node. */
            postfix->iskey = 0;
            postfix->isnull = 0;
            postfix->size = postfixlen;
            postfix->iscompr = postfixlen > 1;
            memcpy(postfix->data,h->data+j+1,postfixlen);
            raxNode **cp = raxNodeLastChildPtr(postfix);
            memcpy(cp,&next,sizeof(next));
            rax->numnodes++;
        } else {
            /* 4b: just use next as postfix node. */
            postfix = next;
        }

        /* 5: Set splitnode first child as the postfix node. */
        raxNode **splitchild = raxNodeLastChildPtr(splitnode);
        memcpy(splitchild,&postfix,sizeof(postfix));

        /* 6. Continue insertion: this will cause the splitnode to
         * get a new child (the non common character at the currently
         * inserted key). */
        rax_free(h);
        h = splitnode;
    } else if (h->iscompr && i == len) {
    /* ------------------------- ALGORITHM 2 --------------------------- */
        debugf("ALGO 2: Stopped at compressed node %.*s (%p) j = %d\n",
            h->size, h->data, (void*)h, j);

        /* Allocate postfix & trimmed nodes ASAP to fail for OOM gracefully. */
        size_t postfixlen = h->size - j;
        size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
                          sizeof(raxNode*);
        if (data != NULL) nodesize += sizeof(void*);
        raxNode *postfix = rax_malloc(nodesize);

        nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*);
        if (h->iskey && !h->isnull) nodesize += sizeof(void*);
        raxNode *trimmed = rax_malloc(nodesize);

        if (postfix == NULL || trimmed == NULL) {
            rax_free(postfix);
            rax_free(trimmed);
            errno = ENOMEM;
            return 0;
        }

        /* 1: Save next pointer. */
        raxNode **childfield = raxNodeLastChildPtr(h);
        raxNode *next;
        memcpy(&next,childfield,sizeof(next));

        /* 2: Create the postfix node. */
        postfix->size = postfixlen;
        postfix->iscompr = postfixlen > 1;
        postfix->iskey = 1;
        postfix->isnull = 0;
        memcpy(postfix->data,h->data+j,postfixlen);
        raxSetData(postfix,data);
        raxNode **cp = raxNodeLastChildPtr(postfix);
        memcpy(cp,&next,sizeof(next));
        rax->numnodes++;

        /* 3: Trim the compressed node. */
        trimmed->size = j;
        trimmed->iscompr = j > 1;
        trimmed->iskey = 0;
        trimmed->isnull = 0;
        memcpy(trimmed->data,h->data,j);
        memcpy(parentlink,&trimmed,sizeof(trimmed));
        if (h->iskey) {
            void *aux = raxGetData(h);
            raxSetData(trimmed,aux);
        }

        /* Fix the trimmed node child pointer to point to
         * the postfix node. */
        cp = raxNodeLastChildPtr(trimmed);
        memcpy(cp,&postfix,sizeof(postfix));

        /* Finish! We don't need to continue with the insertion
         * algorithm for ALGO 2. The key is already inserted. */
        rax->numele++;
        rax_free(h);
        return 1; /* Key inserted. */
    }

    /* We walked the radix tree as far as we could, but still there are left
     * chars in our string. We need to insert the missing nodes. */
    while(i < len) {
        raxNode *child;

        /* If this node is going to have a single child, and there
         * are other characters, so that that would result in a chain
         * of single-childed nodes, turn it into a compressed node. */
        if (h->size == 0 && len-i > 1) {
            debugf("Inserting compressed node\n");
            size_t comprsize = len-i;
            if (comprsize > RAX_NODE_MAX_SIZE)
                comprsize = RAX_NODE_MAX_SIZE;
            raxNode *newh = raxCompressNode(h,s+i,comprsize,&child);
            if (newh == NULL) goto oom;
            h = newh;
            memcpy(parentlink,&h,sizeof(h));
            parentlink = raxNodeLastChildPtr(h);
            i += comprsize;
        } else {
            debugf("Inserting normal node\n");
            raxNode **new_parentlink;
            raxNode *newh = raxAddChild(h,s[i],&child,&new_parentlink);
            if (newh == NULL) goto oom;
            h = newh;
            memcpy(parentlink,&h,sizeof(h));
            parentlink = new_parentlink;
            i++;
        }
        rax->numnodes++;
        h = child;
    }
    raxNode *newh = raxReallocForData(h,data);
    if (newh == NULL) goto oom;
    h = newh;
    if (!h->iskey) rax->numele++;
    raxSetData(h,data);
    memcpy(parentlink,&h,sizeof(h));
    return 1; /* Element inserted. */

oom:
    /* This code path handles out of memory after part of the sub-tree was
     * already modified. Set the node as a key, and then remove it. However we
     * do that only if the node is a terminal node, otherwise if the OOM
     * happened reallocating a node in the middle, we don't need to free
     * anything. */
    if (h->size == 0) {
        h->isnull = 1;
        h->iskey = 1;
        rax->numele++; /* Compensate the next remove. */
        assert(raxRemove(rax,s,i,NULL) != 0);
    }
    errno = ENOMEM;
    return 0;
}

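2.2 Looking up data

The function that finds where a given key lives in the radix tree is raxLowWalk(). In essence it is a tree traversal.

  1. Start from the rax head node and repeat for each node visited:
    1. If the node is compressed:
      1. Compare the characters stored in the node with the target characters one by one.
      2. Stop comparing at the first character that differs from the target.
      3. If the node was not matched in full, stop the walk; otherwise continue into its only child.
    2. If the node is not compressed:
      1. Look for a child character equal to the current target character. If there is none, stop the walk; if one matches, continue the search from that character's child.
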
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) {
    raxNode *h = rax->head;
    raxNode **parentlink = &rax->head;

    size_t i = 0; /* Position in the string. */
    size_t j = 0; /* Position in the node children (or bytes if compressed).*/
    while(h->size && i < len) {
        debugnode("Lookup current node",h);
        unsigned char *v = h->data;

        if (h->iscompr) {
            for (j = 0; j < h->size && i < len; j++, i++) {
                if (v[j] != s[i]) break;
            }
            if (j != h->size) break;
        } else {

            for (j = 0; j < h->size; j++) {
                if (v[j] == s[i]) break;
            }
            if (j == h->size) break;
            i++;
        }

        if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
        raxNode **children = raxNodeFirstChildPtr(h);
        if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
        memcpy(&h,children+j,sizeof(h));
        parentlink = children+j;
        j = 0; 
    }
    debugnode("Lookup stop node is",h);
    if (stopnode) *stopnode = h;
    if (plink) *plink = parentlink;
    if (splitpos && h->iscompr) *splitpos = j;
    return i;
}

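3. Stream operations

The handler for the XADD command is xaddCommand(). The source is a bit long, but the logic is not complicated. Let's go through it:

  1. Parse the command arguments sent by the client and check the syntax.
  2. Call streamTypeLookupWriteOrCreate() to look up the Stream object for the target key in the database, creating it if it does not exist.
  3. Before adding the element, check whether the message ID recorded in last_id has already reached the maximum value. If the check passes, call streamAppendItem() to append the element to the stream.
  4. If MAXLEN was given, call streamTrimByLength() to trim the stream so that its element count stays within maxlen. With the ~ option only whole radix tree nodes are removed, so the count may remain somewhat above maxlen. Because of master-replica replication and AOF persistence, in that approximate case streamRewriteApproxMaxlen() rewrites the command arguments; otherwise the trimming on a replica might not match the master.
  5. If any client is blocked reading the stream, notify it that new data has arrived.
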
/* XADD key [MAXLEN [~|=] <count>] [NOMKSTREAM] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
    streamID id;
    int id_given = 0; /* Was an ID different than "*" specified? */
    long long maxlen = -1;  /* If left to -1 no trimming is performed. */
    int approx_maxlen = 0;  /* If 1 only delete whole radix tree nodes, so
                               the maximum length is not applied verbatim. */
    int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
    int no_mkstream = 0; /* if set to 1 do not create new stream */

    /* Parse options. */
    int i = 2; /* This is the first argument position where we could
                  find an option, or the ID. */
    // Parse the command arguments sent by the client and check the syntax.
    for (; i < c->argc; i++) {
        int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
        char *opt = c->argv[i]->ptr;
        if (opt[0] == '*' && opt[1] == '\0') {
            /* This is just a fast path for the common case of auto-ID
             * creation. */
            break;
        } else if (!strcasecmp(opt,"maxlen") && moreargs) {
            approx_maxlen = 0;
            char *next = c->argv[i+1]->ptr;
            /* Check for the form MAXLEN ~ <count>. */
            if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
                approx_maxlen = 1;
                i++;
            } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
                i++;
            }
            if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
                != C_OK) return;

            if (maxlen < 0) {
                addReplyError(c,"The MAXLEN argument must be >= 0.");
                return;
            }
            i++;
            maxlen_arg_idx = i;
        } else if (!strcasecmp(opt,"nomkstream")) {
            no_mkstream = 1;
        } else {
            /* If we are here is a syntax error or a valid ID. */
            if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
            id_given = 1;
            break;
        }
    }
    int field_pos = i+1;

    /* Check arity. */
    if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
        addReplyError(c,"wrong number of arguments for XADD");
        return;
    }

    /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
     * a new stream and have streamAppendItem fail, leaving an empty key in the
     * database. */
    if (id_given && id.ms == 0 && id.seq == 0) {
        addReplyError(c,"The ID specified in XADD must be greater than 0-0");
        return;
    }

    /* Lookup the stream at key. */
    robj *o;
    stream *s;
    // Look up the Stream object for the target key in the database; create it if it does not exist.
    if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],no_mkstream)) == NULL) return;
    s = o->ptr;

    // Check whether the message ID recorded in last_id has already reached the maximum value.
    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
        addReplyError(c,"The stream has exhausted the last possible ID, "
                        "unable to add more items");
        return;
    }

    /* Append the element to the stream. */
    if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
        &id, id_given ? &id : NULL)
        == C_ERR)
    {
        addReplyError(c,"The ID specified in XADD is equal or smaller than the "
                        "target stream top item");
        return;
    }

    addReplyStreamID(c,&id);

    signalModifiedKey(c,c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
    server.dirty++;
    // Trim only when MAXLEN was given.
    if (maxlen >= 0) {
        /* Trim the stream. */
        if (streamTrimByLength(s,maxlen,approx_maxlen)) {
            notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
        }
        // With approximate trimming, rewrite the command arguments; otherwise
        // the trimming on a replica may not match the master.
        if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
    }

    /* Let's rewrite the ID argument with the one actually generated for
     * AOF/replication propagation. */
    robj *idarg = createObjectFromStreamID(&id);
    rewriteClientCommandArgument(c,i,idarg);
    decrRefCount(idarg);

    /* If any client is blocked reading the stream, signal that new data has arrived. */
    signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);
}
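The arity check in the function above is easy to misread, so here it is isolated as a tiny sketch. xadd_fields_ok is a name I made up; it only restates the condition on argc and field_pos and does not touch any Redis API.

```c
#include <assert.h>
#include <stdbool.h>

/* Returns true if the arguments from index 'field_pos' to the end of an
 * argc-long argument vector form at least one complete field/value pair,
 * that is, an even count of two or more. */
static bool xadd_fields_ok(int argc, int field_pos) {
    assert(argc >= 0 && field_pos >= 0);
    int remaining = argc - field_pos;
    return remaining >= 2 && remaining % 2 == 0;
}
```

For "XADD mystream * name alice" the ID sits at index 2, so field_pos is 3 and two arguments remain, which passes. "XADD mystream * name" leaves a single dangling field and is rejected, as is a command with no fields at all.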

3.1 streamTypeLookupWriteOrCreate

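This function checks whether the stream already exists in the database and creates it if it does not. The rough flow is:

  1. Look the key up in the database.
    1. If a redisObject is found, check that its type is OBJ_STREAM; if so, return it directly.
    2. If nothing is found, call createStreamObject() to create a new stream object. When NOMKSTREAM was given (no_create is set), a null reply is sent instead and nothing is created.

robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
    // Look up the given key in the database.
    robj *o = lookupKeyWrite(c->db,key);
    // Type check.
    if (checkType(c,o,OBJ_STREAM)) return NULL;
    // Not found: create it.
    if (o == NULL) {
        if (no_create) {
            addReplyNull(c);
            return NULL;
        }
        // Create the stream object.
        o = createStreamObject();
        // Store it in the database.
        dbAdd(c->db,key,o);
    }
    return o;
}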

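3.2 streamAppendItem

The streamAppendItem() code is fairly dense. The outline below is my best reading of it, pieced together with some material found online.

  1. If the command that adds the element does not pin the ID to a specific value, the add is expected to succeed; otherwise it may fail, because streamCompareID() compares the ID sent by the client with the last ID currently in the Stream, and an ID that is not greater is rejected.
  2. To add the element, an iterator is first created over the Stream's rax, and raxSeek(&ri,"$",NULL,0) locates the last node of the radix tree. The data stored at that node is checked against the configuration: whether its size in bytes exceeds stream-node-max-bytes (default 4096) and whether the number of entries it holds exceeds stream-node-max-entries (default 100). If a limit is reached, the local pointer to the data is set to NULL so that a new node gets created; otherwise the last node's data is reused.
  3. If the last node's data is full, lpNew() creates a new listpack, which internally is just an unsigned char pointer. The fields being added to the Stream are written into it as a master entry laid out as count, deleted, num-fields, field_1 ... field_N, 0, and rax.c#raxInsert() then inserts the listpack into the radix tree under the given ID. If the last node's data is not full, the new entry goes into the existing listpack, and the new fields are compared with the master entry's fields to see whether they are identical.
  4. After the fields come the values. An entry is normally laid out as flags, ms-diff, seq-diff, num-fields, field_1, value_1 ... field_N, value_N, lp-count. If the new fields are identical to the master entry's fields, only the values need to be stored, so the compact layout flags, ms-diff, seq-diff, value_1 ... value_N, lp-count is used instead. Finally, if the listpack pointer has changed, rax.c#raxInsert() puts it back into the radix tree.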

      int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
      
      /* Generate the new entry ID. */
      streamID id;
      if (use_id)
        id = *use_id;
      else
        streamNextID(&s->last_id,&id);
      
      if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;
      
      /* Add the new entry. */
      raxIterator ri;
      raxStart(&ri,s->rax);
      raxSeek(&ri,"$",NULL,0);
      
      size_t lp_bytes = 0;        /* Total bytes in the tail listpack. */
      unsigned char *lp = NULL;   /* Tail listpack pointer. */
      
      /* Get a reference to the tail node listpack. */
      if (raxNext(&ri)) {
        lp = ri.data;
        lp_bytes = lpBytes(lp);
      }
      raxStop(&ri);
      
      if (lp != NULL) {
        if (server.stream_node_max_bytes &&
            lp_bytes >= server.stream_node_max_bytes)
        {
            lp = NULL;
        } else if (server.stream_node_max_entries) {
            int64_t count = lpGetInteger(lpFirst(lp));
            if (count >= server.stream_node_max_entries) lp = NULL;
        }
      }
      
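      uint64_t rax_key[2];    /* Key in the radix tree containing the listpack. */
      streamID master_id;     /* ID of the master entry in the listpack. */
      int flags = STREAM_ITEM_FLAG_NONE;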
      if (lp == NULL) {
        master_id = id;
        streamEncodeID(rax_key,&id);
        /* Create the listpack having the master entry ID and fields. */
        lp = lpNew();
        lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
        lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
        lp = lpAppendInteger(lp,numfields);
        for (int64_t i = 0; i < numfields; i++) {
            sds field = argv[i*2]->ptr;
            lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        }
        lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
        /* The first entry we insert, has obviously the same fields of the
         * master entry. */
        flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
      } else {
        serverAssert(ri.key_len == sizeof(rax_key));
        memcpy(rax_key,ri.key,sizeof(rax_key));
      
        /* Read the master ID from the radix tree key. */
        streamDecodeID(rax_key,&master_id);
        unsigned char *lp_ele = lpFirst(lp);
      
        /* Update count and skip the deleted fields. */
        int64_t count = lpGetInteger(lp_ele);
        lp = lpReplaceInteger(lp,&lp_ele,count+1);
        lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
        lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
      
        /* Check if the entry we are adding, have the same fields
         * as the master entry. */
        int64_t master_fields_count = lpGetInteger(lp_ele);
        lp_ele = lpNext(lp,lp_ele);
        if (numfields == master_fields_count) {
            int64_t i;
            for (i = 0; i < master_fields_count; i++) {
                sds field = argv[i*2]->ptr;
                int64_t e_len;
                unsigned char buf[LP_INTBUF_SIZE];
                unsigned char *e = lpGet(lp_ele,&e_len,buf);
                /* Stop if there is a mismatch. */
                if (sdslen(field) != (size_t)e_len ||
                    memcmp(e,field,e_len) != 0) break;
                lp_ele = lpNext(lp,lp_ele);
            }
            /* All fields are the same! We can compress the field names
             * setting a single bit in the flags. */
            if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
        }
      }
      
      lp = lpAppendInteger(lp,flags);
      lp = lpAppendInteger(lp,id.ms - master_id.ms);
      lp = lpAppendInteger(lp,id.seq - master_id.seq);
      if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
        lp = lpAppendInteger(lp,numfields);
      for (int64_t i = 0; i < numfields; i++) {
        sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
        if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
            lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
      }
      /* Compute and store the lp-count field. */
      int64_t lp_count = numfields;
      lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
      if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
        /* If the item is not compressed, it also has the fields other than
         * the values, and an additional num-fileds field. */
        lp_count += numfields+1;
      }
      lp = lpAppendInteger(lp,lp_count);
      
      /* Insert back into the tree in order to update the listpack pointer. */
      if (ri.data != lp)
        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
      s->length++;
      s->last_id = id;
      if (added_id) *added_id = id;
      return C_OK;
      }
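Two small decisions inside the function above are worth isolating: whether the tail listpack can take another entry, and how many listpack elements one entry occupies. The sketch below restates both under my own names (tail_has_room and entry_lp_count are hypothetical helpers); the limits are passed in as parameters, where the excerpt reads them from server.stream_node_max_bytes and server.stream_node_max_entries.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Decide whether the tail listpack can take one more entry. A limit of 0
 * means "no limit", matching how the excerpt tests the two settings. */
static bool tail_has_room(uint64_t lp_bytes, int64_t lp_entries,
                          uint64_t max_bytes, int64_t max_entries) {
    assert(lp_entries >= 0 && max_entries >= 0);
    if (max_bytes && lp_bytes >= max_bytes) return false;
    if (max_entries && lp_entries >= max_entries) return false;
    return true;
}

/* Number of listpack elements an entry occupies before its trailing
 * lp-count element: flags, ms-diff and seq-diff, then either the values
 * alone (same fields as the master entry) or num-fields plus every
 * field and value. */
static int64_t entry_lp_count(int64_t numfields, bool same_fields) {
    assert(numfields > 0);
    int64_t count = numfields + 3;
    if (!same_fields) count += numfields + 1;
    return count;
}
```

For an entry with two fields, the compact layout takes 5 elements (3 fixed plus 2 values), while the full layout takes 8 (3 fixed, num-fields, 2 fields and 2 values). This is where STREAM_ITEM_FLAG_SAMEFIELDS saves space when every message carries the same field names.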
      

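3.3 streamTrimByLength

streamTrimByLength() trims from the head of the radix tree, oldest entries first, according to the given length. This does not mean the number of elements left afterwards always equals the given limit. With approximate trimming (approx set), elements are only removed by dropping the whole node that contains them, so the stream may keep more than maxlen elements. In exact mode, the remaining excess entries are marked as deleted inside the first listpack that cannot be dropped whole.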

      int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
      if (s->length <= maxlen) return 0;
      
      raxIterator ri;
      raxStart(&ri,s->rax);
      raxSeek(&ri,"^",NULL,0);
      
      int64_t deleted = 0;
      while(s->length > maxlen && raxNext(&ri)) {
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);
      
        /* Check if we can remove the whole node, and still have at
         * least maxlen elements. */
        if (s->length - entries >= maxlen) {
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            raxSeek(&ri,">=",ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }
      
        /* If we cannot remove a whole element, and approx is true,
         * stop here. */
        if (approx) break;
      
        /* Otherwise, we have to mark single entries inside the listpack
         * as deleted. We start by updating the entries/deleted counters. */
        int64_t to_delete = s->length - maxlen;
        serverAssert(to_delete < entries);
        lp = lpReplaceInteger(lp,&p,entries-to_delete);
        p = lpNext(lp,p); /* Seek deleted field. */
        int64_t marked_deleted = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
        p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */
      
        /* Skip all the master fields. */
        int64_t master_fields_count = lpGetInteger(p);
        p = lpNext(lp,p); /* Seek the first field. */
        for (int64_t j = 0; j < master_fields_count; j++)
            p = lpNext(lp,p); /* Skip all master fields. */
        p = lpNext(lp,p); /* Skip the zero master entry terminator. */
      
        /* 'p' is now pointing to the first entry inside the listpack.
         * We have to run entry after entry, marking entries as deleted
         * if they are already not deleted. */
        while(p) {
            int flags = lpGetInteger(p);
            int to_skip;
      
            /* Mark the entry as deleted. */
            if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
                flags |= STREAM_ITEM_FLAG_DELETED;
                lp = lpReplaceInteger(lp,&p,flags);
                deleted++;
                s->length--;
                if (s->length <= maxlen) break; /* Enough entries deleted. */
            }
      
            p = lpNext(lp,p); /* Skip ID ms delta. */
            p = lpNext(lp,p); /* Skip ID seq delta. */
            p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                to_skip = master_fields_count;
            } else {
                to_skip = lpGetInteger(p);
                to_skip = 1+(to_skip*2);
            }
      
            while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
            p = lpNext(lp,p); /* Skip the final lp-count field. */
        }
      
        /* Here we should perform garbage collection in case at this point
         * there are too many entries deleted inside the listpack. */
        entries -= to_delete;
        marked_deleted += to_delete;
        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
            /* TODO: perform a garbage collection. */
        }
      
        /* Update the listpack with the new pointer. */
        raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
      
        break; /* If we are here, there was enough to delete in the current
                  node, so no need to go to the next node. */
      }
      
      raxStop(&ri);
      return deleted;
      }
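The difference between approximate and exact trimming is easiest to see with numbers. The following sketch simulates the loop above on plain entry counts instead of real listpacks. simulate_trim is a hypothetical helper, and it assumes every counted entry is live, that is, not already marked deleted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simulate trimming a stream whose radix tree nodes hold 'entries[i]' live
 * entries each, oldest node first. Returns how many entries are removed.
 * With approx != 0 only whole nodes are dropped. */
static int64_t simulate_trim(const int64_t *entries, size_t nodes,
                             int64_t maxlen, int approx) {
    assert(entries != NULL && maxlen >= 0);
    int64_t length = 0;
    for (size_t i = 0; i < nodes; i++) length += entries[i];

    int64_t deleted = 0;
    for (size_t i = 0; i < nodes && length > maxlen; i++) {
        if (length - entries[i] >= maxlen) {
            /* The whole node can go and at least maxlen entries remain. */
            length -= entries[i];
            deleted += entries[i];
            continue;
        }
        if (approx) break;          /* Approximate mode stops here. */
        /* Exact mode: remove just enough entries from this node. */
        deleted += length - maxlen;
        length = maxlen;
        break;
    }
    return deleted;
}
```

With three nodes holding 100, 100 and 50 entries and a limit of 120, approximate trimming drops only the first node and leaves 150 entries, while exact trimming removes 130 entries and leaves exactly 120.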
      

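That completes the analysis of the stream data structure. Next, let's look at where streams are used.

4. Application scenarios

① Message publish/subscribe

The existing PUB/SUB and blocking list mechanisms can also serve as a message queue in simple scenarios, but Redis Stream is clearly far more complete. It provides message persistence and master-replica replication, a new radix tree data structure for more efficient memory use and message reads, and even a Consumer Group feature similar to Kafka's.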