Redis 可以实现轻量级的消息队列功能,Redis 的消息队列不是专业的消息队列,它没有非常多高级特性,没有 ack 保证,如果对消息的可靠性有着极致的追求,那么它就不适合使用。

异步消息队列

Redis 的 list(列表)数组结构常用来作为异步消息队列使用,使用rpush|lpush 入队,lpop|rpop出队。

  1. 127.0.0.1:6379> lpush queue php java node.js
  2. (integer) 3
  3. 127.0.0.1:6379> llen queue
  4. (integer) 3
  5. 127.0.0.1:6379> rpop queue
  6. "php"
  7. 127.0.0.1:6379> llen queue
  8. (integer) 2
  9. 127.0.0.1:6379> lpop queue
  10. "node.js"
  11. 127.0.0.1:6379> llen queue
  12. (integer) 1
  13. 127.0.0.1:6379> rpop queue
  14. "java"
  15. 127.0.0.1:6379> llen queue
  16. (integer) 0
  17. 127.0.0.1:6379> rpop queue
  18. (nil)

队列空了怎么办?

客户端使用队列的 pop 操作获取消息,处理完消息后紧接差再获取、处理,如此不断的循环处理。
如果队列为空的话,客户端就会陷入死循环,这样会拉高客户端的 CPU 和 Redis 的QPS,如果空轮询的客户端数量比较多,Redis 的慢查询就会显著增多。
通常我们用 sleep 来解决这个问题,让线程睡一段时间,这样 CPU 和 Redis 的 QPS 都会降下来。

队列延迟

上面用睡眠可以解决队列空的问题,但是这样会导致队列的延迟增加。如果只有一个消费者,延迟就是1s,如果有多个消费者,延迟会有所下降,因为每个消费者的睡眠时间不是在同一时刻的。
其实有更好的办法,那就是 blpop|brpop,这两个命令的前缀 b 代表 blocking,也就是阻塞读。
阻塞读就是在读取数据的时候,如果队列为空,会进入休眠状态,如果队列一旦有新消息,就立即醒来,这样可以完美解决消息延迟的问题。

空闲连接自动断开

上面的方案还存在一个问题,如果队列阻塞时间过长,服务器一般会自动断开连接,这时客户端就会抛出连接超时异常,所以客户端要增加异常捕获,重试连接。

使用延时列表处理分布式锁冲突

如果加锁失败可以有以下 3 种处理方式:

  1. 直接抛异常,通知用户重试
    这方式比较适合由用户发起的请求,将异常抛给前端后,由用户选择是否重新发起请求,达到延时的效果。
  2. sleep 一段时间重试
    sleep 会阻塞当前的消息处理线程,会导致队列的后续消息处理出现延迟。如果碰撞的比较频繁或都队列里消息比较多,sleep 可能不合适。如果因为个别死锁的 key 导致加锁不成功,线程会彻底堵死,导致后续消息永远得不到及时处理。
  3. 将请求转移至延时队列,过一会重试
    这种方式适合异步消息处理,将当前冲突的请求放到另一个队列延后处理以避开冲突。

    延时队列实现

    延时队列可以使用 zset 来实现,把要处理的消息序列化成字符串放入 zet,消息处理的过期时间为 score,然后用多个线程轮询获取到期的任务进行处理,使用多线程是为了保障可用性,万一个线程挂了还有其它线程可以继续处理。但是因为有多线程并发争抢任务,所以要确保任务不被重复执行。
    Redis的 zrem 是多纯种多进程争抢任务的关键,它的返回值决定当前实例有没有抡到任务,因为 lpop 方法可能会被多个线程、多个进程调用,同一个任务可能被会多个进程抢到,通过 zrem 来决定任务的唯一属主。同时,要对 handle_msg 进行异常捕获,避免因为个别任务处理导致循环中止。