初识MQ

同步通讯

image.png
image.png


异步通讯

image.png
image.png


MQ常见框架

MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
image.png


RabbitMQ快速入门

RabbitMQ概述和安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:

  1. docker pull rabbitmq:3-management
  2. docker run \
  3. -e RABBITMQ_DEFAULT_USER=xiaochong \
  4. -e RABBITMQ_DEFAULT_PASS=123321 \
  5. --name mq \
  6. --hostname mq1 \
  7. -p 15672:15672 \
  8. -p 5672:5672 \
  9. -d \
  10. rabbitmq:3-management

image.png

RabbitMQ中的几个概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

常见消息模型

image.png
image.png


快速入门

HelloWorld案例
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

image.png

<dependencies>
  <!--AMQP依赖,包含RabbitMQ-->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
</dependencies>
public void testSendMessage() throws IOException, TimeoutException {
    // 1.建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("23.224.181.140");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("xiaochong");
    factory.setPassword("123321");
    // 1.2.建立连接
    Connection connection = factory.newConnection();

    // 2.创建通道Channel
    Channel channel = connection.createChannel();

    // 3.创建队列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.发送消息
    String message = "hello, rabbitmq!";
    channel.basicPublish("", queueName, null, message.getBytes());
    System.out.println("发送消息成功:【" + message + "】");

    // 5.关闭通道和连接
    channel.close();
    connection.close();
}
 public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("23.224.181.140");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xiaochong");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }

为什么消费者也创建队列,生产者也创建队列

因为,这两个服务部署是不确定的,不确定哪个先,已经存在就不会在创建


SpringAMQP

什么是SpringAMQP

SpringAmqp的官方地址:
image.png


Basic Queue简单队列模型

利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:

  1. 在父工程中引入spring-amqp的依赖
  2. 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
  3. 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

步骤1:引入AMQP依赖
image.png

步骤2:在publisher中编写测试方法,向simple.queue发送消息

spring:
 rabbitmq:
  host: 23.224.181.140
  port: 5672
  username: xiaochong
  password: 123321
  virtual-host: /
package cn.itcast.mq.spring;
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue(){
        String queueName = "simple.queue";
        String message = "hello,spring amqp!";
        rabbitTemplate.convertAndSend(queueName,message);
    }
}

步骤3:在consumer中编写消费逻辑,监听simple.queue
image.png

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能


Work Queue工作队列

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
image.png


模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下:

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
  2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
  3. 消费者1每秒处理50条消息,消费者2每秒处理10条消息

image.png
image.png
image.png


发布( Publish )、订阅( Subscribe )

image.png


发布订阅-Fanout Exchange

image.png


利用SpringAMQP演示FanoutExchange的使用

image.png
image.png

步骤1:在consumer服务声明Exchange、Queue、Binding

package cn.itcast.mq;
@Configuration
public class FanoutConfig {
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("xiaochong");
    }

    @Bean
    public Queue fanoutQueue1(){
        return  new Queue("fanout.queue1");
    }

    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Queue fanoutQueue2(){
        return  new Queue("fanout.queue2");
    }

    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

image.png


步骤2:在consumer服务声明两个消费者

    @RabbitListener(queues = "fanout.queue1")
    public void listenWorkQueue(String msg) throws InterruptedException {
        System.out.println("消费者1接收到信息: [" + msg + "]" + LocalTime.now());
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到信息: [" + msg + "]" + LocalTime.now());
    }

步骤3:在publisher服务发送消息到FanoutExchange

    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName = "xiaochong";
        //消息
        String message = "hello,every one!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }

image.png


发布订阅-DirectExchange

image.png

利用SpringAMQP演示DirectExchange的使用

image.png
image.png
image.png
image.png


发布订阅-TopicExchange

image.png


利用SpringAMQP演示TopicExchange的使用

image.png
image.png
image.png


SpringAMQP-消息转换器

image.png
image.png
image.png
image.png