image-RabbitMQ.png

RabbitMQ简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)

相关名词概念

Message 消息 - 信息载体。
一般包含两部分,消息体和标签。消息体是要发送的数据,标签是对数据的表述。
Producer 生产者 - 产生消息的一方。生产者消息投递都是面向交换机的
Consumer 消费者 - 消费消息的一方。消费者消费消息是面向消息队列的
Broker 中间件服务节点 - 可以看作是一个RabbitMQ服务实例。
Queue 队列 - RabbitMQ内部用于存储消息的对象(模型)
Exchange 交换器 - 接收消息并分派(根据路由规则)到队列。
常用的交换器类型有 fanout、direct、topic。
RoutingKey 路由键 - 生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。当RoutingKey与BindingKey匹配时,消息会被路由到相应的队列。RoutingKey需与交换器类型和绑定建(BindingKey)联合使用才能生效。
Binding 绑定 - RabbitMQ中通过Binding将Exchange和Queue关联起来,在绑定的时候,一般会指定一个绑定键(BindingKey)。
Topic 主题 - 根据主题分类处理。
Connection 连接 - 无论是生产者还是消费者,都需要与RabbitMQ Broker建立连接,这个连接就是一条TCP连接。
Channel 信道 - Channel是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
AMQP协议 AMQP协议是一个网络通信协议,RabbitMQ就是AMQP协议的Erlang实现。

架构

image-RabbitMQ-Frame.png

任务分发机制

exchange 多队列 多消费处理

消息确认机制

TCP - ack应答机制

持久化机制

  1. 交换机持久化

声明时指定durable => true

  1. 队列持久化

声明时指定durable => true

  1. 消息持久化

在投递时指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定. 注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。

消息序列化

使用ProtoBuf序列化消息

作用及应用场景

解决传统Web应用HTTP连接问题(并发耗时请求堆积(造成服务器崩溃、多线程消耗竞争CPU问题))
能对业务逻辑进行解耦,提高可扩展性

一般应用在大型的高并发项目(互联网项目)上,小型应用一般进行多线程处理即可

常见MQ应用的对比

ActiveMQ RabbitMQ RocketMQ Kafka

RabbitMQ安装

安装

erl环境
RabbitMQ服务
https://www.rabbitmq.com/download.html

配置与管理

服务管理

rabbitmqctl stop
rabbitmqctl start
rabbitmqctl status
rabbitmqctl reset
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins disable rabbitmq_management
端口:15672 默认登录用户密码 guest/guest

用户管理

rabbitmqctl list_users
rabbitmqctl add_user username
rabbitmqctl set_user_tag tagname
rabbitmqctl set_permissions [—vhost ]

虚拟主机管理

rabbitmqctl list_vhosts
rabbitmqctl add_vhost vhostname
rabbitmqctl delete vhost vhostname

交换机管理

队列管理

rabbitmqctl list_queues
rabbitmqctl reset

其他

rabbitmqctl cluster_status

问题

用户的tag
用户的vhost分配

RabbitMQ整合

SpringBoot整合RabbitMQ

  1. 添加依赖 ```xml org.springframework.boot spring-boot-starter-amqp

com.rabbitmq amqp-client 4.0.3

  1. 2. 添加配置
  2. ```json
  3. spring:
  4. rabbitmq:
  5. host: localhost
  6. port: 5672
  7. virtual-host: /jcoo
  8. username: test
  9. password: test
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    public final static String queueName = "test_topic_queue";
    public final static String exchangeName = "test_exchange_topic";

    /**
     * 设置queue
     */
    @Bean
    public Queue topicQueue() {
        return new Queue(queueName);
    }

    /**
     * 设置exchange
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(exchangeName);
    }

    /**
     * 绑定 exchange 与 queue
     */
    @Bean
    Binding bindingExchangeMessage1() {
        return BindingBuilder.bind(topicQueue()).to(exchange()).with("yi.#");
    }
}
  1. 创建生产者 ```java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;

import java.util.Date; import java.util.HashMap; import java.util.Map;

@RestController public class Producer { @Autowired RabbitTemplate rabbitTemplate;

@PostMapping("/send")
public String send(@RequestParam("message") String message) {
    Map<String, Object> mes = new HashMap<>();
    mes.put("data", message);
    rabbitTemplate.convertAndSend("test_exchange_topic", "yi.lei", mes);
    return "消息发送成功";
}

public static void main(String[] args) throws Exception {
    // 获取连接 创建信道(通过信道执行AMQP协议指令)
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = connection.createChannel();

    //声明交换机
    String exchangeName = "test_exchange_direct";
    String exchangeType = "direct";
    channel.exchangeDeclare(exchangeName, exchangeType);
    // 发送消息到exchange
    String message = new Date().toString();
    channel.basicPublish(exchangeName, "", null, message.getBytes());

    // 关闭连接
    channel.close();
    connection.close();
}

}


4. 创建消费者
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "test_topic_queue")
public class Consumer {
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("消费者收到消息  : " + testMessage.toString());
    }

    public static void main(String[] args) throws Exception {
        // 获取连接 创建信道(通过信道执行AMQP协议指令)
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        String queueName = "test_direct_queue";
        channel.queueDeclare(queueName, false, false, false, null);
        //绑定队列与交换机
        String exchangeName = "test_exchange_direct";
        channel.queueBind(queueName, exchangeName, "");
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
        }
    }
}

集群

  1. 测试

参考

https://www.rabbitmq.com/
https://www.rabbitmq.com/getstarted.html
https://www.rabbitmq.com/configure.html
https://www.cnblogs.com/lori/p/7852534.html
https://blog.csdn.net/shaoyunzhe/article/details/96461703
https://blog.csdn.net/qq_25221835/article/details/104008784
https://blog.csdn.net/maihilton/article/details/80928661