使用场景,可以用来做什么

  • 应用解耦
  • 系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、

物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异
常,影响用户使用体验。

  • 流量削峰
  • 应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求

缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。
举例:业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能
的服务器显然不划算,这时可以使用消息队列对峰值流量削峰

  • 数据分发
  • 通过消息队列可以让数据在多个系统之间进行流通。数据的产生方不需要关心谁来使用数据,只需

要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

执行流程

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上
    来,相当于一个路由控制中心。
    2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前
    Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic
    跟Broker的映射关系。
    3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在
    发送消息时自动创建Topic。
    4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从
    NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,
    然后与队列所在的Broker建立长连接从而向Broker发消息。
    5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在
    哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息

    消费模式push和pull

    在具体实现时,PushPull模式本质都是采用消费端主动拉取的方式,即consumer轮询从
    broker拉取消息。

    push模式

    即MQServer主动向消费端推送
    好处就是实时性高。不好处在于消费端的处理能力有限,当瞬间推送很多消息给消费端时,容易造
    成消费端的消息积压,严重时会压垮客户端。

    pull模式

    消费端在需要时,主动到MQ Server拉取
    好处就是主动权掌握在消费端自己手中,根据自己的处理能力量力而行。缺点就是如何控制Pull的
    频率。定时间隔太久担心影响时效性,间隔太短担心做太多“无用功”浪费资源。比较折中的办法就是长
    轮询。

Push模式与Pull模式的区别:

Push方式里,consumer把长轮询的动作封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
Pull方式里,取消息的过程需要用户自己主动调用,首先通过打算消费的Topic拿到
MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次
取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
RocketMQ使用长轮询机制来模拟Push效果,算是兼顾了二者的优点。

角色和相关术语

product生产者
cusmer消费
broker存储消息,类似与邮局

二:RocketMQ高级特性及原理

1:消息发送

生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。
比如同步发送、异步发送、Oneway发送、延迟发送、发送事务消息等。
默认使用的是DefaultMQProducer类,DefaultMQProducer类是应用用来投递消息的入口,开箱即用,可通过无参构造方法快速创建一个生产者。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。可以通过该类提供的getter/setter方法,调整发送者的参数。
发送消息要经过5个步骤
1.1 设置Producer的GroupName.
1.2 设置InstanceName,当一个JVM需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”.
3:设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数
4:设置NameServer的地址
5:组装消息并发送

消息发生返回状态有如下4种
FLUSH_DISK_TIMEOUT 同步: 表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置为SYNC_FLUSH(同步刷盘)才会报这个错误)

FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置为SYNC_MASTER(同步复制)方式,没有在设定时间内完成主从同步

SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOT类似,表示在主备方式下,并且Broker被设置为SYNC_MASTER(异步复制),但是没有找到被设置成slave的broker.

SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略,主从策略来定。这个状态还可理解为,没有发生上面3个问题状态就是SEND_OK;

提升写入的性能
发送一条消息出去要经过3步
1:客户端发送请求到服务器
2:服务器处理该请求
3:服务器向客户端返回应答
一次消息的发送耗时是上述三个步骤的总和。
对一些对速度要求高,但是可靠性要求不高的场景下,比如:日志收集类应用,可以采用Oneway方式发送
Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。用这种方式发送消息的耗时可以缩短到微秒级。
另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,
我们不用担心多Producer同时写入会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞地数据刷入文件系统中。
顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都保持较高地写入性能。
目前在阿里内部经过调优地服务器上,写入性能达到90万+的TPS,我们可以参考这个数据进行系统优化。
在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法

2:消息消费

几个要点
1:消息消费方式(pull和push)
2:消息消费的模式(广播模式和集群模式)
3:流量控制(可以结合sentiel来实现)
4:并发线程数设置
5:消息的过滤(Tag,Key)
当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有3种方法可以提高Connsumer的处理能力
1:提高消费并行度
在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并发度。
通过加机器,或者在已有机器中启动多个ConSumer进程都可以增加Consumer实例数。
注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。
此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumerThreadMin和consumeThreadMax)。
2:以批量方式进行消费
某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。
可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的ConsumerMEssageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。
3:检测延时情况,跳过非重要消息
Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除推积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。

3:消息存储

3.1 存储介质

  1. - **关系型数据库DB**
  • Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障
  1. - **文件系统**

目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率高可靠性高性能数据持久化方式除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

3.2性能对比

文件系统 > 关系型数据库DB

3.3 消息的存储和发送

1)消息存储

目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度
但是磁盘随机写的速度只有大概100KB/s和顺序写的性能相差6000倍!
因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
RocketMQ的消息用顺序写,保证了消息存储的速度。

2)存储结构

RocketMQ消息的存储是由ConsumeQueueCommitLog配合完成 的,消息真正的物理存储文件
是CommitLogConsumeQueue是消息的逻辑队列类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

