本文由 简悦 SimpRead 转码, 原文地址 www.cdeason.cn

什么是消息中间件

先看百科:消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

几个关键点:

  1. 消息 消息中间件定义了一个协议(模型),基于该协议可以传递消息
  2. 分布式 消息中间件的引入是为了解决分布式系统的问题,对于简单的单应用系统用不着

举例来说,一个常见的最简单的UGC应用,至少包括后端、APP、审核后台、统计平台)。UGC应用的生命力在于用户产生内容的质量,所以需要审核用户提交的内容;另一方面,需要统计用户产生内容的数据。用户A产生的内容C在系统间的流动路径是这样的,APP -> 后端 -> 审核 -> 统计,其中审核和统计可以同时进行,这就遇到了各系统间的数据通信问题。简单的解决方案是所有的系统公用一套数据库(单库或者集群,总之数据库结构一样)。这样的好处是实现简单,可靠;坏处是没有关注平台间的差异性,不同平台需要的数据结构不一样,通用的数据库容易造成各个平台都达不到最好的性能,比如统计需要对所有数据进行计算;另外各个系统之间在数据库发生了耦合,在统计和审核过程中产生的垃圾SQL可能影响在线的业务。

引入消息中间件作为各个系统间通信大使可以有效的解决上述问题:数据在后端产生后,可以扔到消息中间件,其余需要这个数据的系统订阅改消息中间件就可以拿到这条数据并进行自己业务内的操作。这样做的最大好处是系统隔离,系统之间不互相影响,缩小问题的影响范围,避免问题的连锁反应——把三个容易出问题的系统绑在一起,出问题的概率不止提升了三倍!消息中间件就像是快递员,把东西给我,告诉我送给谁,你忙你的去吧。

消息中间件的三大要素:

生产者(Producer)、消息(Message)、消费者(Consumer)。
衡量标准基本围绕这三者的交互,这里只说我在选择消息中间件的时候常用的。

  1. 消息路由

消息如何经过消息中间件到达消费者,在一定程度上决定了消息中间件的灵活性。简单的命名队列,TOPIC订阅能满足大部分的场景,对于复杂的业务可能需要比如基于PATTERN的路由(RabbitMQ),消息复制,消息生命周期管理(beanstalk)

  1. 消息可靠性
    大部分场景下消息是容忍丢失的,或者说对性能的渴求大于可靠性,比如异步发短信,异步发邮件,概数数据统计,日志等。
    另外有的场景是不允许消息丢失的,消息的丢失会带来数据的不一致,不一致的数据很多时候是灾难的开始,比如异步写数据,下单后减库存,转账等。可靠性基本都依赖于持久化。
  2. 消息重放
    不常用但是很有用。这个说的是即使是消费过的消息也能设定offset(一般是时间点)重新消费。这个功能在消息下游数据丢失,新系统导入旧数据的时候非常有用,不用再去理繁杂的数据对应关系,按照正常的业务逻辑处理消息就OK了,非常好用!
  3. 消息堆积
    抗流量神技。像双十一这种超高峰流量都会用到这个功能,这时候一方面会把消息中间件下游业务(Consumer端)的机器挪到核心业务,另一方面消息中间件在高并发投递消息的时候可能出问题,所以把消息暂存在中间件,等流量高峰过去了再投递到下游业务。
  4. 分布式集群支持
    高可用的需求,解决单点问题。
  5. ACK 消息确认
    在下游业务确认后才将消息标记为已消费,处理超时则重新投递消息,这里要求下游业务自己做可重入(幂等)
  6. 消息顺序
    有的业务要求消息投递顺序和消费顺序一致,或者至少要求对于单个用户顺序一致,比如用户的赞/取消操作,顺序反了数据就会错乱
  7. 性能和扩展
    这里指的扩展是能否通过增加Consumer来提高消息的消费速率以及消息中间件的容量是否有理论的上线;性能主要指tps、qps以及并发连接数。
  8. 消息协议
    优先考虑标准协议或者使用广泛的协议,有利于后期的维护和扩展

Redis作为消息中间件

redis在5.0版本推出了stream数据结构,专门是针对消息队列功能的。

1. Redis自带的PUB/SUB机制,即发布-订阅模式。

