MQ介绍

什么是MQ?

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
image.png
image.png
MQ的作用主要有以下三个方面:
解耦
1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。
2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消 费,并且消费者的增加或者减少对生产者没有影响。
image.png
image.png
异步
异步能提高系统的响应速度、吞吐量。
image.png
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
image.png
用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
削峰
以稳定的系统资源应对突发的流量冲击。
image.png
image.png
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。

MQ的优缺点

上面MQ的所用也就是使用MQ的优点。 但是引入MQ也是有他的缺点的:

  • 系统可用性降低

系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑 如何保证MQ的高可用。

  • 系统复杂度提高

引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异 步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不 会被重复调用?怎么保证消息的顺序性等问题。

  • 消息一致性问题

A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处 理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。

几大MQ产品特点比较

常用的MQ产品包括Kafka、RabbitMQ和RocketMQ。我们对这三个产品做下简单的比较,重点需要 理解他们的适用场景。
image.png

RocketMQ集群架构

RocketMQ整体理解 - 图10

角色

一个完整的RocketMQ集群中,有如下几个角色

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
  • Message Queue:相当于是Topic的分区(存消息的最小单位);用于并行发送和接收消息

image.png

使用

编程模型

RocketMQ的生产者和消费者的编程模型都是有个比较固定的步骤的,掌握这个固定的步骤,对于我们学习源码以及以后使用都是很有帮助的。

  • 消息发送者的固定步骤

1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer

  • 消息消费者的固定步骤

1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer

消息样例

基本样例

基本样例部分使用消息生产者分别通过三种方式发送消息,同步发送、异步发送以及单向发送。
同步发送消息:等待消息返回后再继续进行下面的操作。
异步发送消息:引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往 Broker发送消息的时候也要作为服务端提供服务。
单向发送消息:使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。 就是只管把消息发出去就行了。
使用消费者消费消息:消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。

顺序消息

顺序消息包含两种类型:
局部(分区)顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费
全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费
RocketMQ保证的是消息的局部有序,而不是全局有序。

广播消息

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态 (MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟 kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。

延迟消息

  1. public class ScheduledMessageProducer {
  2. public static void main(String[] args) throws Exception {
  3. // Instantiate a producer to send scheduled messages
  4. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  5. // Launch producer
  6. producer.start();
  7. int totalMessagesToSend = 100;
  8. for (int i = 0; i < totalMessagesToSend; i++) {
  9. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
  10. // This message will be delivered to consumer 10 seconds later.
  11. message.setDelayTimeLevel(3);
  12. // Send the message
  13. producer.send(message);
  14. }
  15. // Shutdown producer after use.
  16. producer.shutdown();
  17. }
  18. }

延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一 段时间再发送出去。这是RocketMQ特有的一个功能。
那会延迟多久呢?延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3); 开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支 持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

过滤消息

在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。
主要是看消息消费者。consumer.subscribe(“TagFilterTest”, “TagA || TagC”); 这句只订阅TagA 和TagC的消息。
TAG是RocketMQ中特有的一个消息属性。RocketMQ的最佳实践中就建议,使用RocketMQ时, 一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。
但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。这时候,可以使用SQL表达式来对消息进行过滤。
这个模式的关键是在消费者端使用MessageSelector.bySql(String sql)返回的一个MessageSelector。这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的 TAGS和一个在生产者中加入的a属性。
SQL92语法:
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:’abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。

事务消息

这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。
首先,我们了解下什么是事务消息。官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起 成功或者一起失败。
其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。
image.png
事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。
最后,我们还需要思考下事务消息的作用。
大家想一下这个事务消息跟分布式事务有什么关系?为什么扯到了分布式事务相关的两阶段提交上 了?事务消息只保证了发送者本地事务和发送消息这两个操作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务, RocketMQ提供的事务消息也是目前业内最佳的降级方案。