1. RabbitMQ
1.1. 概念
1.1.1. AMQP协议
高级消息队列协议(Advanced Message Queuing Protocol),是一种二进制应用层协议,用于应对广泛的面向消息应用程序的支持。协议提供了消息流控制,保证的一个消息对象的传递过程,如至多一次、保证多次、仅有一次等,和基于SASL和TLS的身份验证和消息加密。实现AMQP协议的中间件有: RabbitMQ,ActiveMQ,Qpid,JORAM等。
1.1.2. RabbitMQ相关概念
- Broker 消息传输服务,等同于RabbitMQ Server本身
- Virtual Host 虚拟主机,用于资源隔离,不同的虚拟主机下的资源相互隔离
- Queue 队列,消息存储器
- Exchange 交换机,接收客户端发来的消息,并根据路由方式将消息投递到队列中。交换机存在不同类型。
- Message Routing Key 消息在Exchange中路由规则的索引,用于指定当前消息采用的路由规则
Bind Routing Key 指定消息在Exchange与Queue绑定关系的Routing Key,也被称为Binding Key
1.1.3. RabbitMQ使用场景
项目解耦 在微服务场景中非常常见,一个大的单体服务拆成若干小服务,不同服务之间进行数据交换可以通过RabbitMQ
- 流量削峰 在突如其来的大并发场景中(常见于商品秒杀),使用RabbitMQ进行缓存,避免流量直接打垮后端服务
异步处理 当前服务将任务分发给其它服务,由其它服务处理后上报结果到MQ后被消费
1.1.4. 安装
个人学习环境可以直接启动一个 RabbitMQ 容器,仓库
- 生产环境中需要安装一个RabbitMQ 集群,并配置高可用策略,运维指南:集群安装,镜像队列,Metrics,备份与恢复,AnsibleRole
测试环境中可以使用Chart 按照RabbitMQ集群,安装方式:Chart
1.2. RabbitMQ入门操作
创建 vhost (一般不使用默认的 / )
- 创建 Queue
- 创建 Exchange
- 绑定 Exchange 与 Queue
- 发送消息
- 接收消息
1.2.1. 资源创建
1.2.1.1. 创建vhost与queue
创建四个队列方便演示(qn):
1.2.1.2. 创建交换机
这里创建四种不同类型的交换机(type-test),方便演示
1.2.2. 测试交换机
RabbitMQ中,生产者是没有办法将消息直接发送到目标队列的,甚至都不知道目标队列名称,也不知道会被哪些服务消费。
生产者只需要正确投递消息给broker的交换机,然后RabbitMQ内部进行消息路由,分发消息到目标队列,消费者从目标队列获取消息即可!
默认交换机:当发布消息时,Exachage为空,将采用默认交换机,该交换机自动为direct类型。创建队列时,会以队列名称为key,自动绑定到默认交换机上!
1.2.2.1. 测试fanout类型交换机
这类交换机在被称为广播模式,即所有绑定到该交换机上的queue都会收到消息,如多个服务同时监听某个应用上报的信息,那么采用广播比较合适,性能也很优异。
消费队列中的消息,q1到q4全部消费
1.2.2.2. 测试direct类型
指定routing key绑定不同的队列,这种在生产环境中是最常见的!
1.2.2.3. 测试topic类型
topic 交换机就是在direct基础上增加了模糊匹配功能,性能相对于derict有所下降,但是模式匹配功能增强了扩展能力。
其中*
表示一个单词,#
表示零个或者多个单词。通常topic使用具体单词+.
+通配符,如k8s.*.*
#.ddn.com
1.2.2.4. 测试header类型
这种类型根据发送消息的 Headers 来进行消息分发,性能较差,使用的较少。此处不再演示!1.3. golang操作RabbitMQ
1.3.1. 入门
当前包中,queue声明、exchange声明、queue和exchage绑定都是幂等操作,即重复执行不会报错。生产环境中,可能不允许程序申请这些信息,程序要使用的queue、exchange、Key 很可能是通过环境变量或者配置中心注入的。 ```go package main
import ( “github.com/streadway/amqp” “go_learn/logger” “time” )
func main() { // 打开连接 conn, err := amqp.Dial(“amqp://admin:123456@127.0.0.1:5672/golang”) if err != nil { logger.Errorf(“dail to mq server failed, err:%s”) return } defer func() { _= conn.Close()}()
// 声明通道, 一个TCP连接可以申请多个通道,不同的协程可以使用不同的通道来增强性能
channel, err := conn.Channel()
if err != nil {
logger.Errorf("open channel failed, err:%s")
return
}
defer func() { _= channel.Close()}()
// 队列声明
/* (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
name: 声明的队列名称,如果不填为返回一个随机名称
durable: 是否持久化队列,这里针对的时队列信息,而不是消息。消息的持久化是需要在发布消息时指定
autoDelete: 自动删除,当没有消费者时,该队列会被删除。并不是立刻被删除
exclusive: 独享队列,当连接端开就会被删除,并且只能由当前连接使用
noWait: 如果这个有满足该队列条件的,或者其它连接在修改当前的队列则立刻返回错误
*/
queue, err := channel.QueueDeclare("queue-1", true, false, false, false, nil)
if err != nil {
logger.Errorf("declare queue failed, err:%s")
return
}
// 交换机声明
if channel.ExchangeDeclare("direct-test", "direct", true, false, false, false, nil) != nil {
logger.Errorf("declare exchange failed, err:%s")
return
}
// queue与交换机绑定
/* (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
name: queue名称
key: routine key
exchange: exchange名称
*/
if channel.QueueBind(queue.Name, "key-1", "direct-test", false, nil) != nil {
logger.Errorf("bind queue to exchange failed, err:%s")
return
}
go func() {
for {
// 定义消息
/*
Headers: header
ContentType: MIME 类型
DeliveryMode: 是否持久化,2表示持久化,0或者1表示非持久化
Body: 消息体
*/
msg := amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Body: []byte("Hello world,"+ time.Now().Format("2006-01-02 15:04:05")),
}
// 发布消息
/* (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing)
exchange: 交换机名称,空表示默认交换机
key: 消息发布时指定的 key
mandatory: 为true时,如果当前消息无法路由到任何queue中,消息则返回给发布者
immediate: 为true时,如果当消息路由的目标queued都没有消费者,则返回消息给发布者
*/
err = channel.Publish("direct-test", "key-1", false,false, msg)
if err != nil {
logger.Errorf("publish message failed, err:%s")
return
}
time.Sleep(time.Second)
}
}()
go func() {
// 定义消费者
/* (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
queue: 队列名称
consumer: 消费者名称,为空则自动指定名称
autoAck: 是否自动确认消息
exclusive: 独享当前队列,其它消费者不能消费
noLocal: rabbitmq不支持
noWait: 要求立即消费,如果为true,且队列为空,会导致报错
*/
deliveryChan, err := channel.Consume(queue.Name, "", false, false, false, false, nil)
if err != nil {
logger.Errorf("create consumer failed, err:%s")
return
}
for delivery := range deliveryChan {
logger.Infof("msg:%s", string(delivery.Body))
_ = delivery.Ack(false) // true 表示确认所有未ack的消息,false表示仅确认当前这一条消息
}
}()
select {}
}
```
[root@duduniao easy]# go run main.go
2021-01-17 16:53:59.739|msg:Hello world,2021-01-17 16:53:59
2021-01-17 16:54:00.74|msg:Hello world,2021-01-17 16:54:00
2021-01-17 16:54:01.74|msg:Hello world,2021-01-17 16:54:01
2021-01-17 16:54:02.741|msg:Hello world,2021-01-17 16:54:02