介绍

批量消息是指将多条小的消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

比如说原本我有三条消息,如果三条消息分三次发的话,会走三次网络IO,如果我给三条消息整成一起发送,这样就走一次网络了.

不足

批量消息虽然好用,但是也有一些不足,官方说一次批量消息不能大于1MB, 实际上实际使用的时候一次发送最大的消息是4MB左右.

使用限制

这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。

入门代码案例

说明

要想批量发送消息,很简单,只需要在生产者代码那里 producer.send 方法的时候传入一个List集合即可,这List集合里面存放的就是多个Message消息.

消费者代码那里什么都不需要动,不需要其它额外的配置

  1. List<Message> messages = new ArrayList<>();
  2. messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
  3. messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
  4. messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
  5. producer.send(messages);// 批量发送

生产者

  1. package org.apache.rocketmq.example.batch;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.common.message.Message;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. public class SimpleBatchProducer {
  7. public static void main(String[] args) throws Exception {
  8. DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
  9. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  10. producer.start();
  11. //如果您一次发送的消息不超过1MiB,那么很容易使用批处理
  12. String topic = "BatchTest";
  13. // 将三个消息都放到一个List,然后把这个List发送过去,这就是批量消息
  14. List<Message> messages = new ArrayList<>();
  15. messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
  16. messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
  17. messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
  18. producer.send(messages);
  19. producer.shutdown();
  20. }
  21. }

消费者

  1. package org.apache.rocketmq.example.batch;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import java.util.List;
  10. import java.util.concurrent.atomic.AtomicLong;
  11. public class Consumer {
  12. public static void main(String[] args) throws InterruptedException, MQClientException {
  13. //使用指定的消费者组名称实例化
  14. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchProducerGroupName");
  15. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  16. /*从上次偏移量开始消耗*/
  17. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  18. //再订阅一个主题来消费
  19. consumer.subscribe("BatchTest", "*");
  20. AtomicLong atomicLong = new AtomicLong(1); //创建一个计数器,初始值是1
  21. //注册回调以在从代理获取的消息到达时执行
  22. consumer.registerMessageListener(new MessageListenerConcurrently() {
  23. @Override
  24. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  25. ConsumeConcurrentlyContext context) {
  26. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  27. long andIncrement = atomicLong.getAndIncrement();
  28. System.out.println("当前接收到了消息的个数" + andIncrement);
  29. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  30. }
  31. });
  32. //启动消费者实例
  33. consumer.start();
  34. System.out.printf("Consumer Started.%n");
  35. }
  36. }

验证代码查看效果

先启动消费者, 再启动生产者

可以发现 消费者这里接收到了三条消息, 但其实生产者就发送了一次,发送的参数是一个List集合,这List集合里面就是三条消息.

注意:不要看到 “当前接收到了消息的个数” 这个输出顺序变了就以为是bug, 其实不是, 因为 消费者接收消息的时候是多线程的, 可能打印 “当前接收到了消息的个数1” 这个的线程比打印”当前接收到了消息的个数2”的线程执行的早, 所以就打印在前面了, 顺序乱了,

只要看最大的数是多大就可以了,因为AtomicLong是原子类,在多线程是线程安全的,不会出现计数错误问题.

最大的输出 收到消息个数是 3 , 就说明一共接收到了三条消息.

  1. Consumer Started.
  2. ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581891, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7A9C, commitLogOffset=467499676, bodyCRC=1841171634, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID001, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390000, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 48], transactionId='null'}]]
  3. ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581892, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7B5A, commitLogOffset=467499866, bodyCRC=448347172, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID002, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390001, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 49], transactionId='null'}]]
  4. 当前接收到了消息的个数2
  5. ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581893, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7C18, commitLogOffset=467500056, bodyCRC=61894046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID003, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390002, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 50], transactionId='null'}]]
  6. 当前接收到了消息的个数1
  7. 当前接收到了消息的个数3

如果消息超过了限制如何解决

批量消息虽然好用,但是也有一些不足,官方说一次批量消息不能大于1MB, 实际上实际使用的时候一次发送最大的消息是4MB左右. 一次发送的超过了限制,MQ会报错的,

最简单粗暴的解决方案就是 一次发送的消息不要太多, 还有个解决办法就是将消息分成多份儿来发送.

生产者

