1、安装 RocketMQ
下载:rocketmq-all-4.8.0-bin-release.zip
1、传入 Linux 服务器
2、解压缩
unzip rocketmq-all-4.8.0-bin-release.zip
3、调整启动参数,
cd rocketmq-all-4.8.0-bin-release/bin
修改默认启动参数,默认启动的最大内存为4G,比较大,修改小一点,否则如果服务器内存不够会启动失败
调整namesrv
vim runserver.sh
调整如下

调整broker
vim runbroker.sh
调整如下

4、启动namesrv和启动broker
启动navmesrv
nohup sh mqnamesrv &
启动broker,注意ip为公网ip,端口为navmesrv的默认端口9876
nohup ./mqbroker -n localhost:9876 &
5、检查是否启动成功
jps -l

也可以查看日志
tail -f ~/logs/rocketmqlogs/broker.log
启动成功
6、测试 RocketMQ
消息发送
export NAMESRV_ADDR=localhost:./tools.sh org.apache.rocketmq.example.quickstart.Producer
消息接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
7、关闭 RocketMQ
./mqshutdown broker./mqshutdown namesrv
2、安装 RocketMQ 控制台
git clone https://github.com/apache/rocketmq-externals.git
1、进入到rocketmq-console的配置文件,修改如下:

2、打包
mvn clean package -Dmaven.test.skip=true
3、进入 target 启动 jar
java -jar rocketmq-console-ng-2.0.0.jar
打开浏览器访问 localhost:9877,如果报错

这是因为我们的 RocketMQ 安装在 Linux 中,控制台在 windows,Linux 需要开放端口才能访问,开放 10909 和 9876 端口
firewall-cmd --zone=public --add-port=10909/tcp --permanentfirewall-cmd --zone=public --add-port=10911/tcp --permanentfirewall-cmd --zone=public --add-port=9876/tcp --permanentsystemctl restart firewalld.servicefirewall-cmd --reload
重新启动控制台项目
3、Java 实现消息发送
1、pom.xml 中引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>
2、生产消息
package com.godfrey;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;@SpringBootTestclass ProviderApplicationTests {@Test@DisplayName("测试RocketMQ消息发送")void test3() throws Exception {//创建消息生产者DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");//设置NameServerproducer.setNamesrvAddr("39.106.41.184:9876");//启动生产者producer.start();//构建消息对象Message message = new Message("myTopic", "myTag", ("Test MQ").getBytes());//发送消息SendResult result = producer.send(message, 1000);System.out.println(result);//关闭生产者producer.shutdown();}}
3、直接运行,如果报错 sendDefaultImpl call timeout,可以开放 10911 端口
firewall-cmd --zone=public --add-port=10911/tcp --permanentsystemctl restart firewalld.servicefirewall-cmd --reload
打开 RocketMQ 控制台,可查看消息。
4、Java 实现消息消费
package com.godfrey;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;@SpringBootTestclass ProviderApplicationTests {@Test@DisplayName("测试RocketMQ消息接收")void test4() throws MQClientException {//创建消息消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");//设置NameServerconsumer.setNamesrvAddr("39.106.41.184:9876");//指定订阅的主题和标签consumer.subscribe("myTopic", "*");//回调函数consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}}
5、Spring Boot 整合 RocketMQ
provider
1、pom.xml
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>
2、application.yml
rocketmq:name-server: 39.106.41.184:9876producer:group: myprovider
3、Order
package com.godfrey.entity;import java.io.Serializable;import java.util.Date;/*** @author godfrey* @since 2020-12-27*/public class Order implements Serializable {private static final long serialVersionUID = -5397628182599822017L;private Integer id;private String buyerName;private String buyerTel;private String address;private Date createDate;public Order() {}public Order(Integer id, String buyerName, String buyerTel, String address, Date createDate) {this.id = id;this.buyerName = buyerName;this.buyerTel = buyerTel;this.address = address;this.createDate = createDate;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getBuyerName() {return buyerName;}public void setBuyerName(String buyerName) {this.buyerName = buyerName;}public String getBuyerTel() {return buyerTel;}public void setBuyerTel(String buyerTel) {this.buyerTel = buyerTel;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}public Date getCreateDate() {return createDate;}public void setCreateDate(Date createDate) {this.createDate = createDate;}@Overridepublic String toString() {return "Order{" +"id=" + id +", buyerName='" + buyerName + '\'' +", buyerTel='" + buyerTel + '\'' +", address='" + address + '\'' +", createDate=" + createDate +'}';}}
4、Controller
private RocketMQTemplate rocketMQTemplate;@Autowiredpublic ProviderController(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate = rocketMQTemplate;}@GetMapping("/create")public Order create(){Order order = new Order(1,"张三","123123","软件园",new Date());this.rocketMQTemplate.convertAndSend("myTopic",order);return order;}
consumer
1、pom.xml
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>
2、application.yml
rocketmq:name-server: 39.106.41.184:9876
3、Order
package com.godfrey.entity;import java.io.Serializable;import java.util.Date;/*** @author godfrey* @since 2020-12-27*/public class Order implements Serializable {private static final long serialVersionUID = -5397628182599822017L;private Integer id;private String buyerName;private String buyerTel;private String address;private Date createDate;public Order() {}public Order(Integer id, String buyerName, String buyerTel, String address, Date createDate) {this.id = id;this.buyerName = buyerName;this.buyerTel = buyerTel;this.address = address;this.createDate = createDate;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getBuyerName() {return buyerName;}public void setBuyerName(String buyerName) {this.buyerName = buyerName;}public String getBuyerTel() {return buyerTel;}public void setBuyerTel(String buyerTel) {this.buyerTel = buyerTel;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}public Date getCreateDate() {return createDate;}public void setCreateDate(Date createDate) {this.createDate = createDate;}@Overridepublic String toString() {return "Order{" +"id=" + id +", buyerName='" + buyerName + '\'' +", buyerTel='" + buyerTel + '\'' +", address='" + address + '\'' +", createDate=" + createDate +'}';}}
4、Service
package com.godfrey.service;import com.godfrey.entity.Order;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/*** @author godfrey* @since 2020-12-27*/@Service@RocketMQMessageListener(consumerGroup = "myConsumer", topic = "myTopic")public class SmsService implements RocketMQListener<Order> {private static final Logger log = LoggerFactory.getLogger(SmsService.class);@Overridepublic void onMessage(Order order) {log.info("新订单{},发短信通知用户", order);}}
