消息消费
长轮询方式消费
Rocketmq 的消息是由 consumer 端主动到 broker 拉取的, consumer 向 broker 发送拉消息 请求, PullMessageService 服务通过一个线程将阻塞队列 LinkedBlockingQueue
broker 返回消息后,通过opaque 从 responseTable获取 ResponseFuture。 通过InvokeCallback,回调 PullCallback,最终会回调业务代码中重写 MessageListenerConcurrently.consumeMessage() 方法的类完成消息消费。
consumer 消费时序
<br />![rocketMq 消息消费.png](https://cdn.nlark.com/yuque/0/2019/png/339606/1576848453917-7b45155b-adac-4525-acf5-497a40913cfa.png#crop=0&crop=0&crop=1&crop=1&height=1090&id=IJI5C&name=rocketMq%20%E6%B6%88%E6%81%AF%E6%B6%88%E8%B4%B9.png&originHeight=1090&originWidth=1666&originalType=binary&ratio=1&rotation=0&showTitle=false&size=73020&status=done&style=none&title=&width=1666)
broker 接收消费请求
consumer 接收长轮询消息
消息存储
存储目录文件 :
DemondeMacBook-Pro:store demon$ tree
.
├── abort
├── checkpoint
├── commitlog
│ └── 00000000000000000000
├── config
│ ├── consumerFilter.json
│ ├── consumerFilter.json.bak
│ ├── consumerOffset.json
│ ├── consumerOffset.json.bak
│ ├── delayOffset.json
│ ├── delayOffset.json.bak
│ ├── subscriptionGroup.json
│ ├── topics.json
│ └── topics.json.bak
├── consumequeue
│ └── IFCOIN_CONTRACT_CHANGE
│ ├── 0
│ │ └── 00000000000000000000
│ ├── 1
│ │ └── 00000000000000000000
│ ├── 2
│ │ └── 00000000000000000000
│ └── 3
│ └── 00000000000000000000
├── index
│ └── 20191220121807047
└── lock
通讯部分
通讯部分异步调用交互图,注:同步请求不需要回调