介绍
说白了就是消息不要立即发送,过一会儿发送,
延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);
开源版本的RocketMQ中只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
在商业版本的可以设置设置任何时间去发送,如果你想在开源版本去做到任何时间设置去发送,那么只能去改源码.
案例
生产者
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.time.LocalDateTime;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
for (int i = 0; i < 2; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//18个延迟级别messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
/*
* 设置消息的延迟级别,总共18个延迟级别,
* 在开源版本只能设置这18个级别,在商业版本的可以设置设置任何时间去发送,
* 如果你想在开源版本去做到任何时间设置去发送,那么只能去改源码
* 下面设定了一个3,代表生产者发送的消息要到10秒后才到消费者那里
*/
msg.setDelayTimeLevel(3);
LocalDateTime now = LocalDateTime.now();
System.out.println("当前发送消息的时间是: " + now.getYear() + "年" + now.getMonthValue() + "月" + now.getDayOfMonth() + "日" +
now.getHour() + "时" + now.getMinute() + "分钟" + now.getSecond() + "秒");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
消费者
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.time.LocalDateTime;
import java.util.List;
/**
* 此示例显示如何使用提供 {@link DefaultMQPushConsumer} 订阅和使用消息。
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//使用指定的消费者组名称实例化
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
/*从上次偏移量开始消耗*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//再订阅一个主题来消费
consumer.subscribe("TopicTest", "*");
//注册回调以在从代理获取的消息到达时执行
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
LocalDateTime now = LocalDateTime.now();
System.out.println("当前接收消息的时间是: " + now.getYear() + "年" + now.getMonthValue() + "月" + now.getDayOfMonth() + "日" +
now.getHour() + "时" + now.getMinute() + "分钟" + now.getSecond() + "秒");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
演示
上面代码我级别设置了3,也就是消息延迟时间是10秒,
先启动消费者,等待消费者启动起来之后,再启动生产者
然后先观察生产者的控制台, 再观察消费者的控制台.
下面先说结论, 生产者在 2021年10月21日20时6分钟43秒 发送了第一条消息,
消费者在 2021年10月21日20时6分钟54秒 接收到了消费, 说明消息大概延迟了11秒, 毕竟我消费消息,然后打印当前时间也是会消耗掉一些毫秒的.
生产者控制台:
当前发送消息的时间是: 2021年10月21日20时6分钟43秒
SendResult [sendStatus=SEND_OK, msgId=AC100A01775418B4AAC26B4FFC640000, offsetMsgId=AC100A6600002A9F000000000025E350, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=4]
当前发送消息的时间是: 2021年10月21日20时6分钟44秒
SendResult [sendStatus=SEND_OK, msgId=AC100A01775418B4AAC26B4FFC6B0001, offsetMsgId=AC100A6600002A9F000000000025E44D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=5]
消费者的控制台:
Consumer Started.
当前接收消息的时间是: 2021年10月21日20时6分钟54秒
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=243, queueOffset=2, sysFlag=0, bornTimestamp=1634818004068, bornHost=/172.16.10.1:50893, storeTimestamp=1634818014865, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000000025E54A, commitLogOffset=2483530, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest, MAX_OFFSET=3, CONSUME_START_TIME=1634818014080, UNIQ_KEY=AC100A01775418B4AAC26B4FFC640000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
当前接收消息的时间是: 2021年10月21日20时6分钟54秒
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=243, queueOffset=2, sysFlag=0, bornTimestamp=1634818004075, bornHost=/172.16.10.1:50893, storeTimestamp=1634818014868, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000000025E63D, commitLogOffset=2483773, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest, MAX_OFFSET=3, CONSUME_START_TIME=1634818014080, UNIQ_KEY=AC100A01775418B4AAC26B4FFC6B0001, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]