这种模式生产者(producer)和消费者(consumer)是1-M的关系,即一条消息会被多个消费者消费,当只有一个消费者时即可以看做一个1-1的消息队列,

缺点:

数据可靠性的无法保障,如果数据最终需要落库的场景,如果消息丢失、Redis宕机部分数据没有持久化,甚至突然的网络抖动都可能带来数据的丢失,应该是无法忍受的。
扩展不灵活,没法通过多加consumer来加快消费的进度,因为是广播的形式,多个消费者都会受到消息。如果前端写入数据太多,同步会比较慢,数据不同步的状态越久,风险越大,可以通过channel拆分的方式来解决,虽然不灵活,但可以规避。

这种方案更适合于对数据可靠性要求不高,比如一些统计日志打点。

2 Redis的PUSH/POP机制,利用的Redis的列表(lists)数据结构

比较好的使用模式是,生产者lpush消息,消费者brpop消息,并设定超时时间,可以减少redis的压力。
这种方案相对于第一种方案是数据可靠性提高了,只有在Redis宕机且数据没有持久化的情况下丢失数据,可以根据业务通过AOF日志和缩短持久化间隔来保证很高的可靠性,而且也可以通过多个client来提高消费速度。

缺点:

但相对于专业的消息队列来说,该方案消息的状态过于简单(没有状态),且没有ack机制,消息取出后消费失败依赖于client记录日志或者重新push到队列里面。

消息中间件应用—下单

本文由 简悦 SimpRead 转码, 原文地址 mp.weixin.qq.com

以下讨论基于 MySQL,InnoDB 引擎,不考虑主从。

简单的订单业务的基本模型设计用户、商品(库存)、订单、付款,这里只考虑商品和订单,流程是下订单 -> 减库存,这两步必须同时完成,不能下了订单不减库存(超卖),或者减了库存没有生成订单(少卖)。超卖商家库存不足,消费者下了单买不到东西,体验不好;少卖商家库存积压或者需要反复修改商品信息,反复麻烦,体验也不好。

在系统初期,承接流量小,很多创业团队都是单库的模型(是的,大家都在一起。。。)。这种模型带来了极大的方便,不用跨库,更没有跨节点,能够方便的利用数据库提供的事务来实现下单和减库存的原子操作,还能进行各种联表和子查询(运营 MM 需求再变态,我会 SQL 能奈我何)。但也正是这些优点,会成为流量上来后对系统进行扩展的绊脚石。联表、子查询、事务都是将多张表绑定在了一起,拆库、拆表就麻烦了。

后期系统流量逐渐升高,单库的读写性能不够,这时候会考虑将数据库进行拆库、分表。比如商品和订单分为两个集群,集群内又根据各自业务维度进行分库和分表,商品可以根据卖家维度来切分,订单一般根据买家维度切分,并且根据卖家维度做冗余。这个时候出现的问题,还是经典问题——

数据一致性

数据拆分后商品和订单不在一个库里,怎么保证一致性;买家维度的订单数据怎么和卖家维度的订单数据保证一致性。有两个解决思路:

(1)分布式事务

经典的有基于 2PC 的实现。优点是封装得足够好后使用起来和单库虽然有区别(主要是复杂查询语句),但总体来说对业务的改动不会很大。缺点是性能太差,本来引入分布式数据库主要是为了成倍的提高性能,但因为分布式事务的引入将这个性能的提升大打折扣,很多时候这个性能是难以接受的。

(2)消息中间件

本文主角。如上一篇关于消息中间件文章所说,消息中间件的一大职能就是负责各个系统间的交流,非常适合这里的商品和订单系统的同步问题。

引入消息中间件后的下单流程是:

用户 A 下订单后给消息中间件发送消息,商品系统订阅订单消息,并扣除相应的库存。
这里有几个注意点:
a、消息的传递是需要时间的,下单前查看有库存,但在并发条件下,实际减库存时可能库存不够,所以必须在库存扣减成功后才能显示订单成功,即下单后标记已下单,但用户对该状态不可见,等待商品系统减库存成功后,再通知订单系统更新状态
b、对消息的可靠性要求很高,发送消息时返回成功就要保证该消息会被投递,发送失败需要下单业务自己做回滚;
c、消息的可靠性高表示一定会有消息重复,这里需要商品系统自己做幂等,可以通过消息 id 来做去重(如下单之后,会生成一个消息到mq中,这条消息有唯一的id号,如消费失败进行消息重试的时候,要判断这个id是否已经消费过,消费过的会记录到消费成功的记录里),否则会少卖;
d、下单成功失败前端用户都希望尽早得到通知,所以在下单成功后需要设定一个定时消息,在一段时间后如果订单库存还没有扣除成功,这个时候应该通知用户下单失败,并且定期回补这部分多扣的库存。

