RabbitMQ简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)
相关名词概念
Message | 消息 - 信息载体。 一般包含两部分,消息体和标签。消息体是要发送的数据,标签是对数据的表述。 |
---|---|
Producer | 生产者 - 产生消息的一方。生产者消息投递都是面向交换机的 |
Consumer | 消费者 - 消费消息的一方。消费者消费消息是面向消息队列的 |
Broker | 中间件服务节点 - 可以看作是一个RabbitMQ服务实例。 |
Queue | 队列 - RabbitMQ内部用于存储消息的对象(模型) |
Exchange | 交换器 - 接收消息并分派(根据路由规则)到队列。 常用的交换器类型有 fanout、direct、topic。 |
RoutingKey | 路由键 - 生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。当RoutingKey与BindingKey匹配时,消息会被路由到相应的队列。RoutingKey需与交换器类型和绑定建(BindingKey)联合使用才能生效。 |
Binding | 绑定 - RabbitMQ中通过Binding将Exchange和Queue关联起来,在绑定的时候,一般会指定一个绑定键(BindingKey)。 |
Topic | 主题 - 根据主题分类处理。 |
Connection | 连接 - 无论是生产者还是消费者,都需要与RabbitMQ Broker建立连接,这个连接就是一条TCP连接。 |
Channel | 信道 - Channel是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。 |
AMQP协议 | AMQP协议是一个网络通信协议,RabbitMQ就是AMQP协议的Erlang实现。 |
架构
任务分发机制
消息确认机制
持久化机制
- 交换机持久化
声明时指定durable => true
- 队列持久化
声明时指定durable => true
- 消息持久化
在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定. 注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。
消息序列化
使用ProtoBuf序列化消息
作用及应用场景
解决传统Web应用HTTP连接问题(并发耗时请求堆积(造成服务器崩溃、多线程消耗竞争CPU问题))
能对业务逻辑进行解耦,提高可扩展性
一般应用在大型的高并发项目(互联网项目)上,小型应用一般进行多线程处理即可
常见MQ应用的对比
ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
---|---|---|---|---|
RabbitMQ安装
安装
erl环境
RabbitMQ服务
https://www.rabbitmq.com/download.html
配置与管理
服务管理
rabbitmqctl stop
rabbitmqctl start
rabbitmqctl status
rabbitmqctl reset
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins disable rabbitmq_management
端口:15672 默认登录用户密码 guest/guest
用户管理
rabbitmqctl list_users
rabbitmqctl add_user username
rabbitmqctl set_user_tag tagname
rabbitmqctl set_permissions [—vhost
虚拟主机管理
rabbitmqctl list_vhosts
rabbitmqctl add_vhost vhostname
rabbitmqctl delete vhost vhostname
交换机管理
队列管理
rabbitmqctl list_queues
rabbitmqctl reset
其他
rabbitmqctl cluster_status
问题
用户的tag
用户的vhost分配
RabbitMQ整合
SpringBoot整合RabbitMQ
- 添加依赖
```xml
org.springframework.boot spring-boot-starter-amqp
2. 添加配置
```json
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /jcoo
username: test
password: test
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
public final static String queueName = "test_topic_queue";
public final static String exchangeName = "test_exchange_topic";
/**
* 设置queue
*/
@Bean
public Queue topicQueue() {
return new Queue(queueName);
}
/**
* 设置exchange
*/
@Bean
TopicExchange exchange() {
return new TopicExchange(exchangeName);
}
/**
* 绑定 exchange 与 queue
*/
@Bean
Binding bindingExchangeMessage1() {
return BindingBuilder.bind(topicQueue()).to(exchange()).with("yi.#");
}
}
- 创建生产者 ```java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;
import java.util.Date; import java.util.HashMap; import java.util.Map;
@RestController public class Producer { @Autowired RabbitTemplate rabbitTemplate;
@PostMapping("/send")
public String send(@RequestParam("message") String message) {
Map<String, Object> mes = new HashMap<>();
mes.put("data", message);
rabbitTemplate.convertAndSend("test_exchange_topic", "yi.lei", mes);
return "消息发送成功";
}
public static void main(String[] args) throws Exception {
// 获取连接 创建信道(通过信道执行AMQP协议指令)
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
String exchangeName = "test_exchange_direct";
String exchangeType = "direct";
channel.exchangeDeclare(exchangeName, exchangeType);
// 发送消息到exchange
String message = new Date().toString();
channel.basicPublish(exchangeName, "", null, message.getBytes());
// 关闭连接
channel.close();
connection.close();
}
}
4. 创建消费者
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "test_topic_queue")
public class Consumer {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("消费者收到消息 : " + testMessage.toString());
}
public static void main(String[] args) throws Exception {
// 获取连接 创建信道(通过信道执行AMQP协议指令)
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
String queueName = "test_direct_queue";
channel.queueDeclare(queueName, false, false, false, null);
//绑定队列与交换机
String exchangeName = "test_exchange_direct";
channel.queueBind(queueName, exchangeName, "");
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv] Received '" + message + "'");
}
}
}
集群
- 测试
参考
https://www.rabbitmq.com/
https://www.rabbitmq.com/getstarted.html
https://www.rabbitmq.com/configure.html
https://www.cnblogs.com/lori/p/7852534.html
https://blog.csdn.net/shaoyunzhe/article/details/96461703
https://blog.csdn.net/qq_25221835/article/details/104008784
https://blog.csdn.net/maihilton/article/details/80928661