基本介绍
Message Queue消息队列,简称MQ,其本质为队列,用来存储消息的容器。作为分布式、微服务体系中经常用到的一门技术,使用消息队列具备以下优势:
- 先进先出:作为队列,消息队列肯定是满足先进先出的特性,消息的顺序在入队时刻确定,在高并发中某一时刻也只会有一条消息被消费掉,因此在某种程度上,可将系统的压力堆积在消息队列上。
- 发布订阅:这一特性提高了系统的执行效率,对于上下游消息通知模式,下游多端可以同时对传递的消息进行处理。
- 可持久化:可以持久化的特性,极大程度提高了消息队列在使用过程中的容错性,即使服务宕机,在重启后依然可以不丢失数据。
- 分布式:作为高并发场景下的中间件,一定是要求可以满足分布式集群化部署的,同时在缓解应用服务的压力同时,消息队列也需要进行高可用容错。
当然,任何一门技术的引入,也将带来一定的风险,消息队列具有诸多优势,同时也需要注意引入消息队列带来的问题:
- 系统复杂度提高:传统的服务调用由同步的上下游模式转变为引入一层中间件的形式,需要考虑数据消费问题,编写代码的复杂性和业务场景复杂度也随之升高,在引入消息队列时,一定是引入成本远大于开发维护成本。
- 数据一致性问题:多端交互时,上下游不能及时反馈,如果消息丢失,消息重复消费如何处理,引入消息队列需要保证整个服务的数据最终一致性问题,异步处理的过程也要容忍短暂的中间态,对数据没有强一致性要求。
最后说明的是需要引入消息队列的场景,还需要满足,不需要等待返回结果。
使用场景
异步解耦
所谓消息队列的异步解耦可以理解为两个不同的特性,如下图所示:
相比于传统调用方式,在MQ调用中,A系统接收用户请求,如果需要对接其他系统,无需等待同步调用结果,可以将消息投递给消息队列,后续业务逻辑由消息队列顺序执行,如果有新的业务系统加入,也可以尽可能小的改变A系统的代码,便于扩展,从而达到异步解耦的功能,当然前提是不需要整个系统间的处理结果立即响应,允许短暂的不一致。
削峰填谷
MQ的另一大应用场景就是削峰,当大量请求涌入的时候,QPS会瞬间激增,单纯依靠系统之间的调用会使得应用服务器承受高并发,借助于消息队列,可以较大程度的迎接高并发,但是挤压的消息,在高峰期过后需要维持一个相对稳定的消费速度去处理消息,直到积压的消息消费完,这个过程被称作填谷。如下图所示:
所谓的削峰填谷,是为了解决高并发场景下,请求激增的一种处理方式。
消息通知
消息通知是消息队列最基本的功能,一般消息队列都会包含,点对点、发布订阅、主题等多重消息通知模型。这也是基于消费者生产者模型来实现,无论是异步解耦还是削峰填谷,都需要根据具体的场景选择合适的消息通知模型。
这里借用RabbitMQ官网提供的几种MQ工作模式示意图:
这是一种最简单的消息队列模型,也被成为点对点模型,消息的生产者和消费者是确定的,消息的投递也是确定的。
这是消息队列中的工作队列模式,消息进入队列后会被某一个消费者进行消费,消费者一般是集群等模式部署。
发布订阅模式下的消息会被多个消费者消费,类比于Redis中的发布订阅,当消息需要被广播给多个消费者时一般使用这种方式。
Topic主题,是实际开发中使用较多的一种模型,在RabbitMQ中,Topic是更为灵活的发布订阅方式,更加强大的将消息传递给指定的队列,完成消息分发。
无论是哪一种方式,点对点、发布订阅、工作队列等,在不同的消息队列产品中,实现细节上可能存在不同,各自具备特色,但整体概念共通,无论哪种模式,都是为了实现在不同的存储队列中实现消息资源隔离和资源分发以及最终的消费。
MQ实现
消息队列的功能基本相同,但是到最终的产品实现上是需要有具体的规范的,也就是需要遵循的协议,消息队列中常见的协议有AMQP、MQTT和STOMP三种,都是基于TCP/IP的消息传递协议。
https://www.jianshu.com/p/1cdddd2fb6da
协议
AMQP
AMQP,即高级消息队列协议(Advanced Message Queuing Protocol),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
核心角色如下:
- Message(消息):消息服务器处理消息的原子单元,包括一个内容头,一组属性和一个内容体。
消息有优先级,高优先级的消息在等待同一消息队列时会比低优先级的消息先发送,而且当消息必须被丢弃时,低优先级的消息优先被丢弃。
使用AMQP协议,消息服务器不能修改内容体和内容头,但可以在内容头上添加额外信息。 - PubLisher(消息生产者):发送消息
- Consumer(消息消费者):消费消息
- Broker(消息代理):消息队列服务器,负责接收客户端连接,路由消息。
- Queue(消息队列):Broker中的一个角色,一个Broker中可以有多个Queue,负责保存消息直到发送给不同的消费者。算是消息的容器。一个消息可以被投入一个或多个队列中,每个队列的消息都会等待消费者连接到这个队列并被取走。
- Exchange(交换路由):Broker中的一个角色,负责接收生产者发送的消息,并路由给服务器中的队列。可以被理解成一个规则表,指明消息该被投到哪个队列中。
- Channel(信道):信道是一条独立的双向数据流通道。为了解决操作系统无法承受每秒建立特别多的TCP连接。
生产者发送消息时,必须指定消息要被路由到哪些个消息队列中。当消息到消息队列中,消息队列会尝试将消息传给消费者,如果失败,消息队列会存储消息并等待消费者。如果没有消费者,消息队列将选择性的将消息返回给生产者。如果消息别消费掉,消息队列会删除消息,删除的过程或者是及时的,或者是等到消费者消费结果后才删除的。
RabbitMQ是AMQP消息队列最有名的开源实现,当然RabbitMQ同时还可以通过插件支持STOMP、MQTT等协议接入。Kafka、RocketMQ均使用自定义的协议。
MQTT
MQTT,即消息队列遥测传输(Message Queuing Telemetry Transport)。由IBM开发,现在被广泛用于物联网公司。因为他的特点就是轻量,简单,开放和易于实现。所以它常用于很多计算能力有限、带宽低、网络不可靠的远程通信应用场景。
核心角色如下:
- Publisher(发布者):消息发布客户端
- Subscriber(订阅者):消息订阅客户端
- Broker(消息代理):消息服务器端
- Application Message(应用消息):指通过网络传输的应用数据,一般包括主题和负载。
- Topic(主题):应用消息的类型,一般消息发布者会确定消息的主题,订阅者根据自己实际情况选择不同的主题进行消息订阅消费。
- Payload(负载):消息订阅者具体接收的内容。
MQTT协议是通过交换预定义的MQTT控制报文来通信的,控制报文内容由三部分组成:固定报头,可变报头和消息体。固定报头通过标识不同位的值来确定报文类型,包括发布订阅的一些完成状态等;可变报头的内容根据控制报文类型不同而不同,常作为包的标识符;消息体也是根据不同的消息类型有着不同的内容。
MQTT协议中,客户端和服务端是通过请求-应答模式通信的。客户端发送一条控制报文数据给服务器,服务器再发送一条控制报文数据给客户端。
MQTT在发布消息时,有三种Qos等级:
- 至多一次(0级)
- 至少一次(1级)
- 只有一次(2级)
至多一次等级最低,客户端只需要将消息发出去即可,这种等级很低,用于消息不重要但特别多,为了减轻通信压力,就不顾质量,只看数量了。
至少一次等级中等,客户端要保证发出去的消息至少一次被服务端接收到,所以要收到服务端的回应,否则一直发,这种等级一般用于服务端有幂等处理,所以不怕重复消费,还要保证消息不会丢失。
只有一次等级最高,客户端先发消息过去,然后本地记录一个我已发送,但不确定你是否收到的状态,然后服务端接收到消息后,回给客户端一个我已接收的报文,同时服务端记录一个我不确定你知不知道我已接收的状态,然后客户端收到这个已接收的消息后,就确定服务端收到这个消息了,于是把自己本地记录的已发送未确定的状态删除,同时再给客户端发送一个我已经知道你收到的报文,服务端收到这个报文,也会把自己之前记录的状态删掉,整个一条报文只有一次的通信才算完成,这种等级就比较严格了,但质量上去了,相对低等级的,数量就会相对小些,但可靠就是王道,不多不少才是最好的。
只有一次的发送和确定,其实思想和三次握手差不多,都是两端互相确认的过程,所以会一来一回的。如果传输过程中出现丢包,都会由发送者重发上一条消息。
STOMP
STOMP,即流文本定向消息协议(Streaming Text Orientated Messaging Protocal),是一个相对简单的文本消息传输协议,主要特点就是简单易懂,没有特别多的套路。
核心角色如下:
- 客户端:既可以是生产者,也可以是消费者
- 服务端:消息中心
ActiveMQ以及它的下一代实现Apache Apollo,是STOMP协议的典型实现。
产品对比
在选择消息队列时往往需要根据实际的场景或者现有技术体系来进行技术选型,关于常见MQ产品实现对比,如下表所示:
产品 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
维护组织 | Rabbit | Apache | Apache(阿里捐赠) | Apache |
开发语言 | Erlang | Java | Java | Scala& Java |
支持协议 | AMQP、MQTT、STOMP、 HTTP&WebSocket |
AMQP、STOMP、MQTT、JMS | 自定义协议 | 自定义协议(基于TCP的高性能协议) |
客户端支持 | Erlang、Java、 .NET/C#、社区提供API等 |
Java、C、C++、Python、.Net、and more | Java、 C++,、Go | Java、Scala、Go、Python、C/C++、REST APIs等 |
单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) |
延迟 | 微妙 | 毫秒 | 毫秒 | 毫秒 |
特点 | ActiveMQ已经发展为Classic和Artemis两条路线。 |
RocketMQ官方总结的几种产品对比:https://rocketmq.apache.org/docs/motivation/
JMS
JMS是Java Message Service的简称,JMS作为JavaEE十三大技术规范之一,有着举足轻重的地位,即便在各种优秀MQ产品层出不穷的今天,了解基于JMS的MQ规范,也是很有必要的。
JMS
ActiveMQ作为老牌消息队列,完全实现了JMS,发展时间长,产品本身成熟度高,由Apache出品,在大部分场景下可以满足开发需求。但是由于历史局限性原因等,在高并发、高吞吐量的场景下,其设计已不足以满足三高需求,呈现逐渐衰落的趋势。