引入消息中间件,可以很好的解决了分布式数据库数据同步的问题,避免了分布式事务。并且额外的好处是减少了减库存时候的并发锁争抢,性能杠杠的。

消息中间件的应用——秒杀

本文由 简悦 SimpRead 转码, 原文地址 mp.weixin.qq.com

业务描述

只谈技术上的业务描述,秒杀一般分为秒杀前、秒杀中、秒杀后三个阶段,各个阶段的需求如下:

1、秒杀前,用户访问秒杀宝贝描述页,不断刷新,等待秒杀开始。

2、秒杀中,用户点击秒杀按钮,洪水般的流量瞬间涌入系统,都想走在前面,抢到宝贝。

3、秒杀后,秒杀结束,商品已被抢完或者秒杀时间已过,用户访问秒杀结束页面,整个秒杀阶段结束。

挑战在哪

秒杀前,大量用户频发刷新秒杀页面,都想第一个看到秒杀开始,越临近开始时间,刷新越频繁,这时候对系统会有巨大的读压力。

秒杀中,用户的秒杀请求短时间内持续涌入,前端访问量暴增,并且会牵扯到写库的需求,还有在对于单一热点商品库存判断和递减,由于写库是排他操作,一个一个排队处理,数据库写压力很大。

秒杀后,和秒杀前差不多,短时间内单页面很多用户访问,但时间一过,页面就没有访问量了。

方案

对于秒杀前后单页面的大量访问,处理起来很简单,提前预热把秒杀页面放入 cdn,大量的请求就可以抗住了。这里有一些需要注意的点,

一是 cdn 放的是静态页面,秒杀需要判断秒杀开始,这个可以使用动态加载 js 的方式(网页),从 cdn 去请求一个 js(秒杀开始后再生成:由后端刷新cdn放js,生成真正的秒杀界面),该 js 负责跳转到秒杀页面(秒杀开始后再生成,避免作弊),如果是 app,可以用 webview 或者请求一个 cdn 上的文件也是可以的。有人说为什么不直接请求接口来获取秒杀开始标识,当然是可以的,但是 cdn 是部署到全国各地的,如果请求服务器,这部分量会占用一部分服务器资源,秒杀诶,能节约资源来进行后续请求就节约啊,不然挂了不好看啊。
二是怎么判断秒杀是否开始,秒杀基本都是约定好在某个时间点开始,所以一种直接的想法就是靠时间咯。这的确是一种做法,但不是很好,
首先是因为秒杀为了分散量,多个商品的秒杀就算是一个时间点,也应该分开进行,这样缓解系统压力;
其次就是为了公平性,这个开始时间会在一定范围内有随机性,这样由于用户看到页面的时间随机,不用只拼网速啦,人品也是有用的;
还有就是秒杀开始后还有其他工作要做,比如刷 cdn 放 js,生成秒杀页面等,时间在分布式系统里是最不靠谱的了(服务器时间是有差别的),会导致数据问题(比如 js 生成了,秒杀页面还没有出来,用户一堆 404),用户会骂娘的。

所以一般会抽象一个发令器(使用消息中间件)出来,秒杀系统激活发令器,秒杀开始,下游其它业务开始处理,这里可以定义这些业务的处理顺序,秒杀系统里可以配置何时发令,当然也可以有随机的规则。消息中间件出现了,这里发令器发令后,下游的业务就是通过消息中间件得到的秒杀开始的信号!!很多解决不了的问题往往是因为视角不对,以更高的视角看问题,很多问题往往只缺一个抽象。

