整体认识rocketmq

架构:

Rocketmq特性详解&场景介绍 - 图1
Rocketmq特性详解&场景介绍 - 图2

整体认识:远程通讯,发送消息,存储消息。
Rocketmq特性详解&场景介绍 - 图3

概念:

Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息
Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
Topic:消息主题,负责标记一类消息,生产者将消息发送到Topic,消费者从该Topic消费消息
Broker:消息中转角色,负责存储消息,转发消息,一般也称为 Server,在 JMS 规范中称为 Provider
NameServer:服务发现Server,用于生产者和消费者获取Broker的服务;

Rocketmq模块划分:

名称 作用
broker broker模块:c和p端消息存储逻辑
client 客户端api:produce、consumer端 接受与发送api
common 公共组件:常量、基类、数据结构
tools 运维tools:命令行工具模块
store 存储模块:消息、索引、commitlog存储
namesrv 服务管理模块:服务注册topic等信息存储
remoting 远程通讯模块:netty+fastjson
logappender 日志适配模块
example Demo列子
filtersrv 消息过滤器模块
srvutil 辅助模块
filter 过滤模块:消息过滤模块
distribution 部署、运维相关zip包中的代码
openmessaging 兼容openmessaging分布式消息模块

Rocketmq高可用:

Rocketmq特性详解&场景介绍 - 图4

特性:

Producer端:

发送方式:

Sync:同步的发送方式,会等待发送结果后才返回
Async:异步的发送方式,发送完后,立刻返回。Client 在拿到 Broker 的响应结果后,会回调指定的 callback. 这个 API 也可以指定 Timeout,不指定也是默认的3000ms.
Oneway:比较简单,发出去后,什么都不管直接返回。Ps:日志

发送结果:

org.apache.rocketmq.client.producer.SendStatus
SEND_OK,:消息发送成功
FLUSH_DISK_TIMEOUT,消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT,消息发送成功,但是服务器同步到Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE,
消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢

普通消息:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
Rocketmq特性详解&场景介绍 - 图5



Rocketmq特性详解&场景介绍 - 图6
1、准备工作 mesasge、网络相关、线程相关
2、从namesrv获取topic路由(缓存机制)
3、组装数据,broker需要的序列化数据(json)
4、Netty发送(源码)

定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。(第三方 job 步长)
固定精度:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel
Rocketmq特性详解&场景介绍 - 图7

顺序消息:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
场景:订单》下单》支付》配送》签收
底层原理:4个队列,一个订单下面不同状态的消息是顺序的只需要发到一个队列中
org.apache.rocketmq.client.producer.MessageQueueSelector如何选择一个队列

Rocketmq特性详解&场景介绍 - 图8

事务消息

Rocketmq特性详解&场景介绍 - 图9


Rocketmq特性详解&场景介绍 - 图10
事务:ACID xa 二段阶段三阶段 tcc 阿里
Rocketmq特性详解&场景介绍 - 图11

http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/

Consumer端:

消费模型:

org.apache.rocketmq.common.protocol.heartbeat.MessageModel#BROADCASTING
org.apache.rocketmq.common.protocol.heartbeat.MessageModel#CLUSTERING

消费选择:

org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_LAST_OFFSET
第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_FIRST_OFFSET
第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_TIMESTAMP
第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始

消息重复幂等:

RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重
Ps:见开发文档

rokectmq开发指南.pdf
为什么要有组group:?

sh mqshutdown broker
sh mqshutdown namesrv