一、MQ基础

1、简介

  • MQ = 消息中间件
  • 消息:短信,微信,语音…….
  • 中间件:Eureka 注册中心也属于中间件

2、MQ产品种类

  • RabbitMQ
  • ActiveMQ
  • kafka
  • RocketMQ
  • ……

3、面试题

3.1 在何种场景下使用了消息中间件?

3.2 为什么要在系统中引入消息中间件?

  • 要做到系统解耦,当心的模块接入进来,可以做到代码改动最小【解耦合】
  • 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮【消峰】
  • 强弱依赖疏理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力【异步】

4、MQ的作用定义

  • 发送者把消息发送给消息服务器,消息服务器将消息存放在若干 列队/主题 中,在合适的时候,消息服务器会将消息转发给接收者。
  • 在这个过程中, 发送和接收都是异步的 ,也就是无需等待,而且发送者和接受者的声明周期也没有必然关系。
  • 尤其在发布 pub/ 订阅 sub模式下,也可以完成一对多的通信,及让一个消息有多个接收者。

    5、ActiveMQ技术维度

  • api的接收与发送

  • MQ的高可用性
  • MQ的集群和容错配置
  • MQ的持久化
  • 延迟发送/定时投递
  • 签收机制
  • Spring整合

二、ActiveMQ的安装和启动

https://blog.csdn.net/qq_41933149/article/details/101571463
启动:
在activeMQ的bin目录下

  • 使用 ./activemq start 命令启动
  • 使用 ./activemq restart 命令重启
  • 使用 ./activemq stop 命令停止
  • ./activemq start > /usr/activemq/log/run_activemq.log 启动并且将信息写入到 run_activemq.log 日志文件中

/usr/activemq/apache-activemq-5.9.0/bin
ActiveMQ默认启动的端口是 61616

  1. 查看ActiveMQ是否已经启动
  2. ps -ef | grep activemq | grep -v grep
  3. 查看61616端口是否被占用
  4. netstat -anp | grep 61616
  5. 查看端口号的使用
  6. [root@localhost bin]# lsof -i:61616
  7. COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
  8. java 36976 root 125u IPv6 189037 0t0 TCP *:61616 (LISTEN)

image.png
在浏览器访问 ActiveMQ :

61616 端口提供 JMS 服务
8161端口提供管理控制台服务


三、Java实现ActiveMQ通讯

1、步骤

  1. 创建一个connection factory
  2. 通过connection factory 来创建JSM connection
  3. 启动JMS connection 调用start方法
  4. 通过connection创建JMS session
  5. 创建JMS destination 目的地 [队列/主题]
  6. 创建提供者或者创建一个message并设置destination
  7. 创建消费者 或者是注册一个JMS message listener
  8. 发送或者接收JMS message
  9. 关闭所有的JMS资源(consumer provider session connection

image.png

在点对点的消息传递域中(张三发给李四),目的地被称为队列(queue)
在发布订阅消息传递与中(博主发给粉丝),目的地被称为主题(topic)

2、Java实现队列 【消息生产者】

依赖

  1. <!--activemq 依赖-->
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>activemq-all</artifactId>
  5. <version>5.11.2</version>
  6. </dependency>
  7. <!--简化开发-->
  8. <dependency>
  9. <groupId>org.projectlombok</groupId>
  10. <artifactId>lombok</artifactId>
  11. <version>1.18.18</version>
  12. <scope>provided</scope>
  13. </dependency>
  14. <!--记录日志-->
  15. <dependency>
  16. <groupId>org.slf4j</groupId>
  17. <artifactId>slf4j-api</artifactId>
  18. <version>2.0.0-alpha1</version>
  19. </dependency>

Java代码

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. /**
  4. * @date: 2021/3/31 18:18
  5. * @author: 易学习
  6. */
  7. public class JmsProvider {
  8. private static final String DEFAULT_BROKER_HOST = "tcp://192.168.163.10:61616";
  9. public static final String QUEUE_NAME = "queue01";
  10. public static void main(String[] args) throws JMSException {
  11. // 1、创建连接工厂,传入activeMQ的URL地址,默认的用户名密码是 admin admin
  12. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_HOST);
  13. // 2、通过连接工厂获取连接Connection 并启动访问
  14. Connection connection = activeMQConnectionFactory.createConnection();
  15. connection.start();
  16. // 3、创建Session 参数1:事务 参数2:签收
  17. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  18. // 4、创建目的地 [队列还是主题]
  19. Queue queue = session.createQueue(QUEUE_NAME);
  20. // 5、创建消息的生产者
  21. MessageProducer producer = session.createProducer(queue);
  22. // 6、通过使用消息生产者生产3条消息发送到MQ的队列里面
  23. for (int i = 0; i < 3; i++){
  24. // 7、创建消息
  25. TextMessage textMessage = session.createTextMessage("消息--" + (i + 1));
  26. // 8、通过生产者发布给mq
  27. producer.send(textMessage);
  28. }
  29. // 9、释放资源
  30. producer.close();
  31. session.close();
  32. connection.close();
  33. System.out.println("----消息已发送----");
  34. }
  35. }