秒杀中的最大压力就是对商品库存的并发争抢,这时候可以参考之前文章中的异步减库存的形式来做,只是这里是异步下订单。
说具体方案之前,先看看这个业务场景要做什么?其实就是在解决一万个人同时拿着钱来东西,但是东西只有十件,你卖给谁的问题,只要没给钱,那我东西不给你就没问题,给了钱就必须拿到东西。
所以这里我们可以对来买东西的人强制他们排队,先到先得,并且排队的数据少量丢失是可以接受的,后面拍着那么多人呢,不怕卖不完。

那最后的方案就是,

用户点击秒杀按钮后,加入秒杀队列(限制长度),加入成功则告知正在秒杀,否则直接跳到秒杀结束页面。
对于秒杀队列中的用户,则进行写订单减库存处理处理,并且记录成功与失败,商品秒杀完成后,标记秒杀结束,并且可以清空秒杀队列,避免后面重复判断和处理。
用户端轮询(可以采用长轮询,时效性好)结果,如果秒杀结束并且自己没有秒杀成功,则跳转到秒杀结束页面,否则继续等待,如果秒杀成功则跳转到秒杀成功页面,支付就是后面的事情了。

这里有几个点需要关注,
一是作弊,就是验证码,各种策略各种造型的验证码;
二是时效性,秒杀完后一段时间(比如半个小时)没付款,应该将该笔秒杀标记无效,并且把库存回补。

消息中间件再次大显身手,秒杀队列,这里需要消息中间件提供限长队列功能(不限长,先查询再入队也是可以的,只是要把握好这个时间间隔可能加入的多余数据对消息中间件的影响),清空队列功能(没有也可以,只是不能更快的释放压力)。
当然除了这些,还有限流功能,接入层只允许系统可负载流量进入,超过一定负载就采取措施缓解流量进入(比如验证码。。。)。

可以看到,对于秒杀中功能,基本就是采取逐级降低流量的方法(大流量系统的重要思路),只让不过分高于有效流量的流量进入后端真正的业务系统,保证业务系统不宕机。还有一个原则就是读比写更容易扩展,无业务系统比业务系统更容易扩展,所以应该化写为读(下订单变为查询订单),降低业务系统压力(前端接入层拦截无效流量)。

如何避免下重复订单

https://www.jianshu.com/p/e618cc818432#tocbar-aj2ibi

Redis

Redis 作为内存中的数据结构存储,常用作数据库、缓存和消息代理。它支持数据结构,如 字符串,散列,列表,集合,带有范围查询的排序集(sorted sets),位图(bitmaps),超级日志(hyperloglogs),具有半径查询和流的地理空间索引。Redis 具有内置复制,Lua 脚本,LRU 驱逐,事务和不同级别的磁盘持久性,并通过 Redis Sentinel 和 Redis Cluster 自动分区。

为了实现其出色的性能,Redis 使用内存数据集(in-memory dataset)

消息队列实现的方式

MQ 应用有很多,比如 ActiveMQ,RabbitMQ,Kafka 等,但是也可以基于 redis 来实现,可以降低系统的维护成本和实现复杂度,

  1. 基于 List 的 LPUSH+BRPOP 的实现
  2. PUB/SUB,订阅 / 发布模式
  3. 基于 Sorted-Set 的实现
  4. 基于 Stream 类型的实现

    缺点:

    如果使用Redis做队列,且要保证一致性,如秒杀下单,光用redis是远远不够的,你必须花大力气做一致性补偿和异步幂等的工作,保证能实现exactly once语义。如果这样的话,就不如用一个正经的MQ来做更实际。RabbitMQ,nsq,kafka等都可以。

如果可以容忍丢东西(比如log收集),那么也有现成的方案,fluentd,logstash……
实际上redis并不适合任何有保障数据持久性的场景。它适合做cache,不重要的存储,或者是可以反复重来的批处理计算任务的临时存储等。

redis 消息推送(基于分布式 pub/sub)多用于实时性较高的消息推送,并不保证可靠。
其他的mq和kafka保证可靠但有一些延迟(非实时系统没有保证延迟)。redis-pub/sub断电就清空,而使用redis-list作为消息推送虽然有持久化,但是又太弱智,也并非完全可靠不会丢。
redis 发布订阅除了表示不同的 topic 外,并不支持分组,比如kafka中发布一个东西,多个订阅者可以分组,同一个组里只有一个订阅者会收到该消息,这样可以用作负载均衡。

