https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
http://rabbitmq.mr-ping.com/tutorials_with_csharp/routing.html#%E8%AE%A2%E9%98%85
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/dotnet

直连型交换机

direct相对于fanout就属于完全匹配、单播的模式,路由机制如下图,即队列名称和消息发送时指定的路由完全匹配时,消息才会发送到指定队列上。 Exchange-Direct - 图1
我们之前使用的扇形交换机不能提供足够的灵活性——它只能进行无意识的广播。
我们的日志系统将所有的消息广播给所有的消费者。我们打算对其进行扩展以根据它们的严重性来进行过滤。例如,我们想要将日志写到磁盘上的脚本只接收关键性的错误,而不在警告信息和普通日志消息上浪费磁盘空间。
下面我们使用直连型交换机进行替代。直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。
为了说明这点,可以考虑如下设置:
Exchange-Direct - 图2
这种配置下,我们可以看到有两个队列绑定到了直连交换机X上。第一个队列用的是橘色(orange)绑定键,第二个有两个绑定键,其中一个绑定键是黑色(black),另一个绑定键是绿色(green)。
在此设置中,发布到交换机的带有橘色(orange)路由键的消息会被路由给队列Q1。带有黑色(black)或绿色(green)路由键的消息会被路由给Q2。其他的消息则会被丢弃。

多个绑定

Exchange-Direct - 图3
使用相同的绑定键来绑定多个队列是完全合法的。在我们的例子中,我们可以使用黑色(black)绑定键来绑定X和Q1。那种情况下,直连型交换机的行为就会跟扇形交换机类似,会将消息广播给所有匹配的队列。一个拥有黑色(black)路由键的消息会被头送给Q1和Q2两个队列。
使用相同的绑定键来绑定多个队列是完全合法的。在我们的例子中,我们可以使用黑色(black)绑定键来绑定X和Q1。那种情况下,直连型交换机(direct)的行为就会跟扇形交换机(fanout)类似,会将消息广播给所有匹配的队列。一个拥有黑色(black)路由键的消息会被头送给Q1和Q2两个队列。

公共类库

  1. Install-Package RabbitMQ.Client
  2. Install-Package Microsoft.Extensions.Configuration
  3. Install-Package Microsoft.Extensions.Configuration.Json
  1. using Microsoft.Extensions.Configuration;
  2. using RabbitMQ.Client;
  3. namespace My.RabbitMQ.Config;
  4. public class MQConnection
  5. {
  6. public static IConnection CreateConnection() {
  7. var config = new ConfigurationBuilder()
  8. .AddInMemoryCollection() //缓存
  9. .SetBasePath(Directory.GetCurrentDirectory())
  10. .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
  11. .Build();
  12. var appSetting = config.GetSection("AppSetting:RabbitMQSetting");
  13. var factory = new ConnectionFactory
  14. {
  15. HostName = appSetting.GetSection("HostName").Value,
  16. UserName = appSetting.GetSection("UserName").Value,
  17. Password = appSetting.GetSection("Password").Value,
  18. Port = 5672,
  19. //VirtualHost= "myRabbit"
  20. };
  21. return factory.CreateConnection();
  22. }
  23. }

公共部分

  1. {
  2. "Logging": {
  3. "LogLevel": {
  4. "Default": "Information",
  5. "Microsoft.AspNetCore": "Warning"
  6. }
  7. },
  8. "AllowedHosts": "*",
  9. "AppSetting": {
  10. "SqlServerSetting": {
  11. "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
  12. },
  13. "MysqlSetting": {
  14. "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
  15. },
  16. "RedisSetting": {
  17. "Connection": "192.168.3.40:6379,password=",
  18. "Database": "test"
  19. },
  20. "RabbitMQSetting": {
  21. "HostName": "localhost",
  22. "UserName": "admin",
  23. "Password": "admin",
  24. "Port": 5672
  25. },
  26. "MongoDbSetting": {
  27. "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
  28. "Database": "test"
  29. }
  30. }
  31. }

日志消息示例

日志系统示例

我们将会在我们日志系统中采用这种模式,将消息发送给直连交换机来替代扇形交换机。我们会提供日志的严重等级来作为路由键的值。通过这种方式脚本就可以选择其需要的严重等级来进行接收。首先让我们将关注点放到发送日志上:

发送日志

