title: 顺序消息示例 date: 2017/12/21

categories: 文档翻译

Order Message

RocketMQ provides ordered messages using FIFO order.

顺序消息

RocketMQ提供使用先进先出算法的顺序消息实现。

The following example demonstrates sending/recieving of globally and partitionally ordered message.

以下示例展示了如何发送/接收全局顺序消息和分区顺序消息。

Send message sample code

  1. public class OrderedProducer {
  2. public static void main(String[] args) throws Exception {
  3. //Instantiate with a producer group name.
  4. MQProducer producer = new DefaultMQProducer("example_group_name");
  5. //Launch the instance.
  6. producer.start();
  7. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  8. for (int i = 0; i < 100; i++) {
  9. int orderId = i % 10;
  10. //Create a message instance, specifying topic, tag and message body.
  11. Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
  12. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  13. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  14. @Override
  15. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  16. Integer id = (Integer) arg;
  17. int index = id % mqs.size();
  18. return mqs.get(index);
  19. }
  20. }, orderId);
  21. System.out.printf("%s%n", sendResult);
  22. }
  23. //server shutdown
  24. producer.shutdown();
  25. }
  26. }

发送消息示例代码

  1. public class OrderedProducer {
  2. public static void main(String[] args) throws Exception {
  3. //Instantiate with a producer group name.
  4. MQProducer producer = new DefaultMQProducer("example_group_name");
  5. //Launch the instance.
  6. producer.start();
  7. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  8. for (int i = 0; i < 100; i++) {
  9. int orderId = i % 10;
  10. //Create a message instance, specifying topic, tag and message body.
  11. Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
  12. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  13. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  14. @Override
  15. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  16. Integer id = (Integer) arg;
  17. int index = id % mqs.size();
  18. return mqs.get(index);
  19. }
  20. }, orderId);
  21. System.out.printf("%s%n", sendResult);
  22. }
  23. //server shutdown
  24. producer.shutdown();
  25. }
  26. }

Subscription message sample code

  1. public class OrderedConsumer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
  4. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  5. consumer.subscribe("TopicTest", "TagA || TagC || TagD");
  6. consumer.registerMessageListener(new MessageListenerOrderly() {
  7. AtomicLong consumeTimes = new AtomicLong(0);
  8. @Override
  9. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
  10. ConsumeOrderlyContext context) {
  11. context.setAutoCommit(false);
  12. System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
  13. this.consumeTimes.incrementAndGet();
  14. if ((this.consumeTimes.get() % 2) == 0) {
  15. return ConsumeOrderlyStatus.SUCCESS;
  16. } else if ((this.consumeTimes.get() % 3) == 0) {
  17. return ConsumeOrderlyStatus.ROLLBACK;
  18. } else if ((this.consumeTimes.get() % 4) == 0) {
  19. return ConsumeOrderlyStatus.COMMIT;
  20. } else if ((this.consumeTimes.get() % 5) == 0) {
  21. context.setSuspendCurrentQueueTimeMillis(3000);
  22. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  23. }
  24. return ConsumeOrderlyStatus.SUCCESS;
  25. }
  26. });
  27. consumer.start();
  28. System.out.printf("Consumer Started.%n");
  29. }
  30. }

订阅消息示例代码

  1. public class OrderedConsumer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
  4. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  5. consumer.subscribe("TopicTest", "TagA || TagC || TagD");
  6. consumer.registerMessageListener(new MessageListenerOrderly() {
  7. AtomicLong consumeTimes = new AtomicLong(0);
  8. @Override
  9. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
  10. ConsumeOrderlyContext context) {
  11. context.setAutoCommit(false);
  12. System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
  13. this.consumeTimes.incrementAndGet();
  14. if ((this.consumeTimes.get() % 2) == 0) {
  15. return ConsumeOrderlyStatus.SUCCESS;
  16. } else if ((this.consumeTimes.get() % 3) == 0) {
  17. return ConsumeOrderlyStatus.ROLLBACK;
  18. } else if ((this.consumeTimes.get() % 4) == 0) {
  19. return ConsumeOrderlyStatus.COMMIT;
  20. } else if ((this.consumeTimes.get() % 5) == 0) {
  21. context.setSuspendCurrentQueueTimeMillis(3000);
  22. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  23. }
  24. return ConsumeOrderlyStatus.SUCCESS;
  25. }
  26. });
  27. consumer.start();
  28. System.out.printf("Consumer Started.%n");
  29. }
  30. }