如果是一个不大的系统,不一定要用消息队列引擎,库就能解决
先选择性能快的kafka,思考kafka能不能满足业务需求
kafka不能,则选择mq。近两年rocketmq流行起来,性能优异,并且有大规模生产实践的例子,会是不错的选择

基于异步消息队列 List lpush-brpop(rpush-blpop)

List 支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。

命令

LPUSH key value1 [value2] 将一个或多个值插入到列表头部
RPUSH key value1 [value2] 将一个或多个值插入到列表尾部

BLPOP key1 [key2 ] timeout 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
BRPOP key1 [key2 ] timeout 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

LPOP key 移出并获取列表的第一个元素
RPOP key 移出并获取列表的最后一个元素

LRANGE key start stop 获取列表指定范围内的元素

为什么使用blpop 不使用lpop

使用 lpushrpush 操作入队列,blpop 和 brpop 操作出队列。
**
但是当队列为空时,lpop 和 rpop 会一直空轮训,值为null ,消耗资源;所以引入阻塞读 blpop 和 brpop(b 代表 blocking),阻塞读在队列没有数据的时候进入休眠状态,一旦数据到来则立刻醒过来,消息延迟几乎为零。

rpop :非阻塞式

image.png

brpop :阻塞式

image.png
image.png
image.png
image.png

brpop可以接收多个键,实现优先级的队列

image.png
从上图命令中可以看出,brpop可以接收多个键,意义是同时检测多个键
如果所有键都没有元素, 则阻塞,
如果其中一个有元素则从该键中弹出该元素,只弹出一个消息。 (会按照key的顺序进行读取,哪个键中先有元素就弹出谁,可以实现具有优先级的队列)。

问题:需要重试机制

你以为上面的方案很完美?还有个问题需要解决:空闲连接的问题。
如果线程一直阻塞在那里,Redis 客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候 blpop 和 brpop 或抛出异常,
所以在编写客户端消费者的时候要小心,如果捕获到异常,还有重试。

缺点:

  • 做消费者确认 ACK 麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个 Pending 列表,保证消息处理确认。
  • 不能做广播模式,如 pub/sub,消息发布 / 订阅模型
  • 不能重复消费,一旦消费就会被删除
  • 不支持分组消费

PUB/SUB, 订阅 / 发布模式

  1. SUBSCRIBE,用于订阅信道
  2. PUBLISH,向信道发送消息
  3. UNSUBSCRIBE,取消订阅

此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。

优点

  • 典型的广播模式,一个消息可以发布到多个消费者
  • 多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息
  • 消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息

缺点

  • 消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回
  • 不能保证每个消费者接收的时间是一致的
  • 若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时


可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。**

基于有序集合Sorted-Set 的实现

Sortes Set(有序列表),类似于 java 的 SortedSet 和 HashMap 的结合体,一方面她是一个 set,保证内部 value 的唯一性,另一方面它可以给每个 value 赋予一个 score,代表这个 value 的排序权重。内部实现是 “跳跃表”。
有序集合的方案是在自己确定消息顺 ID 时比较常用,使用集合成员的 Score 来作为消息 ID,保证顺序,还可以保证消息 ID 的单调递增。通常可以使用时间戳 + 序号的方案。确保了消息 ID 的单调递增,利用 SortedSet 的依据
Score 排序的特征,就可以制作一个有序的消息队列了。

优点

就是可以自定义消息 ID,在消息 ID 有意义时,比较重要。

缺点

缺点也明显,不允许重复消息(因为是集合),同时消息 ID 确定有错误会导致消息的顺序出错。

基于 Stream 类型的实现

Stream 为 redis 5.0 后新增的数据结构。支持多播的可持久化消息队列,实现借鉴了 Kafka 设计。
image.png
Redis Stream 的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容消息是持久化的,Redis 重启后,内容还在。

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建

每个 Stream 都可以挂多个消费组,每个消费组会有个游标 last_delivered_id 在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create 进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。

每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到

同一个消费组 (Consumer Group) 可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者者有一个组内唯一名称。

消费者 (Consumer) 内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

