Feign调用就属于同步方式,调用可以实时得到结果。
RabbitMQ是软件与软件之间异步消息通信。

什么是MQ?MQ可以解决什么问题?

MQ(Message Queue)消息队列,消息队列中间件是分布式系统中重要的组件,主要是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。
image.png
异步通信的优缺点:
好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速
  • 故障隔离:服务没有直接调用,不存在级联失败问题
  • 调用间没有阻塞,不会造成无效的资源占用
  • 耦合度极低,每个服务都可以灵活插拔,可替换
  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

几种常见的MQ技术:
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka

MQ的基本结构
RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

image.png
image.png

MQ怎么使用?

下载安装MQ(略)
执行下面的命令来运行MQ容器:

  1. docker run \
  2. -e RABBITMQ_DEFAULT_USER=itcast \
  3. -e RABBITMQ_DEFAULT_PASS=123321 \
  4. -v mq-plugins:/plugins \
  5. --name mq \
  6. --hostname mq \
  7. -p 15672:15672 \
  8. -p 5672:5672 \
  9. -d \
  10. rabbitmq:3.8-management

原生MQ使用:

RabbitMQ消息模型:
image.png
基本消息队列的消息发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列的消息接收流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

    SpringAMQP

    springAMQP是什么?

    SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
    SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

    springAMQP是干什么的?

    SpringAMQP提供了三个功能:
  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

    springAMQP怎么用?

    1.Basic Queue 简单队列模型

    在父工程mq-demo中引入依赖
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    消息发送
    首先配置MQ地址,在publisher服务的application.yml中添加配置:
    spring:
    rabbitmq:
      host: 192.168.200.130 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机
      username: itcast # 用户名
      password: 123321 # 密码
    
    然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

消息接收
首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码

然后在consumer服务的cn.itcast.mq.listener包中新建一个类SpringRabbitListener,代码如下:

 x package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class SpringRabbitListener {    @RabbitListener(queues = "simple.queue")    public void listenSimpleQueueMessage(String msg) throws InterruptedException {        System.out.println("spring 消费者接收到消息:【" + msg + "】");    }}

测试

面试题:

什么是MQ?

MQ(Message Queue)消息队列,消息队列中间件是分布式系统中重要的组件,主要是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

MQ的应用场景?

MQ最直接的使用场景就是可以将两个系统进行解耦,主要有异步处理,应用解耦,流量削锋和消息通讯四个应用场景

常见的mq产品?

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

    什么是AMQP协议模型?

    AMQP,即(Advanced Message Queuing Protocol),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    是用于在应用程序之间传递业务消息的开放标准

    rabbitmq支持哪些消息模式?

    一共支持五种消息模式

    1.简单模式(simple.queue),当生产者生产消息后,将消息发往队列,当队列中有消息时,消费者会实时的监听队列中的消息.如果有消息则会执行消息
    2.工作模式(Work queues),默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
    3.发布者订阅模式/广播模式(Fanout exchange), 生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者
    4.路由模式(Direct exchange)当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息
    5.主题模式(Topic exchange)当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放

    如何使用RabbitMQ收发消息?(基于SpringAMQP)?

    SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

通过SpringAMQP使用RabbitMQ
1.引入依赖


org.springframework.boot
spring-boot-starter-amqp

2.生产者端 添加配置
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
3.服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = “simple.queue”;
// 消息
String message = “hello, spring amqp!”;
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
可以查看队列已有一个消息
image.png
4.消费者端添加rabbitMQ配置,跟生产者端一样
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
5.然后在消费者端服务的cn.itcast.mq.listener包中新建一个类SpringRabbitListener
@Component
public class SpringRabbitListener {
@RabbitListener(queues = “simple.queue”)
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println(“spring 消费者接收到消息:【” + msg + “】”);
}
}
6.启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息
可以看到接收到消息了,并且队列中的ready数减1了
image.png
image.png

如何项目中声明交换机、队列及绑定关系?

两种方式声明:

1.配置bean Spring提供了一个接口Exchange,来表示所有不同类型的交换机:
@Configuration
public class FanoutConfig {
_ /
声明交换机
@return Fanout类型交换机
/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(“itcast.fanout”);
}
/_
**_
第1个队列
/
@Bean
public Queue fanoutQueue1(){
return new Queue(“fanout.queue1”);
}
/_
**_
绑定队列和交换机
/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/_
**_
第2个队列
/
@Bean
public Queue fanoutQueue2(){
return new Queue(“fanout.queue2”);
}
/_
**_
绑定队列和交换机
/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}**
2.基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
**
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = “direct.queue1”),
exchange = @Exchange(name = “itcast.direct”, type = ExchangeTypes.DIRECT),
key = {“red”, “blue”}
))
_public void listenDirectQueue1(String msg){

System.out.println(“消费者接收到direct.queue1的消息:【” + msg + “】”);
}
@
QueueBinding*
注解的三个属性:

  • value: @Queue 注解,用于声明队列,value 为 queueName, durable 表示队列是否持久化, autoDelete 表示没有消费者之后队列是否自动删除
  • exchange: @Exchange 注解,用于声明 exchange, type 指定消息投递策略,我们这里用的 topic 方式
  • key: 在 topic 方式下,这个就是我们熟知的 routingKey

    消息转换器的作用?

    Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

    Message toMessage(Object object, MessageProperties messageProperties);
    将java对象和属性对象转换成Message对象。
    Object fromMessage(Message message) throws MessageConversionException;
    将消息对象转换成java对象。
    默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞
  • 可读性差

    如何修改MQ默认的消息转换器?

    Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
    如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
    消费者端生产者端两个服务中都引入依赖:

    com.fasterxml.jackson.dataformat
    jackson-dataformat-xml
    2.9.10

    配置消息转换器。
    在启动类中添加一个Bean即可:
    @Bean
    public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
    }

https://www.processon.com/view/link/62a735040e3e747c5c3ffeb9

MQ高级:
rabbitmq如何保证消息的可靠性?

rabbitmq如何解决百万消息堆积问题?

rabbitmq如何防止消息重复消费?

rabbitmq如何保证高可用?

rabbitmq消息的重试机制?

rabbitmq如何实现延迟消费?

rabbitmq在项目中的使用场景?