1、安装 RocketMQ

下载:rocketmq-all-4.8.0-bin-release.zip

1、传入 Linux 服务器

2、解压缩

  1. unzip rocketmq-all-4.8.0-bin-release.zip

3、调整启动参数,

  1. cd rocketmq-all-4.8.0-bin-release/bin

修改默认启动参数,默认启动的最大内存为4G,比较大,修改小一点,否则如果服务器内存不够会启动失败

调整namesrv

  1. vim runserver.sh

调整如下

Spring Cloud Alibaba 05:RocketMQ - 图1

调整broker

  1. vim runbroker.sh

调整如下

Spring Cloud Alibaba 05:RocketMQ - 图2

4、启动namesrv和启动broker

启动navmesrv

  1. nohup sh mqnamesrv &

启动broker,注意ip为公网ip,端口为navmesrv的默认端口9876

  1. nohup ./mqbroker -n localhost:9876 &

5、检查是否启动成功

  1. jps -l

Spring Cloud Alibaba 05:RocketMQ - 图3

也可以查看日志

  1. tail -f ~/logs/rocketmqlogs/broker.log

启动成功

6、测试 RocketMQ

消息发送

  1. export NAMESRV_ADDR=localhost:
  2. ./tools.sh org.apache.rocketmq.example.quickstart.Producer

消息接收

  1. ./tools.sh org.apache.rocketmq.example.quickstart.Consumer

7、关闭 RocketMQ

  1. ./mqshutdown broker
  2. ./mqshutdown namesrv

2、安装 RocketMQ 控制台

  1. git clone https://github.com/apache/rocketmq-externals.git

1、进入到rocketmq-console的配置文件,修改如下:

Spring Cloud Alibaba 05:RocketMQ - 图4

2、打包

  1. mvn clean package -Dmaven.test.skip=true

3、进入 target 启动 jar

  1. java -jar rocketmq-console-ng-2.0.0.jar

打开浏览器访问 localhost:9877,如果报错

Spring Cloud Alibaba 05:RocketMQ - 图5

这是因为我们的 RocketMQ 安装在 Linux 中,控制台在 windows,Linux 需要开放端口才能访问,开放 10909 和 9876 端口

  1. firewall-cmd --zone=public --add-port=10909/tcp --permanent
  2. firewall-cmd --zone=public --add-port=10911/tcp --permanent
  3. firewall-cmd --zone=public --add-port=9876/tcp --permanent
  4. systemctl restart firewalld.service
  5. firewall-cmd --reload

重新启动控制台项目

3、Java 实现消息发送

1、pom.xml 中引入依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.1.1</version>
  5. </dependency>

2、生产消息

  1. package com.godfrey;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.message.Message;
  5. @SpringBootTest
  6. class ProviderApplicationTests {
  7. @Test
  8. @DisplayName("测试RocketMQ消息发送")
  9. void test3() throws Exception {
  10. //创建消息生产者
  11. DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
  12. //设置NameServer
  13. producer.setNamesrvAddr("39.106.41.184:9876");
  14. //启动生产者
  15. producer.start();
  16. //构建消息对象
  17. Message message = new Message("myTopic", "myTag", ("Test MQ").getBytes());
  18. //发送消息
  19. SendResult result = producer.send(message, 1000);
  20. System.out.println(result);
  21. //关闭生产者
  22. producer.shutdown();
  23. }
  24. }

3、直接运行,如果报错 sendDefaultImpl call timeout,可以开放 10911 端口

  1. firewall-cmd --zone=public --add-port=10911/tcp --permanent
  2. systemctl restart firewalld.service
  3. firewall-cmd --reload

打开 RocketMQ 控制台,可查看消息。

4、Java 实现消息消费

  1. package com.godfrey;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.util.List;
  9. @SpringBootTest
  10. class ProviderApplicationTests {
  11. @Test
  12. @DisplayName("测试RocketMQ消息接收")
  13. void test4() throws MQClientException {
  14. //创建消息消费者
  15. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
  16. //设置NameServer
  17. consumer.setNamesrvAddr("39.106.41.184:9876");
  18. //指定订阅的主题和标签
  19. consumer.subscribe("myTopic", "*");
  20. //回调函数
  21. consumer.registerMessageListener(new MessageListenerConcurrently() {
  22. @Override
  23. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  24. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  25. }
  26. });
  27. //启动消费者
  28. consumer.start();
  29. }
  30. }

5、Spring Boot 整合 RocketMQ

provider

1、pom.xml

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.1.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-client</artifactId>
  9. <version>4.8.0</version>
  10. </dependency>

2、application.yml

  1. rocketmq:
  2. name-server: 39.106.41.184:9876
  3. producer:
  4. group: myprovider

