消息消费

长轮询方式消费
Rocketmq 的消息是由 consumer 端主动到 broker 拉取的, consumer 向 broker 发送拉消息 请求, PullMessageService 服务通过一个线程将阻塞队列 LinkedBlockingQueue 中的 PullRequest 到 broker 拉取消息。

broker 返回消息后,通过opaque 从 responseTable获取 ResponseFuture。 通过InvokeCallback,回调 PullCallback,最终会回调业务代码中重写 MessageListenerConcurrently.consumeMessage() 方法的类完成消息消费。

consumer 消费时序

  1. <br />![rocketMq 消息消费.png](https://cdn.nlark.com/yuque/0/2019/png/339606/1576848453917-7b45155b-adac-4525-acf5-497a40913cfa.png#crop=0&crop=0&crop=1&crop=1&height=1090&id=IJI5C&name=rocketMq%20%E6%B6%88%E6%81%AF%E6%B6%88%E8%B4%B9.png&originHeight=1090&originWidth=1666&originalType=binary&ratio=1&rotation=0&showTitle=false&size=73020&status=done&style=none&title=&width=1666)

broker 接收消费请求

broker 接收consumer 长轮询请求.png

consumer 接收长轮询消息

consumer 接收长轮询响应.png

消息存储

存储目录文件 :

DemondeMacBook-Pro:store demon$ tree
.
├── abort
├── checkpoint
├── commitlog
│ └── 00000000000000000000
├── config
│ ├── consumerFilter.json
│ ├── consumerFilter.json.bak
│ ├── consumerOffset.json
│ ├── consumerOffset.json.bak
│ ├── delayOffset.json
│ ├── delayOffset.json.bak
│ ├── subscriptionGroup.json
│ ├── topics.json
│ └── topics.json.bak
├── consumequeue
│ └── IFCOIN_CONTRACT_CHANGE
│ ├── 0
│ │ └── 00000000000000000000
│ ├── 1
│ │ └── 00000000000000000000
│ ├── 2
│ │ └── 00000000000000000000
│ └── 3
│ └── 00000000000000000000
├── index
│ └── 20191220121807047
└── lock

通讯部分

通讯部分异步调用交互图,注:同步请求不需要回调

rocketMq 通讯部分交互.png