我们创建一个My.MQ.Exchange.Direct.Demo的控制台项目,

  1. Install-Package RabbitMQ.Client

像往常一样,首先我们需要创建一个交换机:

  1. channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

为了保持简洁,我们假设严重等级只可以是’info’, ‘warning’, ‘error’其中一种。
Exchange-Direct - 图4

  1. // 发出日志--直连型交换机
  2. using Microsoft.Extensions.Configuration;
  3. using My.RabbitMQ.Config;
  4. using RabbitMQ.Client;
  5. using System.Text;
  6. //建立连接
  7. using (var connection = MQConnection.CreateConnection())
  8. //创建信道
  9. using (var channel = connection.CreateModel())
  10. {
  11. //创建一个交换机
  12. channel.ExchangeDeclare(exchange: "direct_logs",
  13. type: "direct");
  14. //消息类型,假设严重等级只可以是'info', 'warning', 'error'其中一种
  15. var severity = (args.Length > 0) ? args[0] : "info";
  16. //构建byte消息数据包
  17. var message = (args.Length > 1)
  18. ? string.Join(" ", args.Skip(1).ToArray())
  19. : "Hello World!";
  20. var body = Encoding.UTF8.GetBytes(message);
  21. //发送数据包
  22. channel.BasicPublish(exchange: "direct_logs",
  23. routingKey: severity,
  24. basicProperties: null,
  25. body: body);
  26. Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
  27. Console.WriteLine(" Press [enter] to exit.");
  28. Console.ReadLine();
  29. }

订阅日志

我们创建一个My.MQ.Exchange.Direct.ReceiveLogs的控制台项目,

  1. Install-Package RabbitMQ.Client
  1. // 接收日志--直连型交换机
  2. using My.RabbitMQ.Config;
  3. using RabbitMQ.Client;
  4. using RabbitMQ.Client.Events;
  5. using System.Text;
  6. //建立连接
  7. using (var connection = MQConnection.CreateConnection())
  8. //创建信道
  9. using (var channel = connection.CreateModel())
  10. {
  11. channel.ExchangeDeclare(exchange: "direct_logs",type: "direct");
  12. var queueName = channel.QueueDeclare().QueueName;
  13. if (args.Length < 1)
  14. {
  15. Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
  16. Environment.GetCommandLineArgs()[0]);
  17. Console.WriteLine(" Press [enter] to exit.");
  18. Console.ReadLine();
  19. Environment.ExitCode = 1;
  20. return;
  21. }
  22. //只可以是'info', 'warning', 'error'其中一种
  23. foreach (var severity in args)
  24. {
  25. channel.QueueBind(queue: queueName,
  26. exchange: "direct_logs",
  27. routingKey: severity);
  28. }
  29. Console.WriteLine(" [*] Waiting for messages.");
  30. var consumer = new EventingBasicConsumer(channel);
  31. consumer.Received += (model, ea) =>
  32. {
  33. var body = ea.Body.ToArray();
  34. var message = Encoding.UTF8.GetString(body);
  35. var routingKey = ea.RoutingKey;
  36. Console.WriteLine(" [x] Received '{0}':'{1}'",
  37. routingKey, message);
  38. };
  39. channel.BasicConsume(queue: queueName,
  40. autoAck: true,
  41. consumer: consumer);
  42. Console.WriteLine(" Press [enter] to exit.");
  43. Console.ReadLine();
  44. }

测试

分别定位到My.MQ.Exchange.Direct.ReceiveLogs,My.MQ.Exchange.Direct.Demo目录下,使用命令行窗体打开,
My.MQ.Exchange.Direct.ReceiveLogs命令行窗体
如果你只希望将’warning’ 和 ‘error’ (不包括 ‘info’) 的日志信息保存到文件中,打开一个控制台,输入:

  1. cd F:\project\LFC.NET.Demo.Project\My.RabbitMQ.Demo\My.MQ.Exchange.Direct.Demo\

如果你希望将所有的日志信息显示在屏幕上,新开一个终端,做如下操作:

  1. dotnet run info warning error

My.MQ.Exchange.Direct.Demo命令行窗体

  1. cd F:\project\LFC.NET.Demo.Project\My.RabbitMQ.Demo\My.MQ.Exchange.Direct.ReceiveLogs

例如,如果你想发送一条error的日志信息,只需要输入:

  1. dotnet run error "Run. Run. Or it will explode."