下面代码我添加了很多注释,大部分人应该都能看懂

  1. package org.apache.rocketmq.example.batch;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.common.message.Message;
  4. import java.util.ArrayList;
  5. import java.util.Iterator;
  6. import java.util.List;
  7. import java.util.Map;
  8. public class SplitBatchProducer {
  9. public static void main(String[] args) throws Exception {
  10. int cycleIndex = 50000; // 循环次数 这个参数是循环设置多少个参数的
  11. DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
  12. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  13. producer.start();
  14. //large batch
  15. String topic = "BatchTest";
  16. List<Message> messages = new ArrayList<>(100 * 1000);
  17. // 一次10万条消息,一次发送出去肯定是超过限制了.
  18. for (int i = 0; i < cycleIndex; i++) {
  19. messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
  20. }
  21. // 直接发送出去会报错
  22. // producer.send(messages);
  23. // 如果超过大小限制了,就用下面的代码把一个大的消息进行拆分多个小的消息,然后多次发送出去
  24. ListSplitter splitter = new ListSplitter(messages);
  25. while (splitter.hasNext()) {
  26. List<Message> listItem = splitter.next();
  27. producer.send(listItem);
  28. }
  29. producer.shutdown();
  30. }
  31. }
  32. class ListSplitter implements Iterator<List<Message>> {
  33. // 消息
  34. private final List<Message> messages;
  35. //大小限制
  36. private final int sizeLimit = 1000 * 1000;
  37. //当前索引
  38. private int currIndex;
  39. public ListSplitter(List<Message> messages) {
  40. this.messages = messages;
  41. }
  42. /**
  43. * 判断是否还有数据,
  44. * 判断逻辑: 当前索引是否小于 消息的长度
  45. *
  46. * @return
  47. */
  48. @Override
  49. public boolean hasNext() {
  50. return currIndex < messages.size();
  51. }
  52. @Override
  53. public List<Message> next() {
  54. int nextIndex = currIndex;//当前记录的已经用过的索引
  55. int totalSize = 0;
  56. for (; nextIndex < messages.size(); nextIndex++) {
  57. Message message = messages.get(nextIndex);
  58. // 如何计算一个消息的大小: 就是 topic的长度加上消息body的长度,加一个自定义属性的长度 , 再加上20
  59. int tmpSize = getMessageSize(message);
  60. // 如果当前取出来的消息长度大于预先设置的sizeLimit(消息最大长度,)直接就跳出循环,然后记录下nextIndex索引位置
  61. if (tmpSize > sizeLimit) { // 如果消息长度超过了 sizeLimit(1百万)
  62. // 如果下一个索引减去当前索引为0,那么就给下一个索引进行加1,这样目的是下次循环的时候,就可以通过nextIndex属性拿取下一个索引的值
  63. if (nextIndex - currIndex == 0) {
  64. nextIndex++;
  65. }
  66. break;
  67. }
  68. // 什么时候 多个消息累加的长度+当前取出来的消息的长度 > sizeLimit(预先设置的消息最大长度),就执行break跳出当前循环
  69. //否则就接着 累加消息.
  70. if (tmpSize + totalSize > sizeLimit) {
  71. break;
  72. } else {
  73. totalSize += tmpSize;
  74. }
  75. }
  76. /*subList方法,通过起始索引和结束索引获取List的一部分
  77. 参数1: 截取元素的起始位置,包含该索引位置元素
  78. 参数2: 截取元素的结束位置,不包含该索引位置元素
  79. */
  80. List<Message> subList = messages.subList(currIndex, nextIndex);
  81. System.out.println("当前的currIndex是: " + currIndex + " 当前的nextIndex是" + nextIndex);
  82. currIndex = nextIndex;
  83. return subList;
  84. }
  85. /**
  86. * 计算消息大小
  87. * 如何计算一个消息的大小: 就是 topic的长度加上消息body的长度,加一个自定义属性的长度 , 再加上20
  88. *
  89. * @param message 消息
  90. * @return 消息长度
  91. */
  92. private int getMessageSize(Message message) {
  93. int tmpSize = message.getTopic().length() + message.getBody().length;
  94. //消息属性的长度
  95. Map<String, String> properties = message.getProperties();
  96. for (Map.Entry<String, String> entry : properties.entrySet()) {
  97. tmpSize += entry.getKey().length() + entry.getValue().length();
  98. }
  99. tmpSize = tmpSize + 20; //对日志的开销
  100. return tmpSize;
  101. }
  102. @Override
  103. public void remove() {
  104. throw new UnsupportedOperationException("Not allowed to remove");
  105. }
  106. }

消费者用上面入门代码案例的消费者

启动并且测试结果

先启动消费者, 再启动生产者. 注意,每次反复测试的时候需要重新启动消费者, 因为我在消费者弄了一个AtomicLong计数器,每次测试的时候不重启消费者的话,那么这个AtomicLong参数会接着累加.

生产者日志:

可以看到,分成了四份发送了50000条消息.
这就是批量消息的好处,50000条消息,如果一个一个发送的话,要走50000次网络IO, 如果用批量消息发送的话,下面这个案例,就走了4次IO.

  1. 当前的currIndex是: 0 当前的nextIndex13275
  2. 当前的currIndex是: 13275 当前的nextIndex26262
  3. 当前的currIndex是: 26262 当前的nextIndex39249
  4. 当前的currIndex是: 39249 当前的nextIndex50000

消费者日志:

可以看到 消费者一共接收到了50000条日志.

  1. ... 前面的日志不粘了,太长了.
  2. ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610976, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528A36, commitLogOffset=475171382, bodyCRC=1335665873, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39246, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D994E, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 54], transactionId='null'}]]
  3. 当前接收到了消息的个数49998
  4. ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610978, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528BBE, commitLogOffset=475171774, bodyCRC=673483222, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39248, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D9950, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 56], transactionId='null'}]]
  5. 当前接收到了消息的个数49999
  6. ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610977, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528AFA, commitLogOffset=475171578, bodyCRC=949720135, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39247, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D994F, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 55], transactionId='null'}]]
  7. 当前接收到了消息的个数50000