介绍
说白了就是消息不要立即发送,过一会儿发送,
延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);
开源版本的RocketMQ中只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
在商业版本的可以设置设置任何时间去发送,如果你想在开源版本去做到任何时间设置去发送,那么只能去改源码.
案例
生产者
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.time.LocalDateTime;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();for (int i = 0; i < 2; i++) {try {Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//18个延迟级别messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h/** 设置消息的延迟级别,总共18个延迟级别,* 在开源版本只能设置这18个级别,在商业版本的可以设置设置任何时间去发送,* 如果你想在开源版本去做到任何时间设置去发送,那么只能去改源码* 下面设定了一个3,代表生产者发送的消息要到10秒后才到消费者那里*/msg.setDelayTimeLevel(3);LocalDateTime now = LocalDateTime.now();System.out.println("当前发送消息的时间是: " + now.getYear() + "年" + now.getMonthValue() + "月" + now.getDayOfMonth() + "日" +now.getHour() + "时" + now.getMinute() + "分钟" + now.getSecond() + "秒");SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}}
消费者
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import java.time.LocalDateTime;import java.util.List;/*** 此示例显示如何使用提供 {@link DefaultMQPushConsumer} 订阅和使用消息。*/public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//使用指定的消费者组名称实例化DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");/*从上次偏移量开始消耗*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//再订阅一个主题来消费consumer.subscribe("TopicTest", "*");//注册回调以在从代理获取的消息到达时执行consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {LocalDateTime now = LocalDateTime.now();System.out.println("当前接收消息的时间是: " + now.getYear() + "年" + now.getMonthValue() + "月" + now.getDayOfMonth() + "日" +now.getHour() + "时" + now.getMinute() + "分钟" + now.getSecond() + "秒");System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例consumer.start();System.out.printf("Consumer Started.%n");}}
演示
上面代码我级别设置了3,也就是消息延迟时间是10秒,
先启动消费者,等待消费者启动起来之后,再启动生产者
然后先观察生产者的控制台, 再观察消费者的控制台.
下面先说结论, 生产者在 2021年10月21日20时6分钟43秒 发送了第一条消息,
消费者在 2021年10月21日20时6分钟54秒 接收到了消费, 说明消息大概延迟了11秒, 毕竟我消费消息,然后打印当前时间也是会消耗掉一些毫秒的.
生产者控制台:
当前发送消息的时间是: 2021年10月21日20时6分钟43秒SendResult [sendStatus=SEND_OK, msgId=AC100A01775418B4AAC26B4FFC640000, offsetMsgId=AC100A6600002A9F000000000025E350, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=4]当前发送消息的时间是: 2021年10月21日20时6分钟44秒SendResult [sendStatus=SEND_OK, msgId=AC100A01775418B4AAC26B4FFC6B0001, offsetMsgId=AC100A6600002A9F000000000025E44D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=5]
消费者的控制台:
Consumer Started.当前接收消息的时间是: 2021年10月21日20时6分钟54秒ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=243, queueOffset=2, sysFlag=0, bornTimestamp=1634818004068, bornHost=/172.16.10.1:50893, storeTimestamp=1634818014865, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000000025E54A, commitLogOffset=2483530, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest, MAX_OFFSET=3, CONSUME_START_TIME=1634818014080, UNIQ_KEY=AC100A01775418B4AAC26B4FFC640000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]当前接收消息的时间是: 2021年10月21日20时6分钟54秒ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=243, queueOffset=2, sysFlag=0, bornTimestamp=1634818004075, bornHost=/172.16.10.1:50893, storeTimestamp=1634818014868, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000000025E63D, commitLogOffset=2483773, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest, MAX_OFFSET=3, CONSUME_START_TIME=1634818014080, UNIQ_KEY=AC100A01775418B4AAC26B4FFC6B0001, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]