增删改查

  1. xadd 追加消息
  2. xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
  3. xrange 获取消息列表,会自动过滤已经删除的消息
  4. xlen 消息长度
  5. del 删除 Stream

独立消费

我们可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令 xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表(list)。

创建消费组

Stream 通过 xgroup create 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。

消费

Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

Stream 消息太多怎么办

image.png
读者很容易想到,要是消息积累太多,Stream 的链表岂不是很长,内容会不会爆掉就是个问题了。xdel 指令又不会删除消息,它只是给消息做了个标志位。
Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度。

127.0.0.1:6379> xlen codehole  
(integer) 5  
127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1  
1527855160273-0  
127.0.0.1:6379> xlen codehole  
(integer) 3

我们看到 Stream 的长度被砍掉了。

消息如果忘记 ACK 会怎样

Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大。

PEL 如何避免消息丢失

在客户端消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的 PEL 消息以及自 last_delivered_id 之后的新消息。

分区 Partition

Redis 没有原生支持分区的能力,想要使用分区,需要分配多个 Stream,然后在客户端使用一定的策略来讲消息放入不同的 stream。

结论

Stream 的消费模型借鉴了 kafka 的消费分组的概念,它弥补了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的话,得在客户端做,提供不同的 Stream 名称,对消息进行 hash 取模来选择往哪个 Stream 里塞。如果读者稍微研究过 Redis 作者的另一个开源项目 Disque 的话,这极可能是作者意识到 Disque 项目的活跃程度不够,所以将 Disque 的内容移植到了 Redis 里面。这只是本人的猜测,未必是作者的初衷。如果读者有什么不同的想法,可以在评论区一起参与讨论。

参考文章:
https://blog.csdn.net/enmotech/article/details/81230531
http://www.hellokang.net/redis/message-queue-by-redis.html

Redis 怎么做消息队列?

知乎:
https://www.zhihu.com/question/20795043
本文由 简悦 SimpRead 转码, 原文地址 www.zhihu.com
程序人生

消息队列

首先做简单的引入。MQ 主要是用来:

  • 解耦应用
  • 异步化消息
  • 流量削峰填谷

目前使用的较多的有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ 等。

网上的资源对各种情况都有详细的解释,在此不做过多赘述。本文仅介绍如何使用 Redis 实现轻量级 MQ 的过程。

为什么要用 Redis 实现轻量级 MQ?

在业务的实现过程中,就算没有大量的流量,解耦和异步化几乎也是处处可用,此时 MQ 就显得尤为重要。但与此同时 MQ 也是一个蛮重的组件,例如我们如果用 RabbitMQ 就必须为它搭建一个服务器,同时如果要考虑可用性,就要为服务端建立一个集群,而且在生产如果有问题也需要查找功能。在中小型业务的开发过程中,可能业务的其他整个实现都没这个重。过重的组件服务会成倍增加工作量。

所幸的是,Redis 提供的 list 数据结构非常适合做消息队列。

但是如何实现即时消费?如何实现 ack 机制?这些是实现的关键所在。

如何实现即时消费?

网上所流传的方法是使用 Redis 中 list 的操作 BLPOP 或 BRPOP,即列表的阻塞式 (blocking) 弹出。

让我们来看看阻塞式弹出的使用方式:

BRPOP key [key ...] timeout

此命令的说明是:

1、当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。 2、当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。

另外,BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。

以此来看,列表的阻塞式弹出有两个特点:

1、如果 list 中没有任务的时候,该连接将会被阻塞 2、连接的阻塞有一个超时时间,当超时时间设置为 0 时,即可无限等待,直到弹出消息

由此看来,此方式是可行的,但此为传统的观察者模式,业务简单则可使用,如 A 的任务只由 B 去执行。但如果 A 和 Z 的任务,B 和 C 都能执行,那使用这种方式就相形见肘。这个时候就应该使用订阅 / 发布模式,使业务系统更加清晰。

好在 Redis 也支持 Pub/Sub(发布 / 订阅)。在消息 A 入队 list 的同时发布(PUBLISH)消息 B 到频道 channel,此时已经订阅 channel 的 worker 就接收到了消息 B,知道了 list 中有消息 A 进入,即可循环 lpop 或 rpop 来消费 list 中的消息。流程如下:

Redis实现消息中间件 - 图9

