title: 延时消息示例 date: 2017/12/21

categories: 文档翻译

Schedule example

What is scheduled message?

Scheduled messages differ from normal messages in that they won’t be delivered until a provided time later.

延时消息示例

什么是延时消息?

延时消息提供了一种不同于普通消息的实现形式——它们只会在设定的时限到了之后才被递送出去。

Application

  1. Start consumer to wait for incoming subscribed messages

    1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    5. import org.apache.rocketmq.common.message.MessageExt;
    6. import java.util.List;
    7. public class ScheduledMessageConsumer {
    8. public static void main(String[] args) throws Exception {
    9. // Instantiate message consumer
    10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
    11. // Subscribe topics
    12. consumer.subscribe("TestTopic", "*");
    13. // Register message listener
    14. consumer.registerMessageListener(new MessageListenerConcurrently() {
    15. @Override
    16. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
    17. for (MessageExt message : messages) {
    18. // Print approximate delay time period
    19. System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
    20. + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
    21. }
    22. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    23. }
    24. });
    25. // Launch consumer
    26. consumer.start();
    27. }
    28. }

应用

  1. 启动消费者,等待即将接收的订阅消息

    1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    5. import org.apache.rocketmq.common.message.MessageExt;
    6. import java.util.List;
    7. public class ScheduledMessageConsumer {
    8. public static void main(String[] args) throws Exception {
    9. // Instantiate message consumer
    10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
    11. // Subscribe topics
    12. consumer.subscribe("TestTopic", "*");
    13. // Register message listener
    14. consumer.registerMessageListener(new MessageListenerConcurrently() {
    15. @Override
    16. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
    17. for (MessageExt message : messages) {
    18. // Print approximate delay time period
    19. System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
    20. + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
    21. }
    22. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    23. }
    24. });
    25. // Launch consumer
    26. consumer.start();
    27. }
    28. }
  2. Send scheduled messages

    1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
    2. import org.apache.rocketmq.common.message.Message;
    3. public class ScheduledMessageProducer {
    4. public static void main(String[] args) throws Exception {
    5. // Instantiate a producer to send scheduled messages
    6. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    7. // Launch producer
    8. producer.start();
    9. int totalMessagesToSend = 100;
    10. for (int i = 0; i < totalMessagesToSend; i++) {
    11. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
    12. // This message will be delivered to consumer 10 seconds later.
    13. message.setDelayTimeLevel(3);
    14. // Send the message
    15. producer.send(message);
    16. }
    17. // Shutdown producer after use.
    18. producer.shutdown();
    19. }
    20. }
  3. 发送延时消息

    1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
    2. import org.apache.rocketmq.common.message.Message;
    3. public class ScheduledMessageProducer {
    4. public static void main(String[] args) throws Exception {
    5. // Instantiate a producer to send scheduled messages
    6. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    7. // Launch producer
    8. producer.start();
    9. int totalMessagesToSend = 100;
    10. for (int i = 0; i < totalMessagesToSend; i++) {
    11. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
    12. // This message will be delivered to consumer 10 seconds later.
    13. message.setDelayTimeLevel(3);
    14. // Send the message
    15. producer.send(message);
    16. }
    17. // Shutdown producer after use.
    18. producer.shutdown();
    19. }
    20. }
  4. Verification You should see messages are consumed about 10 seconds later than their storing time.

  5. 验证 你应该会在消息被存储之后10秒钟看到它们被消费。