https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html
http://rabbitmq.mr-ping.com/tutorials_with_csharp/Topics.html
主题交换机
发送到主题交换机的消息所携带的路由键(routing_key)不能随意命名——它必须是一个用点号分隔的词列表。当中的词可以是任何单词,不过一般都会指定一些跟消息有关的特征作为这些单词。列举几个有效的路由键的例子:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。只要不超过255个字节,词的长度由你来定。
绑定键(binding key)也得使用相同的格式。主题交换机背后的逻辑跟直连交换机比较相似——一条携带特定路由键(routing key)的消息会被投送给所有绑定键(binding key)与之相匹配的队列。尽管如此,仍然有两条与绑定键相关的特殊情况:
*(星号) 能够替代一个单词。#(井号) 能够替代零个或多个单词。
用一个例子可以很容易地解释:
此例中,我们将会发送用来描述动物的多条消息。发送的消息包含带有三个单词(两个点号)的路由键(routing key)。路由键中第一个单词描述速度,第二个单词是颜色,第三个是品种: “<速度>.<颜色>.<品种>”。
我们创建三个绑定:Q1通过”.orange.“绑定键进行绑定,Q2使用”..rabbit” 和 “lazy.#”。
这些绑定可以总结为:
- Q1针对所有的橘色orange动物。
- Q2针对每一个有关兔子rabbits和慵懒lazy的动物的消息。
一个带有”quick.orange.rabbit”绑定键的消息会给两个队列都进行投送。消息”lazy.orange.elephant”也会投送给这两个队列。另外一方面,”quick.orange.fox” 只会给第一个队列。”lazy.pink.rabbit”虽然与两个绑定键都匹配,但只会给第二个队列投送一遍。”quick.brown.fox” 没有匹配到任何绑定,因此会被丢弃掉。
如果我们破坏规则,发送的消息只带有一个或者四个单词,例如 “orange” 或者 “quick.orange.male.rabbit”会发生什么呢?结果是这些消息不会匹配到任何绑定,将会被丢弃。
另一方面,“lazy.orange.male.rabbit”即使有四个单词,也会与最后一个绑定匹配,并 被投送到第二个队列。
公共类库
Install-Package RabbitMQ.ClientInstall-Package Microsoft.Extensions.ConfigurationInstall-Package Microsoft.Extensions.Configuration.Json
using Microsoft.Extensions.Configuration;using RabbitMQ.Client;namespace My.RabbitMQ.Config;public class MQConnection{public static IConnection CreateConnection() {var config = new ConfigurationBuilder().AddInMemoryCollection() //缓存.SetBasePath(Directory.GetCurrentDirectory()).AddJsonFile("appsettings.json", optional: true, reloadOnChange: true).Build();var appSetting = config.GetSection("AppSetting:RabbitMQSetting");var factory = new ConnectionFactory{HostName = appSetting.GetSection("HostName").Value,UserName = appSetting.GetSection("UserName").Value,Password = appSetting.GetSection("Password").Value,Port = 5672,//VirtualHost= "myRabbit"};return factory.CreateConnection();}}
公共部分
{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","AppSetting": {"SqlServerSetting": {"Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"},"MysqlSetting": {"Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"},"RedisSetting": {"Connection": "192.168.3.40:6379,password=","Database": "test"},"RabbitMQSetting": {"HostName": "localhost","UserName": "admin","Password": "admin","Port": 5672},"MongoDbSetting": {"Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0"Database": "test"}}}
示例
分别创建My.MQ.Topic.Send和My.MQ.Topic.ReceiveLogs两个项目,
发送日志
// 发送日志--主题交换机using My.RabbitMQ.Config;using RabbitMQ.Client;using System.Text;//建立连接using (var connection = MQConnection.CreateConnection())//创建信道using (var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "topic_logs",type: "topic");var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";var message = (args.Length > 1)? string.Join(" ", args.Skip(1).ToArray()): "Hello World!";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "topic_logs",routingKey: routingKey,basicProperties: null,body: body);Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);}
接收日志
// 接收日志--主题交换机using My.RabbitMQ.Config;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;//建立连接using (var connection = MQConnection.CreateConnection())//创建信道using (var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");var queueName = channel.QueueDeclare().QueueName;if (args.Length < 1){Console.Error.WriteLine("Usage: {0} [binding_key...]",Environment.GetCommandLineArgs()[0]);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();Environment.ExitCode = 1;return;}foreach (var bindingKey in args){channel.QueueBind(queue: queueName,exchange: "topic_logs",routingKey: bindingKey);}Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey;Console.WriteLine(" [x] Received '{0}':'{1}'",routingKey,message);};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}
测试
My.MQ.Topic.ReceiveLogs
cd F:\project\LFC.NET.Demo.Project\My.RabbitMQ.Demo\My.MQ.Topic.ReceiveLogs
接收所有日志
dotnet run "#"
接收来自于”kern”设施的所有日志:
dotnet run "kern.*"
只想接收跟”严重“(”critical“)程度有关的日志
dotnet run "*.critical"
或者可以创建多个绑定
dotnet run "kern.*" "*.critical"
My.MQ.Topic.Send
cd F:\project\LFC.NET.Demo.Project\My.RabbitMQ.Demo\My.MQ.Topic.Send
发送一个路由键为”kern.critical”的日志:
dotnet run "kern.critical" "A critical kernel error"

