介绍
批量消息是指将多条小的消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
比如说原本我有三条消息,如果三条消息分三次发的话,会走三次网络IO,如果我给三条消息整成一起发送,这样就走一次网络了.
不足
批量消息虽然好用,但是也有一些不足,官方说一次批量消息不能大于1MB, 实际上实际使用的时候一次发送最大的消息是4MB左右.
使用限制
这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
入门代码案例
说明
要想批量发送消息,很简单,只需要在生产者代码那里 producer.send 方法的时候传入一个List集合即可,这List集合里面存放的就是多个Message消息.
消费者代码那里什么都不需要动,不需要其它额外的配置
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);// 批量发送
生产者
package org.apache.rocketmq.example.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
//如果您一次发送的消息不超过1MiB,那么很容易使用批处理
String topic = "BatchTest";
// 将三个消息都放到一个List,然后把这个List发送过去,这就是批量消息
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
producer.shutdown();
}
}
消费者
package org.apache.rocketmq.example.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//使用指定的消费者组名称实例化
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchProducerGroupName");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
/*从上次偏移量开始消耗*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//再订阅一个主题来消费
consumer.subscribe("BatchTest", "*");
AtomicLong atomicLong = new AtomicLong(1); //创建一个计数器,初始值是1
//注册回调以在从代理获取的消息到达时执行
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
long andIncrement = atomicLong.getAndIncrement();
System.out.println("当前接收到了消息的个数" + andIncrement);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
验证代码查看效果
先启动消费者, 再启动生产者
可以发现 消费者这里接收到了三条消息, 但其实生产者就发送了一次,发送的参数是一个List集合,这List集合里面就是三条消息.
注意:不要看到 “当前接收到了消息的个数” 这个输出顺序变了就以为是bug, 其实不是, 因为 消费者接收消息的时候是多线程的, 可能打印 “当前接收到了消息的个数1” 这个的线程比打印”当前接收到了消息的个数2”的线程执行的早, 所以就打印在前面了, 顺序乱了,
只要看最大的数是多大就可以了,因为AtomicLong是原子类,在多线程是线程安全的,不会出现计数错误问题.
最大的输出 收到消息个数是 3 , 就说明一共接收到了三条消息.
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581891, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7A9C, commitLogOffset=467499676, bodyCRC=1841171634, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID001, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390000, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 48], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581892, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7B5A, commitLogOffset=467499866, bodyCRC=448347172, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID002, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390001, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 49], transactionId='null'}]]
当前接收到了消息的个数2
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581893, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7C18, commitLogOffset=467500056, bodyCRC=61894046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID003, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390002, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 50], transactionId='null'}]]
当前接收到了消息的个数1
当前接收到了消息的个数3
如果消息超过了限制如何解决
批量消息虽然好用,但是也有一些不足,官方说一次批量消息不能大于1MB, 实际上实际使用的时候一次发送最大的消息是4MB左右. 一次发送的超过了限制,MQ会报错的,
最简单粗暴的解决方案就是 一次发送的消息不要太多, 还有个解决办法就是将消息分成多份儿来发送.
生产者
下面代码我添加了很多注释,大部分人应该都能看懂
package org.apache.rocketmq.example.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
int cycleIndex = 50000; // 循环次数 这个参数是循环设置多少个参数的
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
//large batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
// 一次10万条消息,一次发送出去肯定是超过限制了.
for (int i = 0; i < cycleIndex; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
// 直接发送出去会报错
// producer.send(messages);
// 如果超过大小限制了,就用下面的代码把一个大的消息进行拆分多个小的消息,然后多次发送出去
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
}
producer.shutdown();
}
}
class ListSplitter implements Iterator<List<Message>> {
// 消息
private final List<Message> messages;
//大小限制
private final int sizeLimit = 1000 * 1000;
//当前索引
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
/**
* 判断是否还有数据,
* 判断逻辑: 当前索引是否小于 消息的长度
*
* @return
*/
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;//当前记录的已经用过的索引
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
// 如何计算一个消息的大小: 就是 topic的长度加上消息body的长度,加一个自定义属性的长度 , 再加上20
int tmpSize = getMessageSize(message);
// 如果当前取出来的消息长度大于预先设置的sizeLimit(消息最大长度,)直接就跳出循环,然后记录下nextIndex索引位置
if (tmpSize > sizeLimit) { // 如果消息长度超过了 sizeLimit(1百万)
// 如果下一个索引减去当前索引为0,那么就给下一个索引进行加1,这样目的是下次循环的时候,就可以通过nextIndex属性拿取下一个索引的值
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
// 什么时候 多个消息累加的长度+当前取出来的消息的长度 > sizeLimit(预先设置的消息最大长度),就执行break跳出当前循环
//否则就接着 累加消息.
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
/*subList方法,通过起始索引和结束索引获取List的一部分
参数1: 截取元素的起始位置,包含该索引位置元素
参数2: 截取元素的结束位置,不包含该索引位置元素
*/
List<Message> subList = messages.subList(currIndex, nextIndex);
System.out.println("当前的currIndex是: " + currIndex + " 当前的nextIndex是" + nextIndex);
currIndex = nextIndex;
return subList;
}
/**
* 计算消息大小
* 如何计算一个消息的大小: 就是 topic的长度加上消息body的长度,加一个自定义属性的长度 , 再加上20
*
* @param message 消息
* @return 消息长度
*/
private int getMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length;
//消息属性的长度
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //对日志的开销
return tmpSize;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
消费者用上面入门代码案例的消费者
启动并且测试结果
先启动消费者, 再启动生产者. 注意,每次反复测试的时候需要重新启动消费者, 因为我在消费者弄了一个AtomicLong计数器,每次测试的时候不重启消费者的话,那么这个AtomicLong参数会接着累加.
生产者日志:
可以看到,分成了四份发送了50000条消息.
这就是批量消息的好处,50000条消息,如果一个一个发送的话,要走50000次网络IO, 如果用批量消息发送的话,下面这个案例,就走了4次IO.
当前的currIndex是: 0 当前的nextIndex是13275
当前的currIndex是: 13275 当前的nextIndex是26262
当前的currIndex是: 26262 当前的nextIndex是39249
当前的currIndex是: 39249 当前的nextIndex是50000
消费者日志:
可以看到 消费者一共接收到了50000条日志.
... 前面的日志不粘了,太长了.
ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610976, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528A36, commitLogOffset=475171382, bodyCRC=1335665873, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39246, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D994E, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 54], transactionId='null'}]]
当前接收到了消息的个数49998
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610978, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528BBE, commitLogOffset=475171774, bodyCRC=673483222, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39248, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D9950, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 56], transactionId='null'}]]
当前接收到了消息的个数49999
ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610977, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528AFA, commitLogOffset=475171578, bodyCRC=949720135, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39247, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D994F, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 55], transactionId='null'}]]
当前接收到了消息的个数50000