title: Quick Start

date: 2017/12/21

categories: Documentation Translation

Batch Example

Why batch?

Sending messages in batches improves performance when delivering small messages.

Usage constraints

Messages in the same batch must share the same topic and the same waitStoreMsgOK setting, and none of them may be a scheduled (delayed) message.

In addition, the total size of the messages in one batch should not exceed 1 MiB.
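
The constraints above can be checked before a batch is handed to the producer. The following is a minimal sketch, not RocketMQ's own validation: `SimpleMessage` and `BatchConstraints` are hypothetical stand-ins introduced for illustration, the `delayLevel` field assumes that 0 means "not scheduled", and the size check counts message bodies only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a RocketMQ message; it models only the
// attributes that the batch constraints mention.
final class SimpleMessage {
    final String topic;
    final boolean waitStoreMsgOK;
    final int delayLevel; // assumption: 0 means "not a scheduled message"
    final byte[] body;

    SimpleMessage(String topic, boolean waitStoreMsgOK, int delayLevel, byte[] body) {
        this.topic = topic;
        this.waitStoreMsgOK = waitStoreMsgOK;
        this.delayLevel = delayLevel;
        this.body = body;
    }
}

final class BatchConstraints {
    static final int SIZE_LIMIT = 1024 * 1024; // 1 MiB

    // Returns the first violated constraint, or Optional.empty() if none is found.
    static Optional<String> firstViolation(List<SimpleMessage> batch) {
        if (batch.isEmpty()) {
            return Optional.empty();
        }
        SimpleMessage first = batch.get(0);
        long totalSize = 0;
        for (SimpleMessage m : batch) {
            if (!m.topic.equals(first.topic)) {
                return Optional.of("mixed topics");
            }
            if (m.waitStoreMsgOK != first.waitStoreMsgOK) {
                return Optional.of("mixed waitStoreMsgOK");
            }
            if (m.delayLevel > 0) {
                return Optional.of("scheduled message");
            }
            totalSize += m.body.length; // bodies only; topic and properties are ignored here
        }
        return totalSize > SIZE_LIMIT ? Optional.of("batch exceeds 1 MiB") : Optional.empty();
    }

    public static void main(String[] args) {
        byte[] body = "Hello world".getBytes(StandardCharsets.UTF_8);
        List<SimpleMessage> ok = Arrays.asList(
            new SimpleMessage("BatchTest", true, 0, body),
            new SimpleMessage("BatchTest", true, 0, body));
        List<SimpleMessage> mixed = Arrays.asList(
            new SimpleMessage("BatchTest", true, 0, body),
            new SimpleMessage("OtherTopic", true, 0, body));
        System.out.println(firstViolation(ok));    // prints Optional.empty
        System.out.println(firstViolation(mixed)); // prints Optional[mixed topics]
    }
}
```

A check like this only reports the first problem it meets; whether to reject the whole batch or regroup the messages is left to the caller.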

How to use batch

If you send no more than 1 MiB of messages at a time, using batch is straightforward:

    String topic = "BatchTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
    try {
        // `producer` is a producer instance that has already been started
        producer.send(messages);
    } catch (Exception e) {
        e.printStackTrace();
        // handle the error
    }

Split into lists

Things only get more complicated when you send a large batch and cannot be sure whether it exceeds the size limit (1 MiB).

In that case, you should split the list:

    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.NoSuchElementException;

    import org.apache.rocketmq.common.message.Message;

    public class ListSplitter implements Iterator<List<Message>> {
        // Slightly below 1 MiB (1024 * 1024 bytes)
        private static final int SIZE_LIMIT = 1000 * 1000;
        private final List<Message> messages;
        private int currIndex;

        public ListSplitter(List<Message> messages) {
            this.messages = messages;
        }

        @Override
        public boolean hasNext() {
            return currIndex < messages.size();
        }

        @Override
        public List<Message> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            int nextIndex = currIndex;
            int totalSize = 0;
            for (; nextIndex < messages.size(); nextIndex++) {
                Message message = messages.get(nextIndex);
                // Estimate the message size: topic + body + properties
                int tmpSize = message.getTopic().length() + message.getBody().length;
                Map<String, String> properties = message.getProperties();
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    tmpSize += entry.getKey().length() + entry.getValue().length();
                }
                tmpSize = tmpSize + 20; // for log overhead
                if (tmpSize > SIZE_LIMIT) {
                    // It is unexpected that a single message exceeds SIZE_LIMIT.
                    // Let it through here, otherwise it would block the splitting process.
                    if (nextIndex - currIndex == 0) {
                        // The sublist is still empty: add this message on its own, then break.
                        nextIndex++;
                    }
                    break;
                }
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    totalSize += tmpSize;
                }
            }
            List<Message> subList = messages.subList(currIndex, nextIndex);
            currIndex = nextIndex;
            return subList;
        }
    }

Then you can split the large list into small ones:

    ListSplitter splitter = new ListSplitter(messages);
    while (splitter.hasNext()) {
        try {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        } catch (Exception e) {
            e.printStackTrace();
            // handle the error
        }
    }
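
To see how the splitting rule behaves, it can help to run it on plain numbers. The sketch below applies the same greedy rule to a list of message sizes instead of real messages; `SizeSplitDemo` and its `split` method are hypothetical names used only for this illustration, and the sizes are made-up inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SizeSplitDemo {
    // Groups consecutive sizes so that each group's sum stays within `limit`.
    // A single size above the limit is emitted as a group of its own.
    static List<List<Integer>> split(List<Integer> sizes, int limit) {
        List<List<Integer>> batches = new ArrayList<>();
        int currIndex = 0;
        while (currIndex < sizes.size()) {
            int nextIndex = currIndex;
            int totalSize = 0;
            for (; nextIndex < sizes.size(); nextIndex++) {
                int size = sizes.get(nextIndex);
                if (size > limit) {
                    if (nextIndex == currIndex) {
                        nextIndex++; // oversized item starts the group: let it through alone
                    }
                    break;
                }
                if (totalSize + size > limit) {
                    break; // the current group is full
                }
                totalSize += size;
            }
            batches.add(new ArrayList<>(sizes.subList(currIndex, nextIndex)));
            currIndex = nextIndex;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> sizes = Arrays.asList(400_000, 500_000, 300_000, 1_200_000, 100);
        System.out.println(split(sizes, 1_000_000));
        // prints [[400000, 500000], [300000], [1200000], [100]]
    }
}
```

Note that the oversized entry (1200000) still ends up in a batch by itself, so the caller has to be prepared for the send of that batch to fail.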
