整体认识rocketmq
架构:
概念:
Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息
Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
Topic:消息主题,负责标记一类消息,生产者将消息发送到Topic,消费者从该Topic消费消息
Broker:消息中转角色,负责存储消息,转发消息,一般也称为 Server,在 JMS 规范中称为 Provider
NameServer:服务发现Server,用于生产者和消费者获取Broker的服务;
Rocketmq模块划分:
名称 | 作用 |
---|---|
broker | broker模块:c和p端消息存储逻辑 |
client | 客户端api:produce、consumer端 接受与发送api |
common | 公共组件:常量、基类、数据结构 |
tools | 运维tools:命令行工具模块 |
store | 存储模块:消息、索引、commitlog存储 |
namesrv | 服务管理模块:服务注册topic等信息存储 |
remoting | 远程通讯模块:netty+fastjson |
logappender | 日志适配模块 |
example | Demo列子 |
filtersrv | 消息过滤器模块 |
srvutil | 辅助模块 |
filter | 过滤模块:消息过滤模块 |
distribution | 部署、运维相关zip包中的代码 |
openmessaging | 兼容openmessaging分布式消息模块 |
Rocketmq高可用:
特性:
Producer端:
发送方式:
Sync:同步的发送方式,会等待发送结果后才返回
Async:异步的发送方式,发送完后,立刻返回。Client 在拿到 Broker 的响应结果后,会回调指定的 callback. 这个 API 也可以指定 Timeout,不指定也是默认的3000ms.
Oneway:比较简单,发出去后,什么都不管直接返回。Ps:日志
发送结果:
org.apache.rocketmq.client.producer.SendStatus
SEND_OK,:消息发送成功
FLUSH_DISK_TIMEOUT,消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT,消息发送成功,但是服务器同步到Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE,
消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢
普通消息:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
1、准备工作 mesasge、网络相关、线程相关
2、从namesrv获取topic路由(缓存机制)
3、组装数据,broker需要的序列化数据(json)
4、Netty发送(源码)
定时消息
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。(第三方 job 步长)
固定精度:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel
顺序消息:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
场景:订单》下单》支付》配送》签收
底层原理:4个队列,一个订单下面不同状态的消息是顺序的只需要发到一个队列中
org.apache.rocketmq.client.producer.MessageQueueSelector如何选择一个队列
事务消息
事务:ACID xa 二段阶段三阶段 tcc 阿里
http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/
Consumer端:
消费模型:
org.apache.rocketmq.common.protocol.heartbeat.MessageModel#BROADCASTING
org.apache.rocketmq.common.protocol.heartbeat.MessageModel#CLUSTERING
消费选择:
org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_LAST_OFFSET
第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_FIRST_OFFSET
第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_TIMESTAMP
第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始
消息重复幂等:
RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重
Ps:见开发文档
rokectmq开发指南.pdf
为什么要有组group:?
sh mqshutdown broker
sh mqshutdown namesrv