前置条件
本教程假设RabbitMQ已经安装在你本机的 (5672)端口。如果你使用了不同的主机、端口或者凭证,连接设置就需要作出一些对应的调整。
如何获得帮助
如果你在使用本教程的过程中遇到了麻烦,你可以通过邮件列表来联系我们。
路由
(使用.NET客户端)
上个教程中,我们建立了一个简单的日志系统。我们可以将日志消息广播给多个接收者。
这个教程中我们会添加一个新功能——让它可以从日志消息中只订阅一个子集。例如,我们只将关键的错误消息定向到日志文件中(从而节省磁盘空间),同时仍旧可以将所有的日志消息打印到控制台上。
绑定
上个例子中,我们已经创建了绑定。代码如下:
channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");
一个绑定就是交换机和队列之间的一个关系。可以解读为:目标队列对此交换机的消息感兴趣。
绑定可以使用一个格外的routingKey参数。为了避免跟BasicPublish参数混淆,我们称其为绑定键(binding key)。以下是如何创建一个绑定键:
channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: "black");
绑定键的实际意义依赖于交换机的类型。对于我们之前使用的扇形交换机来说,会简单的将其值忽略掉。
直连型交换机
上个教程中,我们的日志系统将所有的消息广播给所有的消费者。我们打算对其进行扩展以根据它们的严重性来进行过滤。例如,我们想要将日志写到磁盘上的脚本只接收关键性的错误,而不在警告信息和普通日志消息上浪费磁盘空间。
我们之前使用的扇形交换机不能提供足够的灵活性——它只能进行无意识的广播。
下面我们使用直连型交换机进行替代。直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。
为了说明这点,可以考虑如下设置:

这种配置下,我们可以看到有两个队列绑定到了直连交换机X上。第一个队列用的是橘色(orange)绑定键,第二个有两个绑定键,其中一个绑定键是黑色(black),另一个绑定键是绿色(green)。
在此设置中,发布到交换机的带有橘色(orange)路由键的消息会被路由给队列Q1。带有黑色(black)或绿色(green)路由键的消息会被路由给Q2。其他的消息则会被丢弃。
多个绑定

使用相同的绑定键来绑定多个队列是完全合法的。在我们的例子中,我们可以使用黑色(black)绑定键来绑定X和Q1。那种情况下,直连型交换机的行为就会跟扇形交换机类似,会将消息广播给所有匹配的队列。一个拥有黑色(black)路由键的消息会被头送给Q1和Q2两个队列。
使用相同的绑定键来绑定多个队列是完全合法的。在我们的例子中,我们可以使用黑色(black)绑定键来绑定X和Q1。那种情况下,直连型交换机(direct)的行为就会跟扇形交换机(fanout)类似,会将消息广播给所有匹配的队列。一个拥有黑色(black)路由键的消息会被头送给Q1和Q2两个队列。
发送日志
我们将会在我们日志系统中采用这种模式,将消息发送给直连交换机来替代扇形交换机。我们会提供日志的严重等级来作为路由键的值。通过这种方式脚本就可以选择其需要的严重等级来进行接收。首先让我们将关注点放到发送日志上:
像往常一样,首先我们需要创建一个交换机:
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
然后,做好发送消息的准备:
var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null,body: body);
为了保持简洁,我们假设严重等级只可以是’info’, ‘warning’, ‘error’其中一种。
订阅
除了我们会为每个我们感兴趣的严重等级创建一个新的绑定键之外,接收消息的工作方式跟前一个教程中几乎一样。
var queueName = channel.QueueDeclare().QueueName;foreach(var severity in args){channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity);}
整合到一起

EmitLogDirect.cs类的代码:
using System;using System.Linq;using RabbitMQ.Client;using System.Text;class EmitLogDirect{public static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "direct_logs",type: "direct");var severity = (args.Length > 0) ? args[0] : "info";var message = (args.Length > 1)? string.Join(" ", args.Skip( 1 ).ToArray()): "Hello World!";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null,body: body);Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
ReceiveLogsDirect.cs的代码:
using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;class ReceiveLogsDirect{public static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "direct_logs",type: "direct");var queueName = channel.QueueDeclare().QueueName;if(args.Length < 1){Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",Environment.GetCommandLineArgs()[0]);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();Environment.ExitCode = 1;return;}foreach(var severity in args){channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity);}Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey;Console.WriteLine(" [x] Received '{0}':'{1}'",routingKey, message);};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}}
跟往常一样创建项目(参见 教程一 )
如果你只希望将’warning’ 和 ‘error’ (不包括 ‘info’) 的日志信息保存到文件中,只需要打开一个控制台,输入:
cd ReceiveLogsDirectdotnet run warning error > logs_from_rabbit.log
如果你希望将所有的日志信息显示在屏幕上,新开一个终端,做如下操作:
cd ReceiveLogsDirectdotnet run info warning error# => [*] Waiting for logs. To exit press CTRL+C
例如,如果你想发送一条error的日志信息,只需要输入:
cd EmitLogDirectdotnet run error "Run. Run. Or it will explode."# => [x] Sent 'error':'Run. Run. Or it will explode.'
(完整的 (EmitLogDirect.cs 源代码) 和 (ReceiveLogsDirect.cs 源代码))
想要了解如何基于一种模式来监听消息,可以移步至 教程 5 。
