https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html
http://rabbitmq.mr-ping.com/tutorials_with_csharp/Topics.html

主题交换机

发送到主题交换机的消息所携带的路由键(routing_key)不能随意命名——它必须是一个用点号分隔的词列表。当中的词可以是任何单词,不过一般都会指定一些跟消息有关的特征作为这些单词。列举几个有效的路由键的例子:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。只要不超过255个字节,词的长度由你来定。
绑定键(binding key)也得使用相同的格式。主题交换机背后的逻辑跟直连交换机比较相似——一条携带特定路由键(routing key)的消息会被投送给所有绑定键(binding key)与之相匹配的队列。尽管如此,仍然有两条与绑定键相关的特殊情况:

  • * (星号) 能够替代一个单词。
  • # (井号) 能够替代零个或多个单词。

用一个例子可以很容易地解释:
Exchange-Topics - 图1
此例中,我们将会发送用来描述动物的多条消息。发送的消息包含带有三个单词(两个点号)的路由键(routing key)。路由键中第一个单词描述速度,第二个单词是颜色,第三个是品种: “<速度>.<颜色>.<品种>”。
我们创建三个绑定:Q1通过”.orange.“绑定键进行绑定,Q2使用”..rabbit” 和 “lazy.#”。
这些绑定可以总结为:

  • Q1针对所有的橘色orange动物。
  • Q2针对每一个有关兔子rabbits和慵懒lazy的动物的消息。

一个带有”quick.orange.rabbit”绑定键的消息会给两个队列都进行投送。消息”lazy.orange.elephant”也会投送给这两个队列。另外一方面,”quick.orange.fox” 只会给第一个队列。”lazy.pink.rabbit”虽然与两个绑定键都匹配,但只会给第二个队列投送一遍。”quick.brown.fox” 没有匹配到任何绑定,因此会被丢弃掉。
如果我们破坏规则,发送的消息只带有一个或者四个单词,例如 “orange” 或者 “quick.orange.male.rabbit”会发生什么呢?结果是这些消息不会匹配到任何绑定,将会被丢弃。
另一方面,“lazy.orange.male.rabbit”即使有四个单词,也会与最后一个绑定匹配,并 被投送到第二个队列。

公共类库

  1. Install-Package RabbitMQ.Client
  2. Install-Package Microsoft.Extensions.Configuration
  3. Install-Package Microsoft.Extensions.Configuration.Json
  1. using Microsoft.Extensions.Configuration;
  2. using RabbitMQ.Client;
  3. namespace My.RabbitMQ.Config;
  4. public class MQConnection
  5. {
  6. public static IConnection CreateConnection() {
  7. var config = new ConfigurationBuilder()
  8. .AddInMemoryCollection() //缓存
  9. .SetBasePath(Directory.GetCurrentDirectory())
  10. .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
  11. .Build();
  12. var appSetting = config.GetSection("AppSetting:RabbitMQSetting");
  13. var factory = new ConnectionFactory
  14. {
  15. HostName = appSetting.GetSection("HostName").Value,
  16. UserName = appSetting.GetSection("UserName").Value,
  17. Password = appSetting.GetSection("Password").Value,
  18. Port = 5672,
  19. //VirtualHost= "myRabbit"
  20. };
  21. return factory.CreateConnection();
  22. }
  23. }

公共部分

  1. {
  2. "Logging": {
  3. "LogLevel": {
  4. "Default": "Information",
  5. "Microsoft.AspNetCore": "Warning"
  6. }
  7. },
  8. "AllowedHosts": "*",
  9. "AppSetting": {
  10. "SqlServerSetting": {
  11. "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
  12. },
  13. "MysqlSetting": {
  14. "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
  15. },
  16. "RedisSetting": {
  17. "Connection": "192.168.3.40:6379,password=",
  18. "Database": "test"
  19. },
  20. "RabbitMQSetting": {
  21. "HostName": "localhost",
  22. "UserName": "admin",
  23. "Password": "admin",
  24. "Port": 5672
  25. },
  26. "MongoDbSetting": {
  27. "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
  28. "Database": "test"
  29. }
  30. }
  31. }

示例

分别创建My.MQ.Topic.Send和My.MQ.Topic.ReceiveLogs两个项目,

发送日志

  1. // 发送日志--主题交换机
  2. using My.RabbitMQ.Config;
  3. using RabbitMQ.Client;
  4. using System.Text;
  5. //建立连接
  6. using (var connection = MQConnection.CreateConnection())
  7. //创建信道
  8. using (var channel = connection.CreateModel())
  9. {
  10. channel.ExchangeDeclare(exchange: "topic_logs",
  11. type: "topic");
  12. var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
  13. var message = (args.Length > 1)
  14. ? string.Join(" ", args.Skip(1).ToArray())
  15. : "Hello World!";
  16. var body = Encoding.UTF8.GetBytes(message);
  17. channel.BasicPublish(exchange: "topic_logs",
  18. routingKey: routingKey,
  19. basicProperties: null,
  20. body: body);
  21. Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
  22. }

接收日志

  1. // 接收日志--主题交换机
  2. using My.RabbitMQ.Config;
  3. using RabbitMQ.Client;
  4. using RabbitMQ.Client.Events;
  5. using System.Text;
  6. //建立连接
  7. using (var connection = MQConnection.CreateConnection())
  8. //创建信道
  9. using (var channel = connection.CreateModel())
  10. {
  11. channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
  12. var queueName = channel.QueueDeclare().QueueName;
  13. if (args.Length < 1)
  14. {
  15. Console.Error.WriteLine("Usage: {0} [binding_key...]",
  16. Environment.GetCommandLineArgs()[0]);
  17. Console.WriteLine(" Press [enter] to exit.");
  18. Console.ReadLine();
  19. Environment.ExitCode = 1;
  20. return;
  21. }
  22. foreach (var bindingKey in args)
  23. {
  24. channel.QueueBind(queue: queueName,
  25. exchange: "topic_logs",
  26. routingKey: bindingKey);
  27. }
  28. Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
  29. var consumer = new EventingBasicConsumer(channel);
  30. consumer.Received += (model, ea) =>
  31. {
  32. var body = ea.Body.ToArray();
  33. var message = Encoding.UTF8.GetString(body);
  34. var routingKey = ea.RoutingKey;
  35. Console.WriteLine(" [x] Received '{0}':'{1}'",
  36. routingKey,
  37. message);
  38. };
  39. channel.BasicConsume(queue: queueName,
  40. autoAck: true,
  41. consumer: consumer);
  42. Console.WriteLine(" Press [enter] to exit.");
  43. Console.ReadLine();
  44. }

测试

My.MQ.Topic.ReceiveLogs

  1. cd F:\project\LFC.NET.Demo.Project\My.RabbitMQ.Demo\My.MQ.Topic.ReceiveLogs

接收所有日志

  1. dotnet run "#"

接收来自于”kern”设施的所有日志:

  1. dotnet run "kern.*"

只想接收跟”严重“(”critical“)程度有关的日志

  1. dotnet run "*.critical"

或者可以创建多个绑定

  1. dotnet run "kern.*" "*.critical"

My.MQ.Topic.Send

  1. cd F:\project\LFC.NET.Demo.Project\My.RabbitMQ.Demo\My.MQ.Topic.Send

发送一个路由键为”kern.critical”的日志:

  1. dotnet run "kern.critical" "A critical kernel error"

1663387674357.png