3、Order

  1. package com.godfrey.entity;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. /**
  5. * @author godfrey
  6. * @since 2020-12-27
  7. */
  8. public class Order implements Serializable {
  9. private static final long serialVersionUID = -5397628182599822017L;
  10. private Integer id;
  11. private String buyerName;
  12. private String buyerTel;
  13. private String address;
  14. private Date createDate;
  15. public Order() {
  16. }
  17. public Order(Integer id, String buyerName, String buyerTel, String address, Date createDate) {
  18. this.id = id;
  19. this.buyerName = buyerName;
  20. this.buyerTel = buyerTel;
  21. this.address = address;
  22. this.createDate = createDate;
  23. }
  24. public Integer getId() {
  25. return id;
  26. }
  27. public void setId(Integer id) {
  28. this.id = id;
  29. }
  30. public String getBuyerName() {
  31. return buyerName;
  32. }
  33. public void setBuyerName(String buyerName) {
  34. this.buyerName = buyerName;
  35. }
  36. public String getBuyerTel() {
  37. return buyerTel;
  38. }
  39. public void setBuyerTel(String buyerTel) {
  40. this.buyerTel = buyerTel;
  41. }
  42. public String getAddress() {
  43. return address;
  44. }
  45. public void setAddress(String address) {
  46. this.address = address;
  47. }
  48. public Date getCreateDate() {
  49. return createDate;
  50. }
  51. public void setCreateDate(Date createDate) {
  52. this.createDate = createDate;
  53. }
  54. @Override
  55. public String toString() {
  56. return "Order{" +
  57. "id=" + id +
  58. ", buyerName='" + buyerName + '\'' +
  59. ", buyerTel='" + buyerTel + '\'' +
  60. ", address='" + address + '\'' +
  61. ", createDate=" + createDate +
  62. '}';
  63. }
  64. }

4、Controller

  1. private RocketMQTemplate rocketMQTemplate;
  2. @Autowired
  3. public ProviderController(RocketMQTemplate rocketMQTemplate) {
  4. this.rocketMQTemplate = rocketMQTemplate;
  5. }
  6. @GetMapping("/create")
  7. public Order create(){
  8. Order order = new Order(
  9. 1,
  10. "张三",
  11. "123123",
  12. "软件园",
  13. new Date()
  14. );
  15. this.rocketMQTemplate.convertAndSend("myTopic",order);
  16. return order;
  17. }

consumer

1、pom.xml

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.1.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-client</artifactId>
  9. <version>4.8.0</version>
  10. </dependency>

2、application.yml

  1. rocketmq:
  2. name-server: 39.106.41.184:9876

3、Order

  1. package com.godfrey.entity;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. /**
  5. * @author godfrey
  6. * @since 2020-12-27
  7. */
  8. public class Order implements Serializable {
  9. private static final long serialVersionUID = -5397628182599822017L;
  10. private Integer id;
  11. private String buyerName;
  12. private String buyerTel;
  13. private String address;
  14. private Date createDate;
  15. public Order() {
  16. }
  17. public Order(Integer id, String buyerName, String buyerTel, String address, Date createDate) {
  18. this.id = id;
  19. this.buyerName = buyerName;
  20. this.buyerTel = buyerTel;
  21. this.address = address;
  22. this.createDate = createDate;
  23. }
  24. public Integer getId() {
  25. return id;
  26. }
  27. public void setId(Integer id) {
  28. this.id = id;
  29. }
  30. public String getBuyerName() {
  31. return buyerName;
  32. }
  33. public void setBuyerName(String buyerName) {
  34. this.buyerName = buyerName;
  35. }
  36. public String getBuyerTel() {
  37. return buyerTel;
  38. }
  39. public void setBuyerTel(String buyerTel) {
  40. this.buyerTel = buyerTel;
  41. }
  42. public String getAddress() {
  43. return address;
  44. }
  45. public void setAddress(String address) {
  46. this.address = address;
  47. }
  48. public Date getCreateDate() {
  49. return createDate;
  50. }
  51. public void setCreateDate(Date createDate) {
  52. this.createDate = createDate;
  53. }
  54. @Override
  55. public String toString() {
  56. return "Order{" +
  57. "id=" + id +
  58. ", buyerName='" + buyerName + '\'' +
  59. ", buyerTel='" + buyerTel + '\'' +
  60. ", address='" + address + '\'' +
  61. ", createDate=" + createDate +
  62. '}';
  63. }
  64. }

4、Service

  1. package com.godfrey.service;
  2. import com.godfrey.entity.Order;
  3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  4. import org.apache.rocketmq.spring.core.RocketMQListener;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. * @author godfrey
  10. * @since 2020-12-27
  11. */
  12. @Service
  13. @RocketMQMessageListener(consumerGroup = "myConsumer", topic = "myTopic")
  14. public class SmsService implements RocketMQListener<Order> {
  15. private static final Logger log = LoggerFactory.getLogger(SmsService.class);
  16. @Override
  17. public void onMessage(Order order) {
  18. log.info("新订单{},发短信通知用户", order);
  19. }
  20. }