消费模式

EventingBasicConsumer

EventingBasicConsumer是发布/订阅模式的消费者,即只要订阅的queue中有了新消息,Broker就会立即把消息推送给消费者,这种模式可以保证消息及时地被消费者接收到。EventingBasicConsumer是长连接的,只需要创建一个Connection,然后在Connection的基础上创建通道channel,消息的发送都是通过channel来执行的,这样可以减少Connection的创建,比较节省资源。之前一直使用的就是EventingBasicConsumer不再赘述

BasicGet

EventingBasicConsumer可以让消费者最及时地获取到消息,使用EventingBasicConsumer模式时消费者在被动的接收消息,即消息是推送过来的,Broker是主动的一方,如果想让消费者作为主动的一方什么时候想要消息了,就自己发送一个请求去找Broker可以使用Get方式。Get方式是短连接的,消费者每次想要消息的时候,首先建立一个connection,发送一次请求,Broker接收到请求后,响应一条消息给消费者,然后断开连接。RabbitMQ中Get方式和HTTP的请求响应流程基本一样,Get方式的实时性比较差,也比较耗费资源

  1. BasicGetResult result = channel.BasicGet(queue: "test", autoAck: true);
  2. Assert.IsNotNull(result.Body.ToArray());
  3. Console.WriteLine($"接收到消息{Encoding.UTF8.GetString(result.Body.ToArray())}");
  4. // 打印exchange和routingKey
  5. Console.WriteLine($"exchange:{result.Exchange},routingKey:{result.RoutingKey}");

QueueBaicConsumer

用法和Get方式类似,QueueBaicConsumer在官方API中标记已过时,不再介绍

BasicQos

  • 当消息有十万,百万条时,一股脑把消息发送给消费者,可能会造成消费者内存爆满
  • 当消息处理比较慢的时,单一的消费者处理这些消息可能很长时间,自然想到再添加一个消费者加快消息的处理速度,但是这些消息都被原来的消费者接收了,状态为Unacked,所以这些消息不会再发送给新添加的消费者

RabbitMQ提供的Qos(服务质量)可以完美解决上边的问题,使用Qos时,Broke不会再把消息全部发送给消费者,可以设置每次传输给消费者的消息条数n,消费者把这n条消息处理完成后,再获取n条数据进行处理,这样就不用担心消息丢失、服务端内存爆满的问题,因为没有发送的消息状态都是Ready,所以新增一个消费者时,消息也可以立即发送给新增的消费者

使用Qos的方式十分简单,在消费端调用channel.BasicQos()方法即可

  1. channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);