执行完后 在浏览器中可以看到
image.png

3、【消息消费者】

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. /**
  4. * @date: 2021/3/31 19:07
  5. * @author: 易学习
  6. */
  7. public class JmsConsumer {
  8. public static final String ACTIVEMQ_URL = "tcp://192.168.163.10:61616";
  9. public static final String QUEUE_NAME = "queue01";
  10. public static void main(String[] args) throws JMSException {
  11. // 1、创建ActiveMQ连接工厂
  12. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  13. // 2、获取Connection 并启动
  14. Connection connection = activeMQConnectionFactory.createConnection();
  15. connection.start();
  16. // 3、获取Session
  17. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  18. // 4、创建目的地(主题/队列)
  19. Queue queue = session.createQueue(QUEUE_NAME);
  20. // 5、创建消费者
  21. MessageConsumer consumer = session.createConsumer(queue);
  22. // 6、进行消费
  23. /*
  24. 同步阻塞方式 receive() 方法
  25. 订阅者或者接收者调用consumer的receive()方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
  26. */
  27. while (true){
  28. // 接收 (因为生产者传进去的是TextMessage类型,所以取出来强转成这种)
  29. TextMessage receive = (TextMessage) consumer.receive();
  30. if (receive != null) {
  31. System.out.println("------消费者---->" + receive.getText());
  32. }else{
  33. break;
  34. }
  35. }
  36. // 7、关闭资源
  37. consumer.close();
  38. session.close();
  39. connection.close();
  40. }
  41. }

image.png

4、消费者receive()方法说明

  • consumer.receive();// 如果这个参数为空,则表示一直等待 死等,IDEA中灯不灭
  • consumer.receive(3000L); // 等待3秒,单位是毫秒,如果三秒消费不到就不等了 IDEA 熄灯

5、消费者MessageListener() 方法说明

通过监听的方式来消费消息 【比较好点 异步非阻塞方式

  1. // 控制台停留 不添加这句话不会进行消费

System.in.read();

  1. /**
  2. * @date: 2021/3/31 19:07
  3. * @author: 易学习
  4. */
  5. public class JmsConsumer {
  6. public static final String ACTIVEMQ_URL = "tcp://192.168.163.10:61616";
  7. public static final String QUEUE_NAME = "queue01";
  8. public static void main(String[] args) throws JMSException, IOException {
  9. // 1、创建ActiveMQ连接工厂
  10. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  11. // 2、获取Connection 并启动
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. connection.start();
  14. // 3、获取Session
  15. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  16. // 4、创建目的地(主题/队列)
  17. Queue queue = session.createQueue(QUEUE_NAME);
  18. // 5、创建消费者
  19. MessageConsumer consumer = session.createConsumer(queue);
  20. // 6、进行消费
  21. // 使用MessageListener进行消费信息
  22. // 7、消费消息
  23. consumer.setMessageListener(new MessageListener() {
  24. @SneakyThrows
  25. public void onMessage(Message message) {
  26. if (message != null){
  27. TextMessage textMessage = (TextMessage) message;
  28. String text = textMessage.getText();
  29. System.out.println("----消费者---->" + text);
  30. }
  31. }
  32. });
  33. // 控制台停留 不添加这句话不会进行消费
  34. System.in.read();
  35. // 7、关闭资源
  36. consumer.close();
  37. session.close();
  38. connection.close();
  39. }
  40. }

