生产端消息确认
tx机制
tx机制叫做事务机制,RabbitMQ中有三个与tx机制的方法:
txSelect()
、txCommit()
、txRollback()
channel.txSelect()
用于将当前channel
设置成transaction
模式channel.txCommit()
提交事务channel.txRollback()
回滚事务
使用tx机制,首先要通过txSelect方法开启事务,然后发布消息给broker服务器,如果txCommit提交成功,则说明消息成功被broker接收;如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候可以捕获异常,通过
txRollback
回滚事务
// 创建名称为hello的队列
channel.QueueDeclare("hello", false, false, false, null);
for (int i = 0; i < 5; i++)
{
// 构建消息数据包
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
try
{
// 开启事务机制
channel.TxSelect();
// 消息发送
channel.BasicPublish(
exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
// 事务提交
channel.TxCommit();
}
catch (Exception ex)
{
channel.TxRollback();
Assert.Fail(ex.Message);
}
}
Confirm模式
C#的RabbitMQ API中,有三个与Confirm相关的方法:
ConfirmSelect()
、WaitForConfirms()
、WaitForConfirmOrDie
channel.ConfirmSelect()
表示开启Confirm模式channel.WaitForConfirms()
等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false
channel.WaitForConfirmsOrDie()
和WaitForConfirms
作用类型,也是等待所有消息确认。区别在于该方法没有返回值(Void
),如果有任意一条消息没有被成功接收,该方法会立即抛出OperationInterrupedException
类型异常
channel.QueueDeclare(
queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "hello world";
byte[] messageBody = Encoding.UTF8.GetBytes(message);
// 开启Confirm模式
channel.ConfirmSelect();
// 消息发送
channel.BasicPublish(
exchange: "",
routingKey: "hello",
basicProperties: null,
body: messageBody);
// WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功
if (channel.WaitForConfirms())
{
Console.WriteLine($"Message发送成功");
}
else
{
Assert.Fail();
}
消费端消息确认
自动确认
当RabbbitMQ将消息发送给消费者后,消费者接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string message =
Encoding.UTF8.GetString(ea.Body.ToArray());
};
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
可能存在的问题
- 丢失数据:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了
- 只要队列不空,RabbitMQ会源源不断的把消息推送给客户端,而不管客户端能否消费的完,如果其中一个消费端消费的较慢,会极大的浪费性能
手动确认(BasicAck)
消费从队列中获取消息后,服务器会将该消息处于不可用状态,等待消费者反馈。
Resume
方法的参数autoAck
设置为false
,然后在消费端使用代码channel.BasicAck()
/BasicReject()
等方法来确认和拒绝消息即可实现手动确认
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string message =
Encoding.wUTF8.GetString(ea.Body.ToArray());
// 手动ack
channel.BasicAck(
deliveryTag: ea.DeliveryTag,
multiple: false);
};
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
改为手动确认方式只需改两处
- 开启监听时将
autoAck
参数改为false
- 消息消费成功后返回确认
这段代码中,先处理消息,完成后,再做ack
响应,失败就不做ack
响应,这样消息会储存在MQ
的Unacked
消息里,不会丢失,看起来没啥问题,但是如果其中一条消息在处理时抛出了异常,将导致后续所有消息都会无法消费
消息拒绝
BasicNack()
与
BasicReject()
不同的是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息
EventingBasicConsumer consumer =
new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string message =
Encoding.UTF8.GetString(ea.Body.ToArray());
try
{
/* 消费到某条消息时出错
* 导致Broker无法拿到正常回执信息引发后续消息都无法被正常消费
* 如果MQ没得到ack响应,这些消息会堆积在Unacked消息里,不会丢弃,直至客户端断开重连时,才变回ready
* 如果Consumer客户端不断开连接,这些Unacked消息,永远不会变回ready状态
* Unacked消息多了,占用内存越来越大,就会异常
*/
MessageConsumer(ea);
channel.BasicAck(
deliveryTag: ea.DeliveryTag,
multiple: false);
}
catch (Exception ex)
{
// 出错了,发nack,并通知MQ把消息塞回的队列头部(不是尾部)
channel.BasicNack(
deliveryTag: ea.DeliveryTag,
multiple: false,
requeue: true);
}
};
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
这里将代码调整为消费正常就ack
,不正常就nack
,并等下一次重新消费。看起来没问题,但是如果某条消息在消费时又抛出异常,该消息将会被Nack
机制重新扔回队列头部,下一步又消费这条会出异常的消息,又出错,塞回队列……进入死循环,所以要谨慎使用Nack
机制。这里可以在catch
中记录错误日志依旧使用ack
确认消费
BasicReject()
消费端告诉服务器这个消息拒绝接收,不处理,可以设置是否放回到队列中还是丢掉(只能一次拒绝一个消息)
MessagePublisher("hello", $"1");
MessagePublisher("hello", $"2");
MessagePublisher("hello", $"3");
channel.QueueDeclare(
queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer =
new EventingBasicConsumer(channel);
channel.BasicQos(0, 1, false);
consumer.Received += (model, ea) =>
{
string message =
Encoding.UTF8.GetString(ea.Body.ToArray());
if (message == "2")
{
Console.WriteLine($"Message:{message}");
channel.BasicAck(
deliveryTag: ea.DeliveryTag,
multiple: false);
}
else
{
Console.WriteLine($"拒绝处理");
/* BasicReject用于拒绝消息
requeue参数指定了拒绝后是否重新放回queue
一次只能拒绝一条消息
设置为true: 消息会被重新仍回queue中
设置为false:消息将被丢弃
*/
channel.BasicReject(
deliveryTag: ea.DeliveryTag,
requeue: true);
}
};
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
BasicRecover()
路由不成功的消息可以使用
recovery
重新发送到队列中,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己
消息持久化/优先级
Persistent
参数 | 重启RabbitMQ |
---|---|
exchange.durable=fasle/queue.durable=false | exchange/queue将会被丢弃 |
exchange.durable=fasle | exchange将会被丢弃 |
queue.durable=fasle | queue将会被丢弃 |
exchange.durable=fasle/queue.durable=true | exchange将会被丢弃,queue虽然会存在,但队列内消息会全部丢失 |
exchange.durable=true/queue.durable=true | exchange/queue会存在,但队列内消息会全部丢失 |
exchange.durable=true&&queue.durable=true/消息发布时(persistent=true) | 消息真正的持久化 |
for (int i = 0; i < 100; i++)
{
byte[] messageBody = Encoding.UTF8.GetBytes(i.ToString());
// 设置消息持久化
var props = channel.CreateBasicProperties();
props.Persistent = true;
// 消息发送
channel.BasicPublish(
exchange: "TestExchange",
routingKey: "",
basicProperties: props,
body: messageBody);
}
Priority
queue是先进先出的,即先发送的消息,先被消费。但是在具体业务中可能会遇到要提前处理某些消息的需求,如一个常见的需求:普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。消息实现优先级控制的实现方式是:首先在声明queue是设置队列的
x-max-priority
属性,然后在publish
消息时,设置消息的优先级等级即可
生产者
// 声明交换机exchang
channel.ExchangeDeclare(exchange: "myexchange",
type: ExchangeType.Fanout,
durable: true,
autoDelete: false,
arguments: null);
// 声明队列queue
channel.QueueDeclare(queue: "myqueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>() {
//队列优先级最高为10,不加x-max-priority的话,消息发布时设置了消息的优先级也不会生效
{"x-max-priority",10 }
});
// 绑定exchange和queue
channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey");
Console.WriteLine("生产者准备就绪....");
// 测试数据
string[] msgs = { "vip1", "hello1", "hello2", "hello3", "vip5" };
// 设置消息优先级
var props = channel.CreateBasicProperties();
foreach (string msg in msgs)
{
// vip开头的消息,优先级设置为9
if (msg.StartsWith("vip"))
{
props.Priority = 9;
channel.BasicPublish(exchange: "myexchange",
routingKey: "mykey",
basicProperties: props,
body: Encoding.UTF8.GetBytes(msg));
}
// 其他消息优先级为1
else
{
props.Priority = 1;
channel.BasicPublish(exchange: "myexchange",
routingKey: "mykey",
basicProperties: props,
body: Encoding.UTF8.GetBytes(msg));
}
消费端
EventingBasicConsumer consumer =
new EventingBasicConsumer(channel);
// 绑定消息接收后的事件委托
consumer.Received += (model, ea) =>
{
string message =
Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"Message:{message}");
Assert.IsNotNull(message);
};
channel.BasicConsume(
queue: "myqueue",
autoAck: true,
consumer: consumer);