消费模式
EventingBasicConsumer
EventingBasicConsumer
是发布/订阅模式的消费者,即只要订阅的queue
中有了新消息,Broker就会立即把消息推送给消费者,这种模式可以保证消息及时地被消费者接收到。EventingBasicConsumer
是长连接的,只需要创建一个Connection
,然后在Connection
的基础上创建通道channel
,消息的发送都是通过channel
来执行的,这样可以减少Connection
的创建,比较节省资源。之前一直使用的就是EventingBasicConsumer
不再赘述
BasicGet
EventingBasicConsumer
可以让消费者最及时地获取到消息,使用EventingBasicConsumer
模式时消费者在被动的接收消息,即消息是推送过来的,Broker
是主动的一方,如果想让消费者作为主动的一方什么时候想要消息了,就自己发送一个请求去找Broker
可以使用Get
方式。Get
方式是短连接的,消费者每次想要消息的时候,首先建立一个connection
,发送一次请求,Broker
接收到请求后,响应一条消息给消费者,然后断开连接。RabbitMQ中Get方式和HTTP的请求响应流程基本一样,Get方式的实时性比较差,也比较耗费资源
BasicGetResult result = channel.BasicGet(queue: "test", autoAck: true);
Assert.IsNotNull(result.Body.ToArray());
Console.WriteLine($"接收到消息{Encoding.UTF8.GetString(result.Body.ToArray())}");
// 打印exchange和routingKey
Console.WriteLine($"exchange:{result.Exchange},routingKey:{result.RoutingKey}");
QueueBaicConsumer
用法和Get方式类似,QueueBaicConsumer在官方API中标记已过时,不再介绍
BasicQos
- 当消息有十万,百万条时,一股脑把消息发送给消费者,可能会造成消费者内存爆满
- 当消息处理比较慢的时,单一的消费者处理这些消息可能很长时间,自然想到再添加一个消费者加快消息的处理速度,但是这些消息都被原来的消费者接收了,状态为
Unacked
,所以这些消息不会再发送给新添加的消费者
RabbitMQ提供的Qos(服务质量)
可以完美解决上边的问题,使用Qos
时,Broke
不会再把消息全部发送给消费者,可以设置每次传输给消费者的消息条数n
,消费者把这n条消息处理完成后,再获取n条数据进行处理,这样就不用担心消息丢失、服务端内存爆满的问题,因为没有发送的消息状态都是Ready
,所以新增一个消费者时,消息也可以立即发送给新增的消费者
使用Qos
的方式十分简单,在消费端调用channel.BasicQos()
方法即可
channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);