消息中间件: 关注于数据的发送和接收, 利用高效可靠的异步消息机制集成分布式系统
RocketMQ 官方文档: https://rocketmq.apache.org/docs/quick-start/RocketMQ
中国开发者中心:http://rocketmq.cloud/zh-cn/
Kafka 官方文档: http://kafka.apache.org/documentation/
RabbitMQ 官方文档: https://www.rabbitmq.com/documentation.html
一、消息队列的好处(为啥用)
解耦,呼叫系统需要向质检,营帐,crm等推送主从话单,以后使用mq和未来更多的产品进行解耦,只推送消息到mq,不用管太多事
异步,一个请求会调用多个系统,处理时长很长,产品让请求需要在200ms以内,使用mq发送请求消息到mq队列,并返回结果,各个系统自己提取参数在各自本地进行操作
削峰,上午各个公司座席上班时是高峰期,每秒有上千需要处理的话单产生,容易造成pgsql宕机,但是午饭后下午时间是低峰期,使用mq后,请求都打到mq中,系统再从mq中拉取,高峰期积压,低峰时再消费
应用:
作为发布 / 订阅系统实现一个微服务级系统间的观察者模式;
连接流计算任务和数据;
用于将消息广播给大量接收者。
二、消息队列的缺点
引入消息队列后,需要考虑MQ的可用性,万一MQ崩溃了岂不是要爆炸?
而且复杂性明显提高了,需要考虑一些消息队列的常见问题和解决方案,
还有就是一致性问题,一条消息由多个消费者消费,万一有一个消费者消费失败了,就会导致数据不一致。
1. 系统复杂性增加
系统可用性下降,一旦MQ出现问题,系统崩溃
如何保证消息队列高可用
- RabbitMQ高可用:
他不是分布式的,(单机模式,普通集群模式,镜像集群模式)
普通集群模式: 实际数据在master上,别的从mater拉取
镜像集群模式: 每个节点都有真实数据,并同步(如何开启?在后台管理开启径向集群策略)- Kafka高可用:
纯分布式,每台带有Kafka的服务器都是Broker节点,基于zookeeper
通过partition的副本保证高可用,当leader死掉,follower会自动称为leader
2. 消息出现问题
如何解决消息重复消费问题 ?或如何解决消息的幂等性?
一般MQ不能保证消息重复
KafKa消息重复如何解决: 消息有个offset代表消息顺序,消费者定期将offset提交到zookeeper告诉MQ已经消费到哪里了,由于定期提交,还是会消费重复数据
解决思路:
- 插入数据库前先查库,存在则不插入,丢弃
- 存入redis的set类型,天然幂等性
- 消费者将消息一定数量的唯一id放入redis缓存中,如果缓存中有,就不进行DB操作,如果没有,插入redis并操作
- 根据数据库唯一键,保证数据库不会插入成功
消息是如何可靠传输的?如果丢了怎么办?
- RabbitMQ丢失消息
- 生产者网络等原因丢失
- channel.txSelect()开启事务, tx.Rollback后重发
- 调成confirm模式,如果失败MQ会回调,重发
- MQ挂掉丢失
- 消息持久化到磁盘,将queue设置为持久化,消息也设置成持久化的
- 消费者执行异常
- 关闭autoAck,不自动告诉MQ已经消费消息,自行发送ACK给MQ
- 生产者网络等原因丢失
- Kafaka处理消息丢失
- 生产者传到MQ后,MQ leader宕机,还未同步数据(切换leader场景)
- 设置4个参数
- 消费者丢失消息
- 生产者传到MQ后,MQ leader宕机,还未同步数据(切换leader场景)
RabbitMQ 如何保证消息顺序性 需要保证顺序的数据放到同一个queue里
积压了几个小时的消息,怎么解决?
Kafka
在消费端使用内存队列,队列里的数据使用 hash 进行分发,每个线程对应一个队列,这样可以保证数据的顺序。
https://www.cnblogs.com/yoke/p/11477167.html
三、技术选型
- 首先,必须是开源的产品,这个非常重要。开源意味着,如果有一天你使用的消息队列遇到了一个影响你系统业务的 Bug,你至少还有机会通过修改源代码来迅速修复或规避这个 Bug,解决你的系统火烧眉毛的问题
- 产品必须是近年来比较流行并且有一定社区活跃度的产品.比较容易在网上搜索到类似的问题,然后很快的找到解决方案。流行的产品与周边生态系统会有一个比较好的集成和兼容
- 业内常用的MQ有哪些?每一种MQ各自的表现如何?
- 这些MQ在同等机器条件下,能抗多少QPS(每秒抗几千QPS还是几万QPS)?
- 性能有多高(发送一条消息给他要2ms还是20ms)?
- 可用性能不能得到保证(要是MQ部署的机器挂了怎么办)?
ActiveMQ | RabbitMQ | RocketMQ | Kafka | Pulsar | |
---|---|---|---|---|---|
源代码 | erlang AMQP 协议 |
Java | Scala | ||
单机吞吐量QPS | 万级, | 万级 | 10w | 10w 吞吐量极高 | |
时效性 | ms毫秒 | 微妙级 | ms | 需要topic不要过多,会大幅度影响性能 | |
是否支持事务 | 支持 | 不支持 | 不支持 | ||
可用性 | 主从架构高可用 | 支持集群化 高可用部署架构 |
分布式 | 分布式,任意扩展 | |
可靠性 | 有低概率丢消息,不支持高并发,高负载,高吞吐的复杂场景,在互联网公司应用较少 | 超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。 | 非常高分布式。基于ZK,可以动态扩容 | ||
功能性 | 异步调用和系统解耦 功能极其完备 |
功能较为完善。 | 支持各种高级功能 延迟消息、事务消息、消息回溯、死信队列、消息积压等等 |
极少的核心功能 | |
缺点 | 不完全可靠,开发不活跃 | 消息中间件的功能明显较少一些 丢数据, 因为数据先写入磁盘缓冲区,未直接落盘。机器故障会造成数据丢失 |
|||
总结 | ActiveMQ是老牌的消息中间件,传统企业应用广泛 | 有非常完善便捷的后台管理界面 可以使用 轻量级的消息队列,非常容易部署和使用 |
基于Java语言开发的,适合深入阅读源码 低延迟和金融级的稳定性 |
处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据 或是应用场景大量使用了大数据、流计算相关的开源产品 |
存储和计算分离的设计 |
实现技术
网络通信
序列化反序列化
一致性分布协议
分布式事务
异步编程模型
数据压缩
内存管理
文件和高性能IO
高可用分布式系统
四、 标准和协议
JMS简介
JMS的全称是Java Message Service,即Java消息服务。<br />它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。<br />把它应用到实际的业务需求中可以在特定的时候利用生产者生成消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。<br />对于消息的传递有两种类型:<br />一种是点对点的,即一个生产者和一个消费者一一对应<br />另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS编程模型
(1) ConnectionFactory
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。
(2) Destination
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。
(3) Connection
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
(4) Session
Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
(5) 消息的生产者
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
(6) 消息消费者
消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
(7) MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
AMQP
MQTT和OpenMessageing
五 应用场景
日志
监控
微服务
流计算
ETL
IoT
六、一般的消息传输模式
- 队列模式
一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复。
- 主题模式
对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息。