消息队列是一种进程对进程的通信方式。用来解决应用解耦,异步信息,流量削峰等问题。
- 解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合
- 异步:主业务执行结束后从属业务通过MQ异步执行,减低业务的响应时间,提高用户体验。
- 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
生产者将消息放入到队列里,然后由消费者去消费。消费者可以到指定队列拉取信息,或订阅相应的队列,由MQ服务端给其推送消息。
AMQP
一个提供统一信息服务的应用层标准高级消息队列协议。基于此协议的客户端和消息中间件可传递信息,并不受客户端/中间件不同产品、不同开发语言等条件的限制。
工作过程
AMQP工作过程:
- 发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
- 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
- 最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取
注意:
- 发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。
- 发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。
- 从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。
- 在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
rabbitmq消息发布过程
ConnectionFactory
、Connection
、Channel
都是RabbitMQ对外提供的API中最基本的对象。Connection
是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory
为Connection
的制造工厂。Channel
是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel
这个接口中完成的, 包括定义Queue
、定义Exchange
、绑定Queue
与Exchange
、发布消息等。
- 发送消息:
- 建立TCP连接connection
- 建立通道 Channel
- 发送消息给Broker
- Exchange将消息转发给指定队列Queue
- 接收消息
- 消费者和Broker进行TCP连接 connection
- 消费者和Broker建立通道 Channel
- 消费者监听指定的Queue
- 当有消息到达Queue时,Broker默认将消息推送给消费者
- 消费者接收到消息
消息组件
队列(Queue)
用于存储消息。RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。
为了避免消费者收到Queue中的消息,但没有处理完成就宕机的情况,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ, RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable)
我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数
声明
- name : 队列名称
- durable : 消息是否持久化
- auto-deleted : 队列接受完消息是否自动删除
- exclusive : 是否是独占队列(独占队列只能由声明它们的连接访问,并且将在连接关闭时删除。)
- no-wait : 如果为true,队列默认认为已存在Exchange,连接不上会报错
- argument : 队列的其他选项
ch.QueueDeclare(
r.QueueName,
false, //是否持久化
false, //是否为自动删除
false, //是否具有排他性
false, //是否阻塞
nil, //额外属性
)
消费
- name : 队列名
- consumer : 消费者名称
- autoAck : 自动应答
- exclusive : 排他性
- noLocal : 不允许本地消费(使用了同一个connection)
- nowait : 是否阻塞
args : 其他参数
ch.Consume(
q.Name,
"", //用来区分多个消费者
true, //是否自动应答,告诉我已经消费完了
false,
false, //若设置为true,则表示为不能将同一个connection中发送的消息传递给这个connection中的消费者.
false, //消费队列是否设计阻塞
nil,
)
交换机(Exchange)
生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则, 而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。 在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的), 我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。 RabbitMQ为routing key设定的长度限制为255 bytes。声明
name : exchange的名称
- type : exchange的类型
- durable: 消息是否持久化
- auto-deleted :也就是当分发器关联的所有Queue都删除以后,”分发器”也自动删除。
- internal : 是否为rabbitmq内部使用
- no-wait : 如果设置为
false
, 则不期望RabbitMQ
服务器有一个Exchange.DeclareOk
这样响应。 - argument : 分发器的其他选项
// demo
ch.ExchangeDeclare(
"eventti.event", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
);
发布
ch.Publish(
r.ExChange, // 交换机名称
"", // 路由键
false, // 消息发送成功确认(没有队列会异常)
false, // 消息发送失败回调(队列中没有消费者会异常)
amqp.Publishing{ // 发送的消息
ContentType: "text/plain",
Body: []byte(message),
})
绑定(Binding)
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key; 消费者将消息发送给Exchange时,一般会指定一个routing key; 当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
信息交换类型(Exchange Types)
Exchange Type有fanout
、direct
、topic
、headers
这四种
fanout
全输出类型,fanout类型的Exchange会把所有消息发送到与它绑定的所有Queue中。
direct
direct类型会处理路由键(routing key),它会把消息路由到那些绑定键(binding key)
与路由键(routing key)
完全匹配的Queue中。
topic
topic类型在direct类型的匹配规则上有约束:
routing key
为一个句点号“. ”分隔的字符串binding key
与routing key
一样也是句点号“. ”分隔的字符串binding key
中可以存在两种特殊字符*
与_#_
,用于做模糊匹配,其中_*_
用于匹配一个单词,#
用于匹配多个单词(可以是零个) 实例:quick.orange.rabbit
会匹配到*.orange.*
和*.*.rabbit
上headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。Rabbitmq队列模式
简单队列模式
一个生产者 P 发送消息到队列 Q,一个消费者 C 接收
简单队列也称为点对点,即一个生产者对应一个消费者,生产者发送消息到队列,消费者在队列中取出消息消费。
golang代码实现
- 定义结构体 ```go package RabbitMq
import ( “fmt” “github.com/streadway/amqp” )
// 这里主要是RabbitMQ的一些信息。包括其结构体和函数。
// 连接信息 const MQURL = “amqp://du:du@129.211.78.6:5672/dudevirtualhost”
// RabbitMQ结构体 type RabbitMQ struct { //连接 conn amqp.Connection channel amqp.Channel //队列 QueueName string //交换机名称 ExChange string //绑定的key名称 Key string //连接的信息,上面已经定义好了 MqUrl string }
// 创建结构体实例,参数队列名称、交换机名称和bind的key(也就是几个大写的,除去定义好的常量信息) func NewRabbitMQ(queueName string, exChange string, key string) *RabbitMQ { return &RabbitMQ{QueueName: queueName, ExChange: exChange, Key: key, MqUrl: MQURL} }
// 关闭conn和chanel的方法 func (r *RabbitMQ) Destory() { r.channel.Close() r.conn.Close() }
// 错误的函数处理 func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { fmt.Printf(“err是:%s,小杜同学手写的信息是:%s”, err, message) } }
2. 实现
```go
package RabbitMq
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//创建简单模式下的实例,只需要queueName这个参数,其中exchange是默认的,key则不需要。
func NewRabbitMQSimple(queueName string) *RabbitMQ {
rabbitmq := NewRabbitMQ(queueName, "", "")
var err error
//获取参数connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
rabbitmq.failOnErr(err, "连接connection失败")
//获取channel参数
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取channel参数失败")
return rabbitmq
}
//直接模式,生产者.
func (r *RabbitMQ) PublishSimple(message string) {
//第一步,申请队列,如不存在,则自动创建之,存在,则路过。
_, err := r.channel.QueueDeclare(
r.QueueName,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
fmt.Printf("创建连接队列失败:%s", err)
}
//第二步,发送消息到队列中
r.channel.Publish(
r.ExChange,
r.QueueName,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//直接模式,消费者
func (r *RabbitMQ) ConsumeSimple() {
//第一步,申请队列,如果队列不存在则自动创建,存在则跳过
q, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
false,
//是否自动删除
false,
//是否具有排他性
false,
//是否阻塞处理
false,
//额外的属性
nil,
)
if err != nil {
fmt.Println(err)
}
//第二步,接收消息
msgs, err := r.channel.Consume(
q.Name,
"", //用来区分多个消费者
true, //是否自动应答,告诉我已经消费完了
false,
false, //若设置为true,则表示为不能将同一个connection中发送的消息传递给这个connection中的消费者.
false, //消费队列是否设计阻塞
nil,
)
if err != nil {
fmt.Printf("消费者接收消息出现问题:%s", err)
}
forever := make(chan bool)
//启用协程处理消息
go func() {
for d := range msgs {
log.Printf("小杜同学写的Simple模式接收到了消息:%s\n", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
- 发布者发布消息 ```go package main
import ( “fmt” “rabbitmq20181121/RabbitMq” )
func main() { rabbitmq := RabbitMq.NewRabbitMQSimple(“duQueueName1912161843”) rabbitmq.PublishSimple(“他是客,你是心上人。 —-来自simple模式”) fmt.Println(“发送成功!”) }
4. 消费者消费消息
```go
package main
import (
"fmt"
"rabbitmq20181121/RabbitMq"
)
func main() {
rabbitmq := RabbitMq.NewRabbitMQSimple("duQueueName1912161843")
rabbitmq.ConsumeSimple()
fmt.Println("接收成功!")
}
工作模式
用来将耗时的任务分发给多个消费者
主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。
工作队列也称为公平性队列模式,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。
工作模式和简单模式比起来,只不过多了多个消费者,相当于起到了负载均衡的作用
golang代码实现
- 发布者 ```go package main
import ( “fmt” “rabbitmq20181121/RabbitMq” “strconv” “time” )
func main() { rabbitmq := RabbitMq.NewRabbitMQSimple(“duQueueName191224”) for i := 0; i < 100; i++ { rabbitmq.PublishSimple(“hello du message” + strconv.Itoa(i) + “—-来自work模式”) time.Sleep(1 * time.Second) fmt.Printf(“work模式,共产生了%d条消息\n”, i) } }
2. 消费者(建立两个)
```go
package main
import "rabbitmq20181121/RabbitMq"
func main() {
rabbitmq := RabbitMq.NewRabbitMQSimple("duQueueName191224")
rabbitmq.ConsumeSimple()
}
简单队列模式和工作模式都是直接在生产者和消费者里声明好一个队列。这种情况下消息只会应对同类型的消费。
发布/订阅模式
当一个事务涉及处理不同的业务逻辑时,就需要使用到发布/订阅模式。
发布/订阅模式将消息发送到交换机(Exchange),声明两个不同的队列,并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发送给对应的消费者。做到发布一次,消费多个。
在应用中,只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
发布/订阅模式下会使用到交换机,交换机使用fanout
模式,消费者以独占队列方式连接(只能由使用相同connection的生产者来调用,并且使用完即删除队列)。
相对于工作模式,发布订阅模式引入了交换机的概念,相对其类型上更加灵活广泛一些。
- 生产者不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列。从不加特定参数的运行结果中可以看到,两种类型的消费者都收到相同数量的信息。
- 必须声明交换机,并且设置交换机类型为
fanout
模式(将每一条消息都发送到与交换机绑定的队列) -
golang代码实现
发布订阅模式代码 ```go package RabbitMq
import ( “fmt” “github.com/streadway/amqp” )
//这里是订阅模式的相关代码。 //订阅模式需要用到exchange。
//获取订阅模式下的rabbitmq的实例 func NewRabbitMqSubscription(exchangeName string) *RabbitMQ { //创建rabbitmq实例 rabbitmq := NewRabbitMQ(“”, exchangeName, “”) var err error //获取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl) rabbitmq.failOnErr(err, “订阅模式连接rabbitmq失败。”) //获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, “订阅模式获取channel失败”) return rabbitmq }
//订阅模式发布消息 func (r *RabbitMQ) PublishSubscription(message string) { //第一步,尝试连接交换机 err := r.channel.ExchangeDeclare( r.ExChange, “fanout”, //这里一定要设计为”fanout”也就是广播类型。 true, false, false, false, nil, ) r.failOnErr(err, “订阅模式发布方法中尝试连接交换机失败。”)
//第二步,发送消息
err = r.channel.Publish(
r.ExChange,
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//订阅模式消费者 func (r *RabbitMQ) ConsumeSbuscription() { //第一步,试探性创建交换机exchange err := r.channel.ExchangeDeclare( r.ExChange, “fanout”, true, false, false, false, nil, ) r.failOnErr(err, “订阅模式消费方法中创建交换机失败。”)
//第二步,试探性创建队列queue
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "订阅模式消费方法中创建创建队列失败。")
//第三步,绑定队列到交换机中
err = r.channel.QueueBind(
q.Name,
"", //在pub/sub模式下key要为空
r.ExChange,
false,
nil,
)
//第四步,消费消息
messages, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range messages {
fmt.Printf("小杜同学写的订阅模式收到的消息:%s\n", d.Body)
}
}()
fmt.Println("订阅模式消费者已开启,退出请按 CTRL+C\n")
<-forever
}
2. 发布者的代码
```go
package main
import (
"fmt"
"rabbitmq20181121/RabbitMq"
"strconv"
"time"
)
func main() {
rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName")
for i := 0; i < 100; i++ {
rabbitmq.PublishSubscription("订阅模式生产第" + strconv.Itoa(i) + "条数据")
fmt.Printf("订阅模式生产第" + strconv.Itoa(i) + "条数据\n")
time.Sleep(1 * time.Second)
}
}
- 订阅者(两个)的代码 ```go package main
import “rabbitmq20181121/RabbitMq”
func main() { rabbitmq := RabbitMq.NewRabbitMqSubscription(“duexchangeName”) rabbitmq.ConsumeSbuscription() }
<a name="06gBW"></a>
## 路由模式
路由模式跟发布订阅模式类似,然后在订阅模式的基础上加上了类型,订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由键的队列
![](https://cdn.nlark.com/yuque/0/2020/png/2337686/1598408779235-f1a5df69-8695-4a6a-9bd9-0dda9a037a74.png#align=left&display=inline&height=234&margin=%5Bobject%20Object%5D&originHeight=234&originWidth=633&size=0&status=done&style=none&width=633)<br />在路由模式它会把消息路由到那些 `binding key` 与 `routing ke`y 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的 direct 模式。
在路由模式下,与发布/订阅模式相比,路由模式的交换机类型修改为`direct`,消费者可以根据路由键的不同来选择信息
总结:
1. 两个队列消费者设置的路由不一样,接收到的消息就不一样。路由模式下,决定消息向队列推送的主要取决于`路由键`,而不是交换机了。
1. 该模式必须设置交换机,并声明路由模式为`direct`
<a name="F1obc"></a>
### golang代码实现
1. 实现路由模式
```go
package main
import "rabbitmq20181121/RabbitMq"
func main() {
rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName")
rabbitmq.ConsumeSbuscription()
}1 package RabbitMq
import (
"github.com/streadway/amqp"
"log"
)
//rabbitmq的路由模式。
//主要特点不仅一个消息可以被多个消费者消费还可以由生产端指定消费者。
//这里相对比订阅模式就多了一个routingkey的设计,也是通过这个来指定消费者的。
//创建exchange的kind需要是"direct",不然就不是roting模式了。
//创建rabbitmq实例,这里有了routingkey为参数了。
func NewRabbitMqRouting(exchangeName string, routingKey string) *RabbitMQ {
rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
rabbitmq.failOnErr(err, "创建rabbit的路由实例的时候连接出现问题")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "创建rabbitmq的路由实例时获取channel出错")
return rabbitmq
}
//路由模式,产生消息。
func (r *RabbitMQ) PublishRouting(message string) {
//第一步,尝试创建交换机,与pub/sub模式不同的是这里的kind需要是direct
err := r.channel.ExchangeDeclare(r.ExChange, "direct", true, false, false, false, nil)
r.failOnErr(err, "路由模式,尝试创建交换机失败")
//第二步,发送消息
err = r.channel.Publish(
r.ExChange,
r.Key,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//路由模式,消费消息。
func (r *RabbitMQ) ConsumerRouting() {
//第一步,尝试创建交换机,注意这里的交换机类型与发布订阅模式不同,这里的是direct
err := r.channel.ExchangeDeclare(
r.ExChange,
"direct",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "路由模式,创建交换机失败。")
//第二步,尝试创建队列,注意这里队列名称不用写,这样就会随机产生队列名称
q, err := r.channel.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "路由模式,创建队列失败。")
//第三步,绑定队列到exchange中
err = r.channel.QueueBind(q.Name, r.Key, r.ExChange, false, nil)
//第四步,消费消息。
messages, err := r.channel.Consume(q.Name, "", true, false, false, false, nil)
forever := make(chan bool)
go func() {
for d := range messages {
log.Printf("小杜同学写的路由模式(routing模式)收到消息为:%s。\n", d.Body)
}
}()
<-forever
}
- 生产者的代码,发布消息到one,two,three三个路由键 ```go package main
import ( “fmt” “rabbitmq20181121/RabbitMq” “strconv” “time” )
func main() { rabbitmq1 := RabbitMq.NewRabbitMqRouting(“duExchangeName”, “one”) rabbitmq2 := RabbitMq.NewRabbitMqRouting(“duExchangeName”, “two”) rabbitmq3 := RabbitMq.NewRabbitMqRouting(“duExchangeName”, “three”) for i := 0; i < 100; i++ { rabbitmq1.PublishRouting(“路由模式one” + strconv.Itoa(i)) rabbitmq2.PublishRouting(“路由模式two” + strconv.Itoa(i)) rabbitmq3.PublishRouting(“路由模式three” + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Printf(“在路由模式下,routingKey为one,为two,为three的都分别生产了%d条消息\n”, i) } }
3. 消费者的代码,消费one的消息
```go
package main
import "rabbitmq20181121/RabbitMq"
func main() {
one := RabbitMq.NewRabbitMqRouting("duExchangeName", "one")
one.ConsumerRouting()
}
- 消费者的代码,消费two的消息 ```go package main
import “rabbitmq20181121/RabbitMq”
func main() { one := RabbitMq.NewRabbitMqRouting(“duExchangeName”, “two”) one.ConsumerRouting() }
<a name="KeOfI"></a>
## 主题模式
主题模式跟路由模式类似,但路由模式是指定固定的路由键routingKey,而主题模式是可以模糊匹配路由键routingKey,类似于SQL中 `=` 和`like` 的关系<br />![](https://cdn.nlark.com/yuque/0/2020/png/2337686/1598409839118-b7900a29-5dea-410b-b9b9-9e474cdbacfe.png#align=left&display=inline&height=208&margin=%5Bobject%20Object%5D&originHeight=208&originWidth=562&size=0&status=done&style=none&width=562)<br />topics 模式与 routing 模式比较相近,**topics 模式不能具有任意的 routingKey,必须由**<br />**一个英文句点号“.”分隔的字符串**(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),比如 `lazy.orange.fox`。主题模式的路由键 `routingKey `中可以存在两种特殊字符`*`_与_`_#_`_,用于做模糊匹配,其中_`_*_`用于匹配一个单词,`#`用于匹配多个单词(可以是零个)。
主题模式下,与路由模式相比,主题模式修改了交换机的类型为`topic`,生产者可以订阅一批类型的事件。
总结:
1. topic使所有消息都带有一个主题,交换机会将消息转发到与主题模糊匹配的队列上
1. 同样,如果exchange没有发现能与routeKey匹配的队列,则会抛弃此信息
<a name="Y0saS"></a>
### golang代码实现
1. 主题模式代码
```go
package RabbitMq
import (
"github.com/streadway/amqp"
"log"
)
//topic模式
//与routing模式不同的是这个exchange的kind是"topic"类型的。
//topic模式的特别是可以以通配符的形式来指定与之匹配的消费者。
//"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。
//创建rabbitmq实例
func NewRabbitMqTopic(exchangeName string, routingKey string) *RabbitMQ {
rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
rabbitmq.failOnErr(err, "创建rabbit的topic模式时候连接出现问题")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "创建rabbitmq的topic实例时获取channel出错")
return rabbitmq
}
//topic模式。生产者。
func (r *RabbitMQ) PublishTopic(message string) {
//第一步,尝试创建交换机,这里的kind的类型要改为topic
err := r.channel.ExchangeDeclare(
r.ExChange,
"topic",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "topic模式尝试创建exchange失败。")
//第二步,发送消息。
err = r.channel.Publish(
r.ExChange,
r.Key,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//topic模式。消费者。"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。
func (r *RabbitMQ) ConsumerTopic() {
//第一步,创建交换机。这里的kind需要是“topic”类型。
err := r.channel.ExchangeDeclare(
r.ExChange,
"topic",
true, //这里需要是true
false,
false,
false,
nil,
)
r.failOnErr(err, "topic模式,消费者创建exchange失败。")
//第二步,创建队列。这里不用写队列名称。
q, err := r.channel.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "topic模式,消费者创建queue失败。")
//第三步,将队列绑定到交换机里。
err = r.channel.QueueBind(
q.Name,
r.Key,
r.ExChange,
false,
nil,
)
//第四步,消费消息。
messages, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range messages {
log.Printf("小杜同学写的topic模式收到了消息:%s。\n", d.Body)
}
}()
<-forever
}
- 生产者的代码 ```go package main
import ( “fmt” “rabbitmq20181121/RabbitMq” “strconv” “time” )
func main() { one := RabbitMq.NewRabbitMqTopic(“exchangeNameTpoic1224”, “Singer.Jay”) two := RabbitMq.NewRabbitMqTopic(“exchangeNameTpoic1224”, “Persident.XIDADA”) for i := 0; i < 100; i++ { one.PublishTopic(“小杜同学,topic模式,Jay,” + strconv.Itoa(i)) two.PublishTopic(“小杜同学,topic模式,All,” + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Printf(“topic模式。这是小杜同学发布的消息%v \n”, i) } }
3. 消费者1的代码
```go
package main
import "rabbitmq20181121/RabbitMq"
func main() {
jay := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.*")
jay.ConsumerTopic()
}
- 消费者2的代码 ```go package main
import “rabbitmq20181121/RabbitMq”
func main() { jay := RabbitMq.NewRabbitMqTopic(“exchangeNameTpoic1224”, “#”) jay.ConsumerTopic() }
<a name="sHHhi"></a>
## RPC模式
使用 RabbitMQ 实现 RPC,相应的角色是由生产者来作为客户端,消费者作为服务端。
但 RPC 调用一般是同步的,客户端和服务器也是紧密耦合的。即客户端通过 IP/域名和端口链接到服务器,向服务器发送请求后等待服务器返回响应信息。
![](https://cdn.nlark.com/yuque/0/2020/webp/2337686/1598410514744-b8a63cce-082f-4c3e-97e4-433d55c1c36f.webp#align=left&display=inline&height=131&margin=%5Bobject%20Object%5D&originHeight=131&originWidth=550&size=0&status=done&style=none&width=550)
将MQ当作中间件实现一次双向的消息传递。客户端和服务端即是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。
<a name="BDVOe"></a>
# rabbitmq的消息处理
<a name="cOV9I"></a>
## 过期时间TTL
过期时间TTL表示可以对消息设置预期的时间,在这个时间内事件都可以被消费者接受获取;过了之后消息将会自动被删除。RabbitMQ可以对**消息和队列**设置TTL:
- 通过队列属性设置,队列中所有消息都有相同的过期时间
- 对消息进行单独设置,每条消息TTL可以不同
消息在队列的生存时间一旦超过设置的TTL,就称为dead message,被投递到死信队列。
```go
err = ch.Publish(
"eventti.event",
"test_delay", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Expiration: "5000", // 设置五秒的过期时间
})
死信队列
DLX,称为死信交换机。当消息在一个队列中变成死信时,它可能被重新发送到另一个交换机中。
消息变成死信,可能由于以下原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,rabbitmq就会自动地将这个消息重新发送到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可
_, errDelay := ch.QueueDeclare(
"test_delay", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
amqp.Table{
// 当消息过期时把消息发送到 logs 这个 exchange
"x-dead-letter-exchange":"logs",
}, // arguments
)
延迟队列
延迟队列存储的对象是对应的延迟消息;所谓延迟消息,是指消息被发送以后并不想让消费者立刻拿到消息,而是等待特定时间之后再有消费者进行消费。
在rabbitmq中延迟队列可以通过过期时间
+死信队列
来实现
发布确认
有两种方式,消息发送成功确认
和消息发送失败回调
。
ch.Publish(
r.ExChange, // 交换机名称
"", // 路由键
false, // 消息发送成功确认(没有队列会异常)
false, // 消息发送失败回调(队列中没有消费者会异常)
amqp.Publishing{ // 发送的消息
ContentType: "text/plain",
Body: []byte(message),
})
消息追踪
消息的追踪可以通过trace
实现。Trace是rabbitmq用于记录每一次发送的消息。Trace启动后会自动创建系统Exchange,每个队列会自动绑定Exchange,绑定后发送到队列的消息都会记录到Trace日志。
$ rabbitmqctl trace_on
rabbitmq问题
消息堆积
当消息生产的速度长时间,远远大于消费的速度时,就会造成消息堆积。
影响:
- 可能导致新消息无法进入队列
- 可能导致旧消息无法丢失
- 消息等待消费的时间过长,超出了业务容忍范围
产生堆积的原因:**
- 生产者突然大量发布信息
- 消费者消费失败
- 消费者出现性能瓶颈
- 消费者挂掉
解决方法:
- 排查消费者的消费性能瓶颈
- 增加消费者的多线程处理
-
消息丢失
在实际的生产环境中有可能出现一条消息因为一些原因丢失,导致没有消费成功,从而造成数据不一致等问题,造成严重的影响。
消息丢失的场景主要分为: 消息在生产者丢失
- 消息在rabbitmq丢失
- 消息在消费者丢失
消息在生产者丢失
消息生产者发送消息成功,但是MQ没有收到消息,消息从生产者传输到MQ的过程中丢失,一般由于网络不稳定。
消息在rabbitmq丢失
消息成功发送到MQ,消息还没被消费却在MQ中丢失,比如MQ服务器宕机或者重启会出现这种情况
解决方案:持久化交换机,队列,消息,确保MQ服务器重启时依然能从磁盘恢复对应的交换机,队列和消息。
消息在消费者丢失
消费者消费消息时,如果设置自动回复MQ,消费者端接受到消息后会自动回复,MQ会删除消息,此时消费者业务异常,则消息小时
解决方法:设置为手动回复MQ,当消费者出现异常或服务器宕机时,MQ服务器不会删除该消息,而是把消息重发给绑定该队列的消费者。
有序消费消息
工作模式多个消费者是竞争关系
解决方法:生产者根据商品id计算出一个hash值,然后再对队列的个数取余,就可以让相同id的所有操作压到一个队列,并且每一个队列都只有一个消费者,此时就不会出现乱序
简单模式下消费者采用多线程的方式来加速消息处理
消费者拉取消息然后根据id算出一个hash值,然后把同id的商品压到同一个内存队列,让同一个线程去处理
重复消费
消费者处理消息成功,手动回复MQ时由于网络不稳定,连接断开,导致MQ没有收到消费者回复的消息,这样会导致重复消费
解决方法:
- 幂等的操作则无需处理
如果非幂等,则需要消费者端将消息id保存到服务器,每次消费前查询该id。
相关链接
- RabbitMQ六种队列模式-简单队列模式
- RabbitMQ六种队列模式-工作队列模式
- RabbitMQ六种队列模式-发布订阅模式
- RabbitMQ六种队列模式-路由模式
- RabbitMQ六种队列模式-主题模式