消息队列

消息队列是在分布式系统中专门用于消息的发送和广播的组件,消息队列把消息从消息发送端路由到消息接收端。一个消息系统包括消息发送端、消息服务端和消息接收端三大部分组成。消息发送端将需要被接收端处理的消息发送出去。消息服务端接收到消息后,根据配置的路由规则,把消息分发到监听了这个消息的接收端去。消息接收端在接收到消息后,通过相应的业务逻辑来进行消息的处理。

消息队列:rabbitmq - 图1 消息队列:rabbitmq - 图2消息队列一般用于应用解耦、流量削峰和异步处理三个场景:

应用解耦

在应用中,不需要把一个业务的全部处理都写在一个应用中,只需要把当前的消息发送出去,需要进行处理的应用接收到消息后,再对业务进行处理。在实现一个功能的时候,就可以把业务的处理分到特定的应用上去,对这个应用进行了解耦,每个应用只需要处理自己的部分即可。例如在会员系统中,会员的等级从2升到了3,会有很多的业务需要进行处理,我们发送一个 {“会员id”: 10, “旧等级”: 2, “新等级”: 3} 的消息到消息队列中,短信应用在接收到这个消息后,发送短信通知用户会员等级升级,积分应用在接收到这个消息后给用户加10个积分,各个业务只需要在接收到消息后处理自己的部分。

消息队列:rabbitmq - 图3

流量削峰

在业务的高峰期,大量的数据库写或者其他的IO操作,很容易造成数据库服务或者其他组件不可用。不直接对数据库进行写或者不直接做其他IO操作,而是把当前操作当成消息发送到消息队列中,消息有比它们高几个数量级的吞吐量,可以暂时缓存大量的这些操作。接收到操作的消息后,一条一条的分发给接收端进行处理,这样就可以避免流量的冲击导致服务不可用。例如在秒杀的服务中,我们把这些秒杀的请求都先发送消息 {“用户id”: 10, “商品id”: 20}到消息队列中。秒杀处理端可以一条一条的进行处理,前1000条消息秒杀成功,后面的消息就秒杀失败。如果这些请求都直接由数据库进行处理,在秒杀场景下很容易就歇菜了。

消息队列:rabbitmq - 图4

异步处理

用户的请求往往的需要快速响应的,有的请求可能包含很耗时的异步操作,如果一直等待耗时操作完成后再返回,会导致用户等待时间过长和系统资源长期被占用并发量大大下降。对于耗时的操作,一般通过消息队列把操作消息发送出去,消息发送成功后直接返回操作成功的请求结果,消息接收端接收到消息后,再进行该耗时操作处理。例如在注册用户时,需要发送验证码校验手机。发送验证码一般需要请求第三方接口进行发送,http的请求时间在200ms左右,大量的这些请求在等待,系统的并发性会大大下降。我们通过发送 {“手机”: 137xxxxxxxx, “验证码”: 123456} 的消息到短信发送应用中,就直接返回发送成功,短信接发送应用接收到消息后再进行耗时的http请求操作。

消息队列:rabbitmq - 图5

rabbitmq

rabbitmq是由专门用于高并发和分布式开发的Erlang语言实现的消息队列,rabbitmq特点是可靠性,使用消息接收确认和持久化到硬盘等机制确保消息不会丢失,rabbitmq的吞吐量低于以高吞吐为设计目的的kafka,在进行消息队列选型时,应该针对不同的应用场景进行选型。在线事务型的应用选择rabbitmq,取保数据不丢失,如果是进行日志的收集,要求高性能和能容忍一定的消息丢失则选择kafka。

名词解释

  • virtual host:虚拟主机,多租户系统,每一个虚拟主机都是独立的
  • exchange:交换机,用于分发消息到queue,没有匹配到queue则丢弃消息
  • queue:队列,rabbitmq内部对象,用于存储消息
  • routing key:路由键,用于绑定exchange和queue

    exchange类型

    direct

    通过routing key匹配消息到queue,即routing key名称与queue名称一样,使用routing key名直接发送消息到queue,接收端直接监听该名称的queue。默认的exchange为direct类型,不通过声明在特定exchange的queue,直接声明的queue会被绑定到默认的direct交换机中。

消息队列:rabbitmq - 图6

topic

消息接收端通过#或者,#匹配一个或多个关键词,只能匹配一个关键词,来接收交换机不同routing key的消息。如监听test.#可以接收到test.a、test.a.1和test.b routing key的消息,而test.*只能监听到test.a和test.b。

消息队列:rabbitmq - 图7

fanout

把消息转发给全部的队列,不需要绑定routing key,即绑定在fanout exchange下的queue,在有消息的时候,会转发给全部的这些queue。消息接收端监听了不同的queue后,就可以接收到fanout广播过来的消息,如果多个节点监听了同一个queue,只会收到一次消息。

消息队列:rabbitmq - 图8

headers

根据headers属性来进行转发,在业务中很少使用,不展开说明。

消息

普通消息

一对一的消息发送,使用direct exchange,一般不需要绑定队列到exchange中,使用默认的exchange即可,可以直接发送和监听。普通消息可以设置如下的属性:

  • auto delete:自动删除,在消息监听端全部下线后,即当前该queue没有消费监听,服务器自动删除该queue。
  • durable:持久化,消息发送后未消费成功的持久化到硬盘中,避免宕机造成消息丢失。

    广播消息

    一对多的消息发送,可以使用topic exchange或者fanout exchange,需要重新声明exchange使用相应的类型。广播消息中queue可以匹配到多条不同的消息,对于不同的节点监听了相同的queue,在广播类型的消息中也不会接收到多次,只有一个queue可以接受到。如果需要一条广播消息被queue多次处理,需要监听不同的queue。

    死信消息

    死信消息是指未能成功被接收端消费,存在以下几种情况:

  • 没有对应的routing key导致转发失败

  • 转发到消息接收端,消息接收端消息响应失败
  • 转发到queue,超过设置超时时间

死信消息一般用于处理重要业务,在消费消息失败后,仍要继续成功的处理该消息,如支付业务消费支付响应结果失败,转发消息到死信队列中,死信队列接收端把该响应结果先存储到数据库等待后续再进行处理。

消息队列:rabbitmq - 图9

延时消息

延时消息是指消息发送后没有马上转发给消息接收端,而是等到设定好的延时时间后再转发到消息接收端,一般用于需要延时进行处理的业务。如购物后2小时内不支付则处理为订单支付超时,如果每次判断通过定时任务来全表扫描超时的支付订单,是非常低效率的,甚至在大的数据量下会造成数据库不可用。可以通过往消息队列中发送延时消息,在2小时后接收到这条消息再去数据库获取相应的订单判断有没有支付成功,如果没有支付成功处理为支付超时。

消息队列:rabbitmq - 图10rabbitmq的延时消息一般可以使用设定超时时间为延时时间的死信队列来模拟或者使用延时消息的插件,对于高版本 > 3.6支持延时消息插件则建议使用延时插件。