创建 Topic

首先得启动 Broker 和 NameServer 服务,然后启动 RocketMQ dashboard,启动方式见 RocketMQ 集群可视化监控,启动后可视化界面如下:
image.png
接着进入 Topic 菜单,新建一个名为 TopicTest 的 Topic,其实不手动创建该 Topic 也可以,在运行 Producer 时会自动创建该 Topic。
image.png

运行 Example

可以用 RocketMQ 自带的例子来测试发送和消费消息,找到 example 模块下的 quickstart 包
image.png

运行 Producer

修改下自带的 Producer 示例程序,如下所示:

  1. public class Producer {
  2. public static void main(String[] args) throws MQClientException, InterruptedException {
  3. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  4. // 设置 NameServer 地址,Producer 通过 NameServer 去获取 Broker 地址
  5. producer.setNamesrvAddr("127.0.0.1:9876");
  6. producer.start();
  7. for (int i = 0; i < 1; i++) {
  8. try {
  9. Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  10. SendResult sendResult = producer.send(msg);
  11. System.out.printf("%s%n", sendResult);
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. Thread.sleep(1000);
  15. }
  16. }
  17. producer.shutdown();
  18. }
  19. }

然后运行程序,控制台输出结果如下,此时说明已经成功的把消息发送到 Broker 去了。

  1. SendResult [sendStatus=SEND_OK, msgId=7F00000172CC18B4AAC237F7A40A0000, offsetMsgId=7F00000100002A9F000000000002ECD4, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=7], queueOffset=0]

运行 Consumer

修改下自带的 Consumer示例程序,如下所示:

  1. public class Consumer {
  2. public static void main(String[] args) throws InterruptedException, MQClientException {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
  4. consumer.setNamesrvAddr("127.0.0.1:9876");
  5. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  6. consumer.subscribe("TopicTest", "*");
  7. /*
  8. * Register callback to execute on arrival of messages fetched from brokers.
  9. */
  10. consumer.registerMessageListener(new MessageListenerConcurrently() {
  11. @Override
  12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  13. ConsumeConcurrentlyContext context) {
  14. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  16. }
  17. });
  18. /*
  19. * Launch the consumer instance.
  20. */
  21. consumer.start();
  22. System.out.printf("Consumer Started.%n");
  23. }
  24. }

运行上述程序,结果如下,可以看下消费了 1 条消息。

  1. ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=7, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1633956576368, bornHost=/127.0.0.1:14743, storeTimestamp=1633956576430, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000002ECD4, commitLogOffset=191700, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1633956735681, UNIQ_KEY=7F00000172CC18B4AAC237F7A40A0000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]