其中的 worker 可以是单独的线程,也可以是独立的服务,其充当了 Consumer 和业务处理者角色。下面做实例说明。

即时消费实例

示例场景为:worker 要做同步文件功能,等到有文件生成时立马同步。

首先开启一个线程代表 worker,来订阅频道 channel:

@Service
public class SubscribeService {

    @Resource
    private RedisService redisService;
    @Resource
    private SynListener synListener;//订阅者

    @PostConstruct
    public void subscribe() {
        new Thread(new Runnable() {

            @Override
            public void run() {
                LogCvt.info("服务已订阅频道:{}", channel);
                redisService.subscribe(synListener, channel);
            }
        }).start();

    }
}

代码中的 SynListener 即为所声明的订阅者,channel 为订阅的频道名称,具体的订阅逻辑如下:

@Service
public class SynListener extends JedisPubSub {

    @Resource
    private DispatchMessageHandler dispatchMessageHandler;

    @Override
    public void onMessage(String channel, String message) {
        LogCvt.info("channel:{},receives message:{}",channel,message);
        try {
            //处理业务(同步文件)
            dispatchMessageHandler.synFile();
        } catch (Exception e) {
            LogCvt.error(e.getMessage(),e);
        }
    }
}

处理业务的时候,就去 list 中去消费消息:

@Service
public class DispatchMessageHandler {

    @Resource
    private RedisService redisService;
    @Resource
    private MessageHandler messageHandler;

    public void synFile(){
        while(true){
            try {
                String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());
                if (null == message){
                    break;
                }
                Thread.currentThread().setName(Tools.uuid());
                // 队列数据处理
                messageHandler.synfile(message);
            } catch (Exception e) {
                LogCvt.error(e.getMessage(),e);
            }
        }
    }

}

这样我们就达到了消息的实时消费的目的。

如何实现 ack 机制?

ack,即消息确认机制 (Acknowledge)。

首先来看 RabbitMQ 的 ack 机制:

  • Publisher 把消息通知给 Consumer,如果 Consumer 已处理完任务,那么它将向 Broker 发送 ACK 消息,告知某条消息已被成功处理,可以从队列中移除。如果 Consumer 没有发送回 ACK 消息,那么 Broker 会认为消息处理失败,会将此消息及后续消息分发给其他 Consumer 进行处理 (redeliver flag 置为 true)。
  • 这种确认机制和 TCP/IP 协议确立连接类似。不同的是,TCP/IP 确立连接需要经过三次握手,而 RabbitMQ 只需要一次 ACK。
  • 值的注意的是,RabbitMQ 当且仅当检测到 ACK 消息未发出且 Consumer 的连接终止时才会将消息重新分发给其他 Consumer,因此不需要担心消息处理时间过长而被重新分发的情况。

那么在我们用 Redis 实现消息队列的 ack 机制的时候该怎么做呢?

需要注意两点:

  1. work 处理失败后,要回滚消息到原始 pending 队列
  2. 假如 worker 挂掉,也要回滚消息到原始 pending 队列

上面第一点可以在业务中完成,即失败后执行回滚消息。

实现方案

(该方案主要解决 worker 挂掉的情况)

  1. 维护两个队列:pending 队列和 doing 表(hash 表)。
  2. workers 定义为 ThreadPool。
  3. 由 pending 队列出队后,workers 分配一个线程(单个 worker)去处理消息——给目标消息 append 一个当前时间戳和当前线程名称,将其写入 doing 表,然后该 worker 去消费消息,完成后自行在 doing 表擦除信息。
  4. 启用一个定时任务,每隔一段时间去扫描 doing 队列,检查每隔元素的时间戳,如果超时,则由 worker 的 ThreadPoolExecutor 去检查线程是否存在,如果存在则取消当前任务执行,并把事务 rollback。最后把该任务从 doing 队列中 pop 出,再重新 push 进 pending 队列。
  5. 在 worker 的某线程中,如果处理业务失败,则主动回滚,并把任务从 doing 队列中移除,重新 push 进 pending 队列。

总结

Redis 作为消息队列是有很大局限性的。因为其主要特性及用途决定它只能实现轻量级的消息队列。写在最后:没有绝对好的技术,只有对业务最友好的技术,谨此献给所有 developer。