1.环境准备

1.创建Maven工程,quickstart的就好
image.png
2.在自己的机器上启动rocketmq,windows的上篇文章已经讲过
3.maven中导入rocketmq的连接jar包和测试包

  1. <dependency>
  2. <groupId>junit</groupId>
  3. <artifactId>junit</artifactId>
  4. <version>4.13</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-client</artifactId>
  9. <version>4.3.0</version>
  10. </dependency>

注:可以直接从第4章开始看

2.RocketMQ整体存储原理

2.1消息存储的相关文件

2.1.1CommitLog文件

①存储消息的主体内容
②一个CommitLog文件大小为1G
③文件的命名为20位的二进制表示(文件位置:${HOME}/store/commitlog/),如下图
image.png
④如果第一文件0000**0000文件存满1G后,会生成名称为 :00000000001073741824文件(第二个文件)。

2.1.2ConsumeQueue文件

①消费者要消费消息时,需要到2.1.1中的commitlog文件中寻找数据,那么如何定位消息在commitLog文件的位置呢??
②ConsumeQueue文件中存贮每个消息在commitLog中的物理偏移量,消息的大小和消息Tag的hashCode
③图形展示ConsumeQueue文件和CommitLog文件的关系
image.png

2.1.3IndexFile文件

①提供了可以根据时间和key值来查询commitlog消息的索引
②提供的就是另一种查询消息的方式,官方说是个IndexFile实现的是hashMap结构。
③文件名称为 时间戳,按照时间查询就到时间对应的文件查询即可
④按照key查询,那么大概的原理图应该是:
image.png

3.RokectMQ相关概念

  1. 相关概念可以查看(https://github.com/apache/rocketmq/blob/master/docs/cn/concept.md

2.各个概念的图形化
image.png
3.一条消息包含的内容
①所属的Topic名称
②Tag标签
③key值
④消息内容
4.一个消息保存到RocketMq的一个过程
①生产者向nameserver请求发送消息
②nameserver将返回brokerserver的一个ip:port
③生产者向这个brokerserver发送消息(包含topic tag key和消息主题)
④这台brokerserver的commitlog文件会记录这个消息
⑤indexfile中会记录这个消息索引。
⑥消息会自动选择Topic下的一个consumequeue,将消息索引放入这个队列中。
⑦broker返回ack

4.RockectMQ的使用

4.1消息发送

RocketMQ的使用及概念 - 图6发送一个消息需要确定3个问题。
①同步、异步或单项发送。
②发送普通消息、顺序消息还是批量发送(只能同步发)
③消息设不设置属性、设不设置延时。

4.1.1发送同步消息

  1. 步骤

①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者将消息发送。
⑥关闭生产者
2.代码

  1. @Test
  2. public void testSendSyncMsg() throws Exception{
  3. //1.创建一个生产者,并指定生产者组
  4. DefaultMQProducer producer = new DefaultMQProducer("syncMsgProducerGroup");
  5. //2.为生产者指定 nameserver的ip和端口
  6. producer.setNamesrvAddr("localhost:9876");
  7. //3.启动生产者实例
  8. producer.start();
  9. //4.创建消息实例
  10. Message message = new Message("syncMsgTopic", "Tag", "key1", "Hello RocketMq".getBytes(RemotingHelper.DEFAULT_CHARSET));
  11. //5.通过生产者将消息发送。
  12. producer.send(message);
  13. //6.关闭生产者
  14. producer.shutdown();
  15. }

4.1.2发送异步消息

  1. 步骤

①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者将消息发送,并设置回调方法。
⑥关闭生产者
2.代码

  1. @Test
  2. public void testSendAsyncMsg() throws Exception{
  3. //1.创建一个生产者,并指定生产者组
  4. DefaultMQProducer producer = new DefaultMQProducer("aSyncMsgProducerGroup");
  5. //2.为生产者指定 nameserver的ip和端口
  6. producer.setNamesrvAddr("localhost:9876");
  7. //3.启动生产者实例
  8. producer.start();
  9. //4.创建消息实例
  10. Message message = new Message("aSyncMsgTopic", "Tag", "key2", "Hello RocketMq Async".getBytes(RemotingHelper.DEFAULT_CHARSET));
  11. //5.通过生产者将消息发送,并设置回调方法
  12. producer.send(message, new SendCallback() {
  13. //成功时的回调
  14. @Override
  15. public void onSuccess(SendResult sendResult) {
  16. System.out.println("发送异步消息成功了!");
  17. System.out.println(sendResult);
  18. }
  19. //异常时的回调
  20. @Override
  21. public void onException(Throwable throwable) {
  22. System.out.println("发送异步消息失败了");
  23. throwable.printStackTrace();
  24. }
  25. });
  26. //这里我可以使用闭锁,但是为了简单直接就睡一会
  27. Thread.sleep(8000);
  28. //6.关闭生产者
  29. producer.shutdown();
  30. }

4.1.3发送单向消息

  1. 步骤

①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者发送单向消息,没有任何的返回值
⑥关闭生产者
2.代码

  1. @Test
  2. public void testSendSingleMsg() throws Exception{
  3. //1.创建一个生产者,并指定生产者组
  4. DefaultMQProducer producer = new DefaultMQProducer("singleMsgProducerGroup");
  5. //2.为生产者指定 nameserver的ip和端口
  6. producer.setNamesrvAddr("localhost:9876");
  7. //3.启动生产者实例
  8. producer.start();
  9. //4.创建消息实例
  10. Message message = new Message("singleMsgTopic", "Tag", "key3", "Hello RocketMq Single".getBytes(RemotingHelper.DEFAULT_CHARSET));
  11. //5.通过生产者发送单向消息,没有任何的返回值
  12. producer.sendOneway(message);
  13. //6.关闭生产者
  14. producer.shutdown();
  15. }

4.1.4同步顺序消息(异步和单向的不再举例)

1.步骤
①创建生产者实例
②设置nameserver的ip和端口
③启动生产者实例
④创建3条消息
⑤通过producer发送3条消息,并设置序号
⑥关闭生产者
2.代码

  1. @Test
  2. public void testSendSequenceMsg() throws Exception{
  3. //1.创建一个生产者,并指定生产者组
  4. DefaultMQProducer producer = new DefaultMQProducer("sequenceMsgProducerGroup");
  5. //2.为生产者指定 nameserver的ip和端口
  6. producer.setNamesrvAddr("localhost:9876");
  7. //3.启动生产者实例
  8. producer.start();
  9. //4.创建3个消息实例
  10. ArrayList<Message> messages = new ArrayList<>();
  11. Message message = new Message("sequenceMsgTopic", "Tag", "key1", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));
  12. Message message1 = new Message("sequenceMsgTopic", "Tag", "key2", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));
  13. Message message2 = new Message("sequenceMsgTopic", "Tag", "key3", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));
  14. messages.add(message);
  15. messages.add(message1);
  16. messages.add(message2);
  17. //5.遍历消息并发送在第一消息队列中
  18. for(Message msg:messages){
  19. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  20. @Override
  21. public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
  22. //都发在第一个队列中
  23. return list.get(0);
  24. }
  25. }, messages.indexOf(msg)+1);
  26. System.out.println(sendResult);
  27. }
  28. //6.关闭生产者
  29. producer.shutdown();
  30. }