4:过滤消息

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的
RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储实现的,Consumer端订阅消息使需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后在从CommitLog里面读取真正的消息实体内容,所以说到底也绕不开其存储结构。
ConsumeQueue的存储结构:有8个字节存储的MessageTag的哈希值,基于Tag的消息过滤正式基于这个值的。
主要有2种过滤方式

4.1:Tag过滤方式

:Consumer端在订阅消息时除了指点Topic还可以指代那个TAG,如果有一个消息有很多个Tag,可以用“||”分割。

4.1.1:Consumer(消费端)端会将这个订阅请求构建成一个SubscriptionData(应该是“订阅数据”的意思),发送一个pull(拉取)消息的请求给Broker端。
4.1.2:Broker端从RocketMQ的存储层—(Store)读取数据之前,会用这些数据先构建一个MessageFilter(消息过滤),然后传给Store(存储层)。
4.1.3:Store(存储层)从ConsumerQueue读取一条记录后,会用它记录的消息 tag hash值取过滤。
4.1.4:在服务端只是根据hahsCode进行判断,无法精确对tag原始字符串进行过滤,在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

4.2:SQL92的过滤方式

仅对push的消费者起作用
tag方式虽然效率高,但是支持的过滤逻辑比较简单。
SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样,只是在Store(存储层)层的具体过滤过程不太一样
真正的SQL expression的构建和执行由rocketmq-filter模块负责的。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。
SQL92的表达式上下文为消息的属性。
conf/broker.conf(broker的配置文件里面开启)
image.pngimage.png
RocketMQ仅定义了几种基本的语法,用户可以扩展:

  1. 数字比较: >, >=, <, <=, BETWEEN, =
  2. 字符串比较: =, <>, IN; IS NULL或者IS NOT NULL;
  3. 逻辑比较: AND, OR, NOT;
  4. Constant types are: 数字如:123, 3.1415; 字符串如:’abc’,必须是单引号引起来 NULL,特
    殊常量 布尔型如:TRUE or FALSE;

4.3:Filter Server方式

这是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据java函数的逻辑对消息进行过滤。
要使用Filter Server,首先要在启动Broker前在配置文件里加上filterServer-Num=3这样的配置。Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的java函数进行过滤,过滤后的消息在传给远端的Consumer。
这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存,创建线程等这样的操作,否则容易造成Broker服务器宕机。

5:零拷贝原理

零拷贝小结(zero copy)小结
1:虽然叫做零拷贝,实际上sendfile有2次数据拷贝的。第一次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(引擎协议)。如果网卡支持5G-DMA(The Scatter-Gather Direct Memory Access)技术,就无需从pageCache拷贝至Socket缓冲区。
2:之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存和I/O设
备之间传输。很多时候我们认为sendfile才是零拷贝,mmap严格来说不算;
3. Linux中的API为sendfile、mmap,Java中的API为FileChanel.transferTo()、
FileChannel.map()等;
4. Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx等高性能中间件中,都有大量利用操
作系统零拷贝特性。

6:同步复制和异步复制

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制
方式。

6.1:同步复制

同步复制是等Master和Salve都写入成功,才给客户端反馈成功状态
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易回复,但是同步复制会曾大数据写入延迟,降低系统吞吐量。

6.2:异步复制

异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;

6.3:配置

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置
成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
/opt/rocket/conf/broker.conf 文件:Broker的配置文件
image.png
image.png
image.png

6.4:总结

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH(同步刷盘)方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH(一部刷盘)的刷盘 方式,主从之间配置成SYNC_MASTER(主从同步复制)的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

7:高可用机制

RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Salve的区别:
1:在Broker的配置中,参数brokerId的值为0,表明这个Broker是Master
2:大于0表明这个Broker是Slave
3:brokerRole参数也说明这个Broker是Master还是Salve
(SYNC_MASTER/ASYNC_MASTER/SALVE)
4:Master角色的Broker支持读和写,Slave角色的Broker仅支持读
5:Consumer可以链接Master角色的Broker,也可以链接Slave角色的Broker来读取消息。

7.1:消息消费的高可用

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁
忙的时候,Consumer会被自动切换到从Slave 读。
有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从
Slave读取消息,不影响Consumer程序。
这就达到了消费端的高可用性。
Consumer会自动切换去读取Master还是Slave。达到了消费端的高可用

7.2:消息发送的高可用

如何达到发送端的高可用性呢?

8:刷盘机制

8.1:同步刷盘

8.2:异步刷盘

9:负载均衡

10:消息重试

11:死信队列

12:延迟消息

messagedelaylevel,18个级别,可以设置时间
(注:redis延迟队列与mq延迟队列的区别和应用场景)

13:顺序消息

14:事务消息

15:消息查询

16:消息优先级

17:底层网络通信 -Netty

18:限流

三:高级实战

3.1:生产者

3.2:消费者

3.3:Broker

3.4:NameServer

3.5:客户端配置

3.6:系统配置

3.7:动态扩缩容

3.8:各种故障对消息的影响


四:RocketMQ集群与运维


五:源码剖析

5.1:环境

5.2:NameServer

5.3:Producer

5.4:消息存储

5.5:Consumer