创建 Topic
首先得启动 Broker 和 NameServer 服务,然后启动 RocketMQ dashboard,启动方式见 RocketMQ 集群可视化监控,启动后可视化界面如下:
接着进入 Topic 菜单,新建一个名为 TopicTest 的 Topic,其实不手动创建该 Topic 也可以,在运行 Producer 时会自动创建该 Topic。
运行 Example
可以用 RocketMQ 自带的例子来测试发送和消费消息,找到 example 模块下的 quickstart 包
运行 Producer
修改下自带的 Producer 示例程序,如下所示:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置 NameServer 地址,Producer 通过 NameServer 去获取 Broker 地址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
然后运行程序,控制台输出结果如下,此时说明已经成功的把消息发送到 Broker 去了。
SendResult [sendStatus=SEND_OK, msgId=7F00000172CC18B4AAC237F7A40A0000, offsetMsgId=7F00000100002A9F000000000002ECD4, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=7], queueOffset=0]
运行 Consumer
修改下自带的 Consumer示例程序,如下所示:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
运行上述程序,结果如下,可以看下消费了 1 条消息。
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=7, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1633956576368, bornHost=/127.0.0.1:14743, storeTimestamp=1633956576430, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000002ECD4, commitLogOffset=191700, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1633956735681, UNIQ_KEY=7F00000172CC18B4AAC237F7A40A0000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]