3.实际上就是下面这个send使用

  1. // 消息 指定消息队列 指定排序的
  2. public SendResult send(Message msg, MessageQueueSelector selector, Object arg)

4.1.5延时消息

1.代码

  1. @Test
  2. public void testSendDlayMsg() throws Exception{
  3. //1.创建一个生产者,并指定生产者组
  4. DefaultMQProducer producer = new DefaultMQProducer("delayMsgProducerGroup");
  5. //2.为生产者指定 nameserver的ip和端口
  6. producer.setNamesrvAddr("localhost:9876");
  7. //3.启动生产者实例
  8. producer.start();
  9. //4.创建消息实例
  10. Message message = new Message("delayMsgTopic", "Tag", "key1", "Hello RocketMq Delay".getBytes(RemotingHelper.DEFAULT_CHARSET));
  11. //5.为消息设置延时等级 1~18级
  12. message.setDelayTimeLevel(3);
  13. //6.通过生产者将消息发送。
  14. SendResult send = producer.send(message);
  15. System.out.println(send);
  16. //7.关闭生产者
  17. producer.shutdown();
  18. }

2.1~18级对应的时间

  1. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

4.1.6批量消息(只能以同步方式发送)

1.代码

  1. @Test
  2. public void testSendBatchMsg()throws Exception{
  3. //1.创建一个生产者,并指定生产者组
  4. DefaultMQProducer producer = new DefaultMQProducer("batchMsgProducerGroup");
  5. //2.为生产者指定 nameserver的ip和端口
  6. producer.setNamesrvAddr("localhost:9876");
  7. //3.启动生产者实例
  8. producer.start();
  9. //4.创建消息实例
  10. ArrayList<Message> messages = new ArrayList<>();
  11. messages.add(new Message("batchMsgTopic", "Tag", "key1", "Hello RocketMq Batch1".getBytes(RemotingHelper.DEFAULT_CHARSET)));
  12. messages.add(new Message("batchMsgTopic", "Tag", "key2", "Hello RocketMq Batch2".getBytes(RemotingHelper.DEFAULT_CHARSET)));
  13. messages.add(new Message("batchMsgTopic", "Tag", "key3", "Hello RocketMq Batch3".getBytes(RemotingHelper.DEFAULT_CHARSET)));
  14. messages.add(new Message("batchMsgTopic", "Tag", "key4", "Hello RocketMq Batch4".getBytes(RemotingHelper.DEFAULT_CHARSET)));
  15. //5.发送
  16. SendResult send = producer.send(messages);
  17. System.out.println(send);
  18. //6.关闭生产者
  19. producer.shutdown();
  20. }

