在项目一文中,我们说明了-请求和回复-数据的组成。现在我们回顾一下。
请求:

  • 参数的个数
  • 每个参数的长度
  • 每个参数的文本

回复:

  • 状态回复
  • 错误回复
  • 整数回复
  • 批量回复
  • 多条批量回复

针对上面的数据组成,我们要设计一个类型,清晰的存储这些数据。
我们在mian.rs声明一个新的模块request。同时在main.rs目录下新建一个同级的request.rs文件。

request(请求)

在request.rs中声明一个结构体:

  1. #[derive(Debug)]
  2. struct Request {
  3. args: Vec<String>
  4. }

该结构体叫Request,args存储所有文本,每个参数的文本长度可以直接String的方法取得。参数的个数由args的len方法取得。
接下来,我们需要明确结构体的一些方法。这里先实现一些必要的方法,后面等需要的时候在添加方法。实现的方法如下:

  • new 获得一个Request实例。
  • add_arg 添加一个参数。
  • to_bytes 将该实例转化成字符数组。

    new

    生成一个Requset实例。

    1. pub fn new() -> Self {
    2. Request { args: Vec::new() }
    3. }

    add_arg

    向实例中添加参数。

    params

  • arg,字符串类型。

    1. pub fn add_arg(&mut self, arg: &str) {
    2. self.args.push(arg.to_string());
    3. }

    to_bytes

    将实例数据转换成BytesMut。

    BytesMut,一个字节数据处理的容器。来源:bytes.

  1. pub fn to_bytes(&self) -> BytesMut {
  2. let mut bytes = BytesMut::new();
  3. bytes.put(&format!("*{}\r\n", self.args.len()).into_bytes()[..]);
  4. self.args.iter().for_each(|arg| {
  5. bytes.put(&format!("${}\r\n", arg.len()).into_bytes()[..]);
  6. bytes.put(&format!("{}\r\n", arg).into_bytes()[..]);
  7. });
  8. bytes
  9. }

Reply(回复)

我们在main.rs中声明一个全新的模块,用来定义redis的回复,对于Reply(回复)这种多个可能的数据,最常用的数据类型是枚举,我们定义一个枚举类型抽象—回复-这一数据。

  1. #[derive(Debug)]
  2. enum Reply {
  3. //状态回复
  4. Status(String),
  5. //错误回复
  6. Error(String),
  7. //整数回复
  8. Int(i64),
  9. //批量回复
  10. Bulk(Option<String),
  11. //多条批量回复
  12. MultiBulk(Option<Vec<Reply>>),
  13. }

状态回复

在redis协议中状态回复是一段以“+”字符开始,“\r\n”结尾的字符串。
解析状态回复的流程如下:

  1. 第一个字符是否为”+”;
  2. 最后字符是否为”\r\n”,
  3. 前两项满足则返回中间的字符串。

在数据解析时,我们使用nom这个解析库。nom通过简单的解析器组合成能复杂的解析器来完成功能。
nom解析器的一般形式如下:

  1. fn parser(input) -> IResult<(input ,output)>

nom解析成功后,返回一个元组,元组的第一个元素为消耗后的input,第二个元素为解析的结果。
在状态回复中,主要的nom解析器为tag和take_while。
tag解析器会把input中的T消耗掉,T为解析器携带的参数。在这里的使用如下:

  1. let (input ,output) = tag("+")(input)?:

如果上述代码成功,(input, output)中input会是消耗”+”的输入。output则是tag传入的T“+”。
take_while的第一个参数为一个返回bool的函数,输出会将所有满足函数的数结合。通过这个解析器,我们就可以获取“+”“\r\n”中间的字符。整个的代码如下:

  1. pub fn parse_statue(i: &str) -> IResult<&str, Reply> {
  2. let (i, _) = tag("+")(i)?;
  3. let (i, res) = take_while(|c| c != '\r' && c != '\n')(i)?;
  4. let (i, _) = tag("\r\n")(i)?;
  5. Ok((i, Reply::Status(res.to_string())))
  6. }

加入i为“+ok\r\n”。那么在第2行执行后,input这种值就为”ok\r\n”。第三行以第二行的input作为输入,output中的值会持续到take_while中的闭包条件满足,也就是当解析的input中出现”\r”或者”\n”时,解析器会停止。当第3执行后,input为“\r\n”,output则为”ok”。然后让我们检测剩下的input是否满足条件。如果满足则返回解析后的结果。

注意:input会在解析器中传递。不论其解析失败还是成功。除非是我们人为丢弃,否则它会一直在解析器之间传递。

事实上如果我们要解析的字符串中含有”\r\n”这样的特殊子符,它会解析失败,但好在,redis服务器会保证不会出现意外情况,当然我们也可以实现一个一定能解析的版本。

