先说结论
发送消息:默认会发1000条消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
上面这两个命令需要在RocketMQ的安装目录下去执行,另外需要配置环境变量:
export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'
如果会搞的话,就不需要看下面了, 如果不太了解的话,就看下面, 自己操作一下, 如果没搭建集群的话,参考下面博客搭建一下RocketMQ集群. https://zjj1994.blog.csdn.net/article/details/120810055
搭建Rocketmq-console可视化插件
如果你们有别的可视化插件可以略过这里.
这里windows搭建的: 直接看 https://zjj1994.blog.csdn.net/article/details/120809685 博客, 我就不演示了. 很简单
通过Rocketmq-console查看集群情况,
查看到集群搭建成功.
执行官方示例测试集群是否好用
说明
在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。我们在
zjj102上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以
运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认
会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'
然后就可以正常执行了。
这个NameServer地址的读取方式见源码中
org.apache.rocketmq.common.utils.NameServerAddressUtils
public static String getNameServerAddresses() {
return System.getProperty("rocketmq.namesrv.addr",
System.getenv("NAMESRV_ADDR"));
}
这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。
这个tools.sh就封装了一个简单的运行RocketMQ的环境,可以运行源码中的其他示例,然后自己
的例子也可以放到RocketMQ的lib目录下去执行。
开始演示
配置环境变量:
修改 /etc/profile 文件
export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'
配置完了之后,别忘了执行 source /etc/profile 让环境变量生效.
执行 echo $NAMESRV_ADDR 查看环境变量是否配置有问题,能打印出 “zjj101:9876;zjj102:9876;zjj103:9876 “ 说明配置没问题
下面开始演示: 在RocketMq 安装目录下执行 : sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 查看配置的环境变量是否生效
[root@zjj102 rocketmq-all-4.7.1-bin-release]# echo $NAMESRV_ADDR
# 说明生效了
zjj101:9876;zjj102:9876;zjj103:9876
# 执行官方示例
[root@zjj102 rocketmq-all-4.7.1-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 下面开始打印日志了,说明生效了
12:58:13.013 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BCD0000, offsetMsgId=AC100A6600002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BEB0001, offsetMsgId=AC100A6600002A9F00000000000000CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BEF0002, offsetMsgId=AC100A6600002A9F0000000000000196, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BF10003, offsetMsgId=AC100A6700002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3C0F0004, offsetMsgId=AC100A6700002A9F00000000000000CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=0]
点击刷新查看多了个TopicTest,这个是执行官方示例自动生成的.
你再打开一个终端连接zjj102,执行下面的命令启动消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
自己可以反复的启动 示例 , 生产消息和消费消息
点击 TopicTest的consumer管理来查看消费情况
下面显示的延迟1000 的意思就是有1000个没消费.你再执行 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 启动官方消费者示例之后,你就能看到延迟变少了,就说明消费了.
下面图就说明都被这个消费者消费了, 延迟为0说明没有消息积压了.