title: 顺序消息示例 date: 2017/12/21
categories: 文档翻译
Order Message
RocketMQ provides ordered messages using FIFO order.
顺序消息
RocketMQ提供使用先进先出算法的顺序消息实现。
The following example demonstrates sending/recieving of globally and partitionally ordered message.
以下示例展示了如何发送/接收全局顺序消息和分区顺序消息。
Send message sample code
public class OrderedProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.MQProducer producer = new DefaultMQProducer("example_group_name");//Launch the instance.producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;//Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}//server shutdownproducer.shutdown();}}
发送消息示例代码
public class OrderedProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.MQProducer producer = new DefaultMQProducer("example_group_name");//Launch the instance.producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;//Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}//server shutdownproducer.shutdown();}}
Subscription message sample code
public class OrderedConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;} else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}
订阅消息示例代码
public class OrderedConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;} else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}