2.注意:批量消息大小要小于 4M
3.当消息多了后使用官方提供的切list的类

  1. public class ListSplitter implements Iterator<List<Message>> {
  2. private final int SIZE_LIMIT = 1024 * 1024 * 4;
  3. private final List<Message> messages;
  4. private int currIndex;
  5. public ListSplitter(List<Message> messages) {
  6. this.messages = messages;
  7. }
  8. @Override public boolean hasNext() {
  9. return currIndex < messages.size();
  10. }
  11. @Override public List<Message> next() {
  12. int startIndex = getStartIndex();
  13. int nextIndex = startIndex;
  14. int totalSize = 0;
  15. for (; nextIndex < messages.size(); nextIndex++) {
  16. Message message = messages.get(nextIndex);
  17. int tmpSize = calcMessageSize(message);
  18. if (tmpSize + totalSize > SIZE_LIMIT) {
  19. break;
  20. } else {
  21. totalSize += tmpSize;
  22. }
  23. }
  24. List<Message> subList = messages.subList(startIndex, nextIndex);
  25. currIndex = nextIndex;
  26. return subList;
  27. }
  28. private int getStartIndex() {
  29. Message currMessage = messages.get(currIndex);
  30. int tmpSize = calcMessageSize(currMessage);
  31. while(tmpSize > SIZE_LIMIT) {
  32. currIndex += 1;
  33. Message message = messages.get(curIndex);
  34. tmpSize = calcMessageSize(message);
  35. }
  36. return currIndex;
  37. }
  38. private int calcMessageSize(Message message) {
  39. int tmpSize = message.getTopic().length() + message.getBody().length();
  40. Map<String, String> properties = message.getProperties();
  41. for (Map.Entry<String, String> entry : properties.entrySet()) {
  42. tmpSize += entry.getKey().length() + entry.getValue().length();
  43. }
  44. tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
  45. return tmpSize;
  46. }
  47. }
  48. ===============================================================
  49. //把大的消息分裂成若干个小的消息
  50. ListSplitter splitter = new ListSplitter(messages);
  51. while (splitter.hasNext()) {
  52. try {
  53. List<Message> listItem = splitter.next();
  54. producer.send(listItem);
  55. } catch (Exception e) {
  56. e.printStackTrace();
  57. //处理error
  58. }
  59. }

4.1.7带有属性的消息

  1. @Test
  2. public void testSendHasPropertyMsg()throws Exception{
  3. //1.创建一个生产者,并指定生产者组
  4. DefaultMQProducer producer = new DefaultMQProducer("propertyMsgProducerGroup");
  5. //2.为生产者指定 nameserver的ip和端口
  6. producer.setNamesrvAddr("localhost:9876");
  7. //3.启动生产者实例
  8. producer.start();
  9. //4.创建消息实例
  10. Message message = new Message("propertyMsgTopic", "Tag", "key1", "Hello RocketMq Property1".getBytes(RemotingHelper.DEFAULT_CHARSET));
  11. //5.设置属性
  12. message.putUserProperty("name","JamesLeBron");
  13. //6.发送
  14. SendResult send = producer.send(message);
  15. System.out.println(send);
  16. //7.关闭生产者
  17. producer.shutdown();
  18. }

4.2消费消息

RocketMQ的使用及概念 - 图7

4.2.1集群推动式普通消费

1.集群消费:同一个消费者组下均摊消息
2.推动式:Broker主动向消费者推送消息
3.代码

  1. @Test
  2. public void testClusterPushCommonConsume() throws Exception{
  3. //1.实例化推动式消费模式
  4. DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syncConsume");
  5. //2.设置消费者消费模式,默认也是Clustering
  6. defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
  7. //3.设置nameServer的ip和端口和读取的偏移量(默认最后)
  8. defaultMQPushConsumer.setNamesrvAddr("localhost:9876");
  9. defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  10. //4.设置订阅的主题和标签
  11. defaultMQPushConsumer.subscribe("syncTopic","");
  12. //5.注册监听器
  13. defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
  14. @Override
  15. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  16. System.out.println(list.size());
  17. System.out.println(list);
  18. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  19. }
  20. });
  21. //6.启动消费者实例
  22. defaultMQPushConsumer.start();
  23. //7.睡眠
  24. Thread.sleep(100000);
  25. }