6、问题

  1. 先启动两个消费者,再生产6点消息,问题:消费情况如何?

    • 会进行论循消费,类似负载均衡 一人一个

      7、两种消费方式

  2. 同步阻塞方式 receive()

    • 订阅者或者接收方调用 consumer.receive() 方法来接收消息,receive方法再能够接收到消息之前或 超时之前 将一直阻塞。
  3. 异步非阻塞方式 监听器 onMessage()

    • 订阅者或者接收者通过consumer.setMessageListener(MessageListener listener)注册一个消息监听器。
    • 当消息到达之后,系统自动调用监听器 MessageListener的onMessage(Message message)方法

      8、点对点消息传递域的特点

  4. 每个消息只能有一个消费者,类似于 1 对 1 的关系,好比个人快递自己领取自己的。

  5. 消息的生产者和消费者之间没有时间上的相关性,无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息,【好比我们发送短信,发送者发送后不见得接收者会即收即看】
  6. 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息

三、Topic 讲解

主题

1、发布订阅消息传递域的特点如下:

  1. 生产者将消息发布到 topic中,每个消息可以有多个消费者,属于1:N的关系
  2. 生产者和消费者之间有时间上的相关性,订阅某一个主题的消费者只能消费 自他订阅之后发布的消息。
  3. 生产者生产时,topic不保存消息他是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
  4. 凡是订阅的 都可以收到 **session.createTopic("目的地和生产者的目的地一样")**

JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的信息 一句话,好比我们的微信公众号订阅

2、java实现topic 【主题】代码

不同点就是把目的地Queue换成了 Topic

2.1 消费者

  1. public class JmsConsumerTopic {
  2. public static final String ACTIVEMQ_URL = "tcp://192.168.163.10:61616";
  3. public static final String TOPIC_NAME = "topic-yixuexi";
  4. public static void main(String[] args) throws JMSException, IOException {
  5. System.out.println("---------我是3号消费者 topic--------");
  6. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  7. Connection connection = activeMQConnectionFactory.createConnection();
  8. connection.start();
  9. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  10. // 创建目的地 为 topic 主题
  11. Topic topic = session.createTopic(TOPIC_NAME);
  12. // 创建消费者,传入目的地
  13. MessageConsumer consumer = session.createConsumer(topic);
  14. consumer.setMessageListener(new MessageListener() {
  15. @SneakyThrows
  16. public void onMessage(Message message) {
  17. if (message != null){
  18. TextMessage textMessage = (TextMessage) message;
  19. System.out.println("topic消费者----->" + textMessage.getText());
  20. }
  21. }
  22. });
  23. // 控制台停留
  24. System.in.read();
  25. // 7、关闭资源
  26. consumer.close();
  27. session.close();
  28. connection.close();
  29. }
  30. }

2.2 生产者

  1. public class JmsProviderTopic {
  2. public static final String ACTIVE_URL = "tcp://192.168.163.10:61616";
  3. public static final String TOPIC_NAME = "topic-yixuexi";
  4. public static void main(String[] args) throws JMSException {
  5. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
  6. Connection connection = activeMQConnectionFactory.createConnection();
  7. connection.start();
  8. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  9. // 创建topic 目的地
  10. Topic topic = session.createTopic(TOPIC_NAME);
  11. MessageProducer producer = session.createProducer(topic);
  12. for (int i = 0; i < 10; i++) {
  13. TextMessage textMessage = session.createTextMessage("topic消息:" + (i + 1));
  14. producer.send(textMessage);
  15. }
  16. producer.close();
  17. session.close();
  18. connection.close();
  19. }
  20. }

四、Topic和Queue对比总结

1、模式比较 面试题

比较项目: Topic模式列队 Queue模式列队
工作模式: “订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息 “负载均衡”模式,如果当前没有消费者,消息也不会被丢弃;如果有多个消费者,那么一条信息也可以只会发送其中一个消费者,并且要求消费者ack信息
有无状态: 无状态 Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ 一般保存在 $AMQ_HOME\data\kr-store\data 下面,也可以配置成DB存储
传递完整性: 如果没有订阅者,消息会被丢弃 消息不会丢弃
处理效率: 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差距 由于一条消息只能发送给一个消费者,所以就算消费者再多,性能也不会有明显的降低,当然不同消息协议的具体性能那也是有差异的。