介绍

说白了就是消息不要立即发送,过一会儿发送,

延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);

开源版本的RocketMQ中只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

在商业版本的可以设置设置任何时间去发送,如果你想在开源版本去做到任何时间设置去发送,那么只能去改源码.

案例

生产者

  1. package org.apache.rocketmq.example.quickstart;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.apache.rocketmq.client.producer.SendResult;
  5. import org.apache.rocketmq.common.message.Message;
  6. import org.apache.rocketmq.remoting.common.RemotingHelper;
  7. import java.time.LocalDateTime;
  8. public class Producer {
  9. public static void main(String[] args) throws MQClientException, InterruptedException {
  10. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  11. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  12. producer.start();
  13. for (int i = 0; i < 2; i++) {
  14. try {
  15. Message msg = new Message("TopicTest" /* Topic */,
  16. "TagA" /* Tag */,
  17. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
  18. );
  19. //18个延迟级别messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  20. /*
  21. * 设置消息的延迟级别,总共18个延迟级别,
  22. * 在开源版本只能设置这18个级别,在商业版本的可以设置设置任何时间去发送,
  23. * 如果你想在开源版本去做到任何时间设置去发送,那么只能去改源码
  24. * 下面设定了一个3,代表生产者发送的消息要到10秒后才到消费者那里
  25. */
  26. msg.setDelayTimeLevel(3);
  27. LocalDateTime now = LocalDateTime.now();
  28. System.out.println("当前发送消息的时间是: " + now.getYear() + "年" + now.getMonthValue() + "月" + now.getDayOfMonth() + "日" +
  29. now.getHour() + "时" + now.getMinute() + "分钟" + now.getSecond() + "秒");
  30. SendResult sendResult = producer.send(msg);
  31. System.out.printf("%s%n", sendResult);
  32. } catch (Exception e) {
  33. e.printStackTrace();
  34. Thread.sleep(1000);
  35. }
  36. }
  37. producer.shutdown();
  38. }
  39. }

消费者

  1. package org.apache.rocketmq.example.quickstart;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import java.time.LocalDateTime;
  10. import java.util.List;
  11. /**
  12. * 此示例显示如何使用提供 {@link DefaultMQPushConsumer} 订阅和使用消息。
  13. */
  14. public class Consumer {
  15. public static void main(String[] args) throws InterruptedException, MQClientException {
  16. //使用指定的消费者组名称实例化
  17. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");
  18. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  19. /*从上次偏移量开始消耗*/
  20. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  21. //再订阅一个主题来消费
  22. consumer.subscribe("TopicTest", "*");
  23. //注册回调以在从代理获取的消息到达时执行
  24. consumer.registerMessageListener(new MessageListenerConcurrently() {
  25. @Override
  26. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  27. ConsumeConcurrentlyContext context) {
  28. LocalDateTime now = LocalDateTime.now();
  29. System.out.println("当前接收消息的时间是: " + now.getYear() + "年" + now.getMonthValue() + "月" + now.getDayOfMonth() + "日" +
  30. now.getHour() + "时" + now.getMinute() + "分钟" + now.getSecond() + "秒");
  31. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  32. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  33. }
  34. });
  35. //启动消费者实例
  36. consumer.start();
  37. System.out.printf("Consumer Started.%n");
  38. }
  39. }

演示

上面代码我级别设置了3,也就是消息延迟时间是10秒,

先启动消费者,等待消费者启动起来之后,再启动生产者

然后先观察生产者的控制台, 再观察消费者的控制台.

下面先说结论, 生产者在 2021年10月21日20时6分钟43秒 发送了第一条消息,

消费者在 2021年10月21日20时6分钟54秒 接收到了消费, 说明消息大概延迟了11秒, 毕竟我消费消息,然后打印当前时间也是会消耗掉一些毫秒的.

生产者控制台:

  1. 当前发送消息的时间是: 20211021206分钟43
  2. SendResult [sendStatus=SEND_OK, msgId=AC100A01775418B4AAC26B4FFC640000, offsetMsgId=AC100A6600002A9F000000000025E350, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=4]
  3. 当前发送消息的时间是: 20211021206分钟44
  4. SendResult [sendStatus=SEND_OK, msgId=AC100A01775418B4AAC26B4FFC6B0001, offsetMsgId=AC100A6600002A9F000000000025E44D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=5]

消费者的控制台:

  1. Consumer Started.
  2. 当前接收消息的时间是: 20211021206分钟54
  3. ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=243, queueOffset=2, sysFlag=0, bornTimestamp=1634818004068, bornHost=/172.16.10.1:50893, storeTimestamp=1634818014865, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000000025E54A, commitLogOffset=2483530, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest, MAX_OFFSET=3, CONSUME_START_TIME=1634818014080, UNIQ_KEY=AC100A01775418B4AAC26B4FFC640000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
  4. 当前接收消息的时间是: 20211021206分钟54
  5. ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=243, queueOffset=2, sysFlag=0, bornTimestamp=1634818004075, bornHost=/172.16.10.1:50893, storeTimestamp=1634818014868, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000000025E63D, commitLogOffset=2483773, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest, MAX_OFFSET=3, CONSUME_START_TIME=1634818014080, UNIQ_KEY=AC100A01775418B4AAC26B4FFC6B0001, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]