要注意的是,这里出错,可能是一种更好的选择,这意味着客户端出现了问题。

错误回复

错误回复在redis协议中十分相似,除了第一个字节不同外,其他情况的处理都是类似的,所以我们只需要简单的改动状态回复的代码就可以了。

  1. pub fn parse_error(i: &str) -> IResult<&str, Reply> {
  2. let (i, _) = tag("-")(i)?;
  3. let (i, res) = take_while(|c| c != '\r' && c != '\n')(i)?;
  4. let (i, _) = tag("\r\n")(i)?;
  5. Ok((it, Reply::Error(res.to_string())))
  6. }

但事实上nom针对连续解析三次,获取第二次解析结果的行为,提供了一个可用的组合子—delimited。所以上面的代码可以简写为如下的形式。

  1. let (i, res) = delimited(
  2. tag("-"),
  3. take_while1(|c| c != '\r' && c != '\n'),
  4. tag("\r\n"),
  5. )(i)?;
  6. Ok((i, Reply::Error(res.to_string())))

将状态回复也改成delimited的形式。

整数回复

整数回复和前两个回复的解析过程基本一致,只是我们要讲解析后的值再次转化成一个i64的数字,nom在combinator下提供一个组合子实现了这个功能,那就是map。整个代码如下:

  1. pub fn parse_int(i: &str) -> IResult<&str, Reply> {
  2. let (i, res) = delimited(
  3. tag(":"),
  4. map(
  5. take_while1(|c: char| c.is_digit(10) || c == '-'),
  6. |i: &str| i.parse::<i64>().unwrap(),
  7. ),
  8. tag("\r\n"),
  9. )(i)?;
  10. Ok((i, Reply::Int(res)))
  11. }

对比错误回复可以看到,整个程序的框架还没有变,只是每个子解析器变化了。第三行tag解析器的匹配的字符变了。第四行变成了一个新的解析器map,只有第8行的解析器tag还没有变化。这里面最为要注意的是map的使用。map是由两个简单解析器组合而来的新的解析器。
nom的使用就是利用已有的解析器通过搭积木的方式组合出我们需要的解析器。

批量回复

批量回复的实例如下:

  1. "$6\r\nfoobar\r\n"

第一步我们需要匹配的字符串的长度,在只到了字符串长度后,我们解析响应长度的字符串即可。比较特殊的是,批量回复,存在一个特殊的空回复。它的标准格式为:

  1. "$-1\r\n"

具体代码如下:

  1. let (i, _) = tag("$")(i)?;
  2. let (i, len) = take_while1(|c: char| c.is_digit(10) || c == '-')(i)?;
  3. if len == "-1" {
  4. let (i, _) = tag("\r\n")(i)?;
  5. Ok((i, Reply::Bulk(None)))
  6. } else {
  7. let len = len.parse::<usize>().unwrap();
  8. let (i, res) = take_while_m_n(len, len, |_| true)(i)?;
  9. Ok((i, Reply::Bulk(Some(res.to_string()))))
  10. }
  11. }

这里较为特殊的是第3行,空批量回复的处理。然后是take_while_m_n的使用。当然,除了take_whlie_m_n的使用,我们还可以使用take来获取解析定长的数据。

多条批量回复

多批量回复,其实是其他四条回复的嵌套,其中较为特殊的是空白多条批量回复和无内容多条批量回复。
空白多条回复:

  1. *0\r\n

无内容多条回复:

  1. *-1\r\n

正常情况下,多条批量回复和请求类似,
在取得所有的回复个数之后,我们需要对每条回复进行状态回复、错误回复、整数回复、批量回复的可能解析,并且这样的操作要进行n次,n为回复个数。
nom对上述的操作都提供了对应的解析器。重复多次执行一个解析器:count。对一个回复,执行多次解析:alt。
代码如下:

  1. pub fn parse_muti_bulk(i: &str) -> IResult<&str, Reply> {
  2. let (i, counts) = delimited(
  3. tag("*"),
  4. take_while1(|c: char| c.is_digit(10) || c == '-'),
  5. tag("\r\n"),
  6. )(i)?;
  7. if counts == "-1" {
  8. Ok((i, Reply::MultiBulk(None)))
  9. } else if counts == "0" {
  10. Ok((i, Reply::MultiBulk(Some(Vec::new()))))
  11. } else {
  12. let counts = counts.parse::<usize>().unwrap();
  13. let (i, res): (&str, Vec<Reply>) = count(
  14. alt((parse_statue, parse_error, parse_int, parse_bulk)),
  15. counts,
  16. )(i)?;
  17. Ok((i, Reply::MultiBulk(Some(res))))
  18. }
  19. }