1.1 Importing the RocketMQ source into IntelliJ IDEA

Official source repository: https://github.com/apache/rocketmq/tree/master

1.2 Starting the namesrv module

1): Create a run configuration for NamesrvStartup and add the following under Environment variables:
ROCKETMQ_HOME=D:\rocketmq (any directory of your choice)
2): Copy broker.conf, logback_broker.xml and logback_namesrv.xml from distribution/conf into D:\rocketmq\conf (the conf directory under your ROCKETMQ_HOME)
3): Edit the parameters in broker.conf

    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    # NameServer addresses, separated by semicolons
    namesrvAddr=127.0.0.1:9876
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    # Root storage path
    storePathRootDir=D:\\rocketmq\\store
    # CommitLog storage path
    storePathCommitLog=D:\\rocketmq\\store\\commitlog
    # Consume queue storage path
    storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue
    # Message index storage path
    storePathIndex=D:\\rocketmq\\store\\index
    # Checkpoint file storage path
    storeCheckpoint=D:\\rocketmq\\store\\checkpoint
    # Abort file storage path
    abortFile=D:\\rocketmq\\store\\abort
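
The doubled backslashes in the paths above matter if the file is read with java.util.Properties, which is what this sketch assumes about how broker.conf is parsed. It loads two lines in the same style from a string and prints the values the JVM ends up with. The class name BrokerConfEscapes and the helper parse are made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class BrokerConfEscapes {

    // Parses properties-format text with the standard java.util.Properties rules.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Two lines in the style of broker.conf. Inside a Java string literal every
        // backslash is doubled once more, so "\\\\" here is "\\" in the file text.
        String conf = "brokerName = broker-a\n"
                + "storePathRootDir=D:\\\\rocketmq\\\\store\n";
        Properties props = parse(conf);
        System.out.println(props.getProperty("brokerName"));
        System.out.println(props.getProperty("storePathRootDir"));
    }
}
```

Under these rules a single backslash starts an escape sequence, so a path written as D:\rocketmq would have its \r read as a carriage return. Doubling the backslashes, or using forward slashes, avoids that.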

4): In logback_broker.xml and logback_namesrv.xml, replace user.home with ROCKETMQ_HOME
5): Run NamesrvStartup.java. On a successful start the console prints:
The Name Server boot success. serializeType=JSON
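
If the name server does not start, a common cause is a missing file under ROCKETMQ_HOME. The sketch below is a hypothetical pre-flight check, not part of RocketMQ. It assumes the configuration files live in a conf subdirectory (matching the -c D:\rocketmq\conf\broker.conf argument used for the broker) and that the two logback files are named logback_broker.xml and logback_namesrv.xml.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

final class RocketmqHomeCheck {

    // Assumed file names; adjust them if your distribution/conf differs.
    static final String[] EXPECTED = {
        "broker.conf", "logback_broker.xml", "logback_namesrv.xml"
    };

    // Returns the expected files that are absent from <home>/conf.
    static List<String> missingConfigFiles(Path home) {
        List<String> missing = new ArrayList<>();
        Path conf = home.resolve("conf");
        for (String name : EXPECTED) {
            if (!Files.isRegularFile(conf.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String home = System.getenv("ROCKETMQ_HOME");
        if (home == null || home.isEmpty()) {
            System.out.println("ROCKETMQ_HOME is not set");
            return;
        }
        System.out.println("Missing under " + home + ": "
                + missingConfigFiles(Paths.get(home)));
    }
}
```

Run it with the same environment variable as the run configuration. An empty list means all three files were found.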

1.2 Starting the broker module

The broker is started in much the same way as the namesrv module:
1): Create a run configuration for BrokerStartup, with the same ROCKETMQ_HOME environment variable
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23113393/1645196134551-0e97d932-0ac1-4c46-9d4d-51d484720aa6.png#clientId=uc3e91c96-31ad-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=593&id=udee8b58a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=593&originWidth=1318&originalType=binary&ratio=1&rotation=0&showTitle=false&size=120498&status=done&style=none&taskId=ua4ad5eb7-e137-41dc-9d6a-aacaf68fc4f&title=&width=1318)
2): Add the following under Program arguments:
-c D:\rocketmq\conf\broker.conf
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23113393/1645196204630-a569f635-5901-4594-bb5b-f3c4a1351ac7.png#clientId=uc3e91c96-31ad-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=691&id=u87fe195f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=691&originWidth=1092&originalType=binary&ratio=1&rotation=0&showTitle=false&size=64602&status=done&style=none&taskId=u828f64ca-ceb6-45b5-91a6-05d933f91c7&title=&width=1092)
3): Run BrokerStartup. On a successful start the console prints:
The broker[broker-a, 192.168.1.16:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23113393/1645196186208-564fcc9d-31e6-43b9-8166-5724e48836ee.png#clientId=uc3e91c96-31ad-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=235&id=ub93de541&margin=%5Bobject%20Object%5D&name=image.png&originHeight=235&originWidth=1000&originalType=binary&ratio=1&rotation=0&showTitle=false&size=30238&status=done&style=none&taskId=uf507fc11-d0a9-4cca-b6ab-88c9b4285ea&title=&width=1000)
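
The broker registers with the name server at the namesrvAddr from broker.conf, so the name server has to be running first. The sketch below is one possible way to confirm that something is listening on 127.0.0.1:9876 before launching the broker. The class NamesrvPortCheck and its isListening helper are hypothetical, and the check only proves that the TCP port accepts connections, not that the listener is a RocketMQ name server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class NamesrvPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from namesrvAddr in broker.conf.
        boolean up = isListening("127.0.0.1", 9876, 1000);
        System.out.println("namesrv reachable: " + up);
    }
}
```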

1.3 Sending a test message with a producer

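    import java.nio.charset.StandardCharsets;

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("producer_group_1");
            // Set the NameServer address
            producer.setNamesrvAddr("127.0.0.1:9876");
            // Start the producer
            producer.start();
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    // Encode explicitly as UTF-8 instead of relying on the platform default charset
                    "你好,RocketMQ,hello world!".getBytes(StandardCharsets.UTF_8) /* Message body */
            );
            SendResult sendResult = producer.send(msg);
            // Print the send result
            System.out.printf("%s%n", sendResult);
            // Shut down the producer
            producer.shutdown();
        }
    }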

The message is sent successfully.


1.4 Consuming the test message with a consumer

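    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            // Create the consumer with its consumer group
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_1");
            // Set the NameServer address
            consumer.setNamesrvAddr("127.0.0.1:9876");
            // Choose where to start consuming from
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Subscribe to the topic, accepting all tags
            consumer.subscribe("TopicTest", "*");
            // Register the listener that handles incoming messages
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("Thread: %s \tReceived messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // Start the consumer
            consumer.start();
            System.out.printf("Consumer started.%n");
        }
    }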

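The message is consumed successfully. Note that body (the message body) is a byte array, so it has to be decoded into a string before it is readable.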
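
A minimal sketch of that conversion, using only the JDK so it runs without a broker. The helper decodeBody is hypothetical. Inside the listener the same call would be applied to the byte array returned by getBody() on each MessageExt. UTF-8 is assumed here and must match whatever charset the producer used when it called getBytes.

```java
import java.nio.charset.StandardCharsets;

final class BodyDecoding {

    // Decodes a message body, assuming the producer encoded it as UTF-8.
    static String decodeBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the byte array a consumer receives. The \u escapes spell
        // a two-character non-ASCII greeting, so the charset choice matters.
        byte[] body = "\u4f60\u597d, RocketMQ, hello world!".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeBody(body));
    }
}
```

Decoding with a different charset than the one used for encoding is the usual reason non-ASCII text shows up garbled on the consumer side.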