1.1 Import the RocketMQ source into IDEA
Source repository: https://github.com/apache/rocketmq/tree/master
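1.2 Start the namesrv module
1) Create a run configuration for NamesrvStartup and add an environment variable:
ROCKETMQ_HOME=D:\rocketmq (any directory will do)
2) Copy broker.conf, logback_namesrv.xml, and logback_broker.xml from distribution/conf into D:\rocketmq\conf (adjust to match your ROCKETMQ_HOME).
3) Edit broker.conf: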
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# NameServer address; separate multiple addresses with semicolons
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Storage root directory
storePathRootDir=D:\\rocketmq\\store
# CommitLog storage path
storePathCommitLog=D:\\rocketmq\\store\\commitlog
# Consume queue storage path
storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue
# Message index storage path
storePathIndex=D:\\rocketmq\\store\\index
# Checkpoint file path
storeCheckpoint=D:\\rocketmq\\store\\checkpoint
# Abort file path
abortFile=D:\\rocketmq\\store\\abort
4) In logback_namesrv.xml and logback_broker.xml, replace user.home with ROCKETMQ_HOME.
5) Run NamesrvStartup.java. On a successful start the console prints:
The Name Server boot success. serializeType=JSON
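The paths in broker.conf use doubled backslashes because the file follows Java properties syntax, where a single backslash starts an escape sequence. The sketch below illustrates this with java.util.Properties alone. It assumes the broker parses the file the same way, and the class name BrokerConfPaths and the helper read are made up for the illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class BrokerConfPaths {
    // Parses properties-format text and returns the value stored under the given key.
    static String read(String conf, String key) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(conf));
        return props.getProperty(key);
    }

    public static void main(String[] args) throws IOException {
        // Java source escapes backslashes too, so "\\\\" here is "\\" in the file text.
        String escaped = "storePathRootDir=D:\\\\rocketmq\\\\store\n";
        // Here the file text contains single backslashes only.
        String unescaped = "storePathRootDir=D:\\rocketmq\\store\n";

        // The doubled form yields the intended Windows path.
        System.out.println(read(escaped, "storePathRootDir"));
        // The single form is mangled by escape processing.
        System.out.println(read(unescaped, "storePathRootDir").equals("D:\\rocketmq\\store"));
    }
}
```

With the single-backslash variant, \r is read as a carriage return and the other backslash is dropped, so the value no longer points at D:\rocketmq\store.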
1.3 Start the broker module
The broker starts in much the same way as the namesrv module.
1) Create a run configuration for BrokerStartup, with the same ROCKETMQ_HOME environment variable.
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23113393/1645196134551-0e97d932-0ac1-4c46-9d4d-51d484720aa6.png)
2) Add the Program arguments setting:
-c D:\rocketmq\conf\broker.conf
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23113393/1645196204630-a569f635-5901-4594-bb5b-f3c4a1351ac7.png)
3) On a successful start the console prints:
The broker[broker-a, 192.168.1.16:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23113393/1645196186208-564fcc9d-31e6-43b9-8166-5724e48836ee.png)
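Because the broker registers with the name server at the namesrvAddr set in broker.conf, it can help to confirm that something is listening on 127.0.0.1:9876 before starting the broker. The following is a minimal sketch using plain java.net sockets. The class NamesrvPortCheck and the method isListening are hypothetical helpers, not part of RocketMQ, and a successful TCP connect only shows that the port is open, not that a healthy name server is behind it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class NamesrvPortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 127.0.0.1:9876 is the namesrvAddr used in broker.conf.
        System.out.println("namesrv listening: " + isListening("127.0.0.1", 9876, 1000));
    }
}
```

If this prints false, start NamesrvStartup first; otherwise the broker would have no name server to register with.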
1.4 Send a test message with the producer
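import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_1");
        // Set the NameServer address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer
        producer.start();
        Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                // Encode explicitly as UTF-8 instead of relying on the platform default charset
                "你好,RocketMQ,hello world!".getBytes(StandardCharsets.UTF_8) /* Message body */
        );
        SendResult sendResult = producer.send(msg);
        // Print the send result
        System.out.printf("%s%n", sendResult);
        // Shut down the producer
        producer.shutdown();
    }
}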
1.5 Consume the test message with the consumer
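import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Create the consumer with its consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_1");
        // Set the NameServer address
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Choose where to start consuming from
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to the topic; "*" matches all tags
        consumer.subscribe("TopicTest", "*");
        // Register the listener that handles incoming messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("Thread: %s \tReceived: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.printf("Consumer started.%n");
    }
}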
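The message is consumed successfully. Note that body (the message body) is a byte array, so it must be converted to a String before it is readable.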
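The conversion mentioned above can be sketched as follows. This is a standalone illustration that assumes the producer encoded the body as UTF-8. The class BodyDecoding and the method decode are hypothetical names, and in the listener the byte array would come from getBody() on each MessageExt.

```java
import java.nio.charset.StandardCharsets;

public class BodyDecoding {
    // Decodes a message body, assuming the sender encoded it as UTF-8.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the bytes a consumer would receive as the message body.
        byte[] body = "你好,RocketMQ,hello world!".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(body));
    }
}
```

Naming the charset on both sides avoids garbled Chinese text when the producer and consumer run on machines with different default encodings.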