C#创建RabbitMq连接

  1. public static class BasePublisher
  2. {
  3. public static ConnectionFactory CreateRabbitMqConnection()
  4. {
  5. // RabbitMQ连接工厂
  6. return new ConnectionFactory()
  7. {
  8. HostName = "localhost",
  9. // 用户名
  10. UserName = "guest",
  11. // 密码
  12. Password = "guest",
  13. // 网络故障自动恢复连接
  14. AutomaticRecoveryEnabled = true,
  15. // 心跳处理
  16. RequestedHeartbeat = new TimeSpan(5000)
  17. };
  18. }
  19. }

点对点

“ P”是生产者,“ C”是消费者。中间的框是一个队列-RabbitMQ代表保留的消息缓冲区

1.2 Exchange介绍及代码示例 - 图1

  1. #define publisher
  2. namespace RabbitMQPublisher
  3. {
  4. using System;
  5. using System.Text;
  6. using RabbitMQ.Client;
  7. using RabbitMQ.Client.Events;
  8. /// <summary>
  9. /// 点对点:最简单的工作模式
  10. /// </summary>
  11. internal static class PointToPointPublisher
  12. {
  13. readonly static string queueName = "test.pointToPoint.queue";
  14. #if publisher
  15. private static void Main(string[] args)
  16. {
  17. while (true)
  18. {
  19. Console.WriteLine("消息发布者:模式{点对点}=>输入消息内容");
  20. string message = Console.ReadLine();
  21. if (!string.IsNullOrEmpty(message))
  22. {
  23. // RabbitMQ连接工厂
  24. ConnectionFactory factory = BasePublisher.CreateRabbitMqConnection();
  25. // 建立连接
  26. using IConnection connection = factory.CreateConnection();
  27. // 创建信道
  28. using IModel channel = connection.CreateModel();
  29. // 声明队列
  30. channel.QueueDeclare(queueName, false, false, false, null);
  31. // 消息发送
  32. channel.BasicPublish(
  33. exchange: "",
  34. routingKey: queueName,
  35. basicProperties: null,
  36. body: Encoding.UTF8.GetBytes(message));
  37. }
  38. }
  39. }
  40. #else
  41. private static void Main(string[] args)
  42. {
  43. Console.WriteLine($"PointToPointConsumer");
  44. // RabbitMQ连接工厂
  45. ConnectionFactory factory = BasePublisher.CreateRabbitMqConnection();
  46. // 建立连接
  47. using IConnection connection = factory.CreateConnection();
  48. // 创建信道
  49. using IModel channel = connection.CreateModel();
  50. // 声明队列
  51. channel.QueueDeclare(
  52. queue: queueName,
  53. durable: false,
  54. exclusive: false,
  55. autoDelete: false,
  56. arguments: null);
  57. EventingBasicConsumer consumer =
  58. new EventingBasicConsumer(channel);
  59. // 每次只能向消费者发送一条消息,在消费者未确认之前,不再向它发送消息
  60. channel.BasicQos(0, 1, false);
  61. // 绑定消息接收后的事件委托
  62. consumer.Received += (model, ea) =>
  63. {
  64. string message =
  65. Encoding.UTF8.GetString(ea.Body.ToArray());
  66. Console.WriteLine($"Message:{message}");
  67. channel.BasicAck(
  68. deliveryTag: ea.DeliveryTag,
  69. // 是否一次性确认多条数据
  70. multiple: false);
  71. };
  72. channel.BasicConsume(queue: queueName,
  73. autoAck: false,
  74. consumer: consumer);
  75. Console.ReadLine();
  76. }
  77. #endif
  78. }
  79. }

Worker

worker模式是一对多的模式,但是这个一对多并不是像发布订阅那种,而是将消息顺序传输给每个接收者

1.2 Exchange介绍及代码示例 - 图2

生产者

  1. while (true)
  2. {
  3. Console.WriteLine("消息发布者:模式{Worker}=>输入消息内容");
  4. string message = Console.ReadLine();
  5. if (!string.IsNullOrEmpty(message))
  6. {
  7. // RabbitMQ连接工厂
  8. var factory = BasePublisher.CreateRabbitMqConnection();
  9. // 建立连接
  10. using var connection = factory.CreateConnection();
  11. // 创建信道
  12. using var channel = connection.CreateModel();
  13. // 声明队列
  14. string queueName = "test.worker.queue";
  15. channel.QueueDeclare(queueName, false, false, false, null);
  16. // 消息发送
  17. channel.BasicPublish(
  18. exchange: "",
  19. routingKey: queueName,
  20. basicProperties: null,
  21. body: Encoding.UTF8.GetBytes(message));
  22. }
  23. }

消费者

  1. static void Main(string[] args)
  2. {
  3. Console.WriteLine($"{nameof(WorkerConsumerClient1)}:");
  4. // RabbitMQ连接工厂
  5. var factory = BaseConsumer.CreateRabbitMqConnection();
  6. // 建立连接
  7. using var connection = factory.CreateConnection();
  8. // 创建信道
  9. using var channel = connection.CreateModel();
  10. string queueName = "test.worker.queue";
  11. // 申明队列
  12. channel.QueueDeclare(
  13. queue: queueName,
  14. durable: false,
  15. exclusive: false,
  16. autoDelete: false,
  17. arguments: null);
  18. EventingBasicConsumer consumer =
  19. new EventingBasicConsumer(channel);
  20. // 每次只能向消费者发送一条消息,在消费者未确认之前,不再向它发送消息
  21. channel.BasicQos(0, 1, false);
  22. // 绑定消息接收后的事件委托
  23. consumer.Received += (model, ea) =>
  24. {
  25. string message =
  26. Encoding.UTF8.GetString(ea.Body.ToArray());
  27. Console.WriteLine($"Message:{message}");
  28. channel.BasicAck(
  29. deliveryTag: ea.DeliveryTag,
  30. // 是否一次性确认多条数据
  31. multiple: false);
  32. };
  33. channel.BasicConsume(queue: queueName,
  34. autoAck: false,
  35. consumer: consumer);
  36. Console.ReadLine();
  37. }

这里定义了两个消费者来消费Queue,结果如下

image.png

ExchangesType

Exchange分发消息时根据类型的不同分发策略有区别:directfanouttopicheadersheaders 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型

fanout

发布订阅模式(fanout),消息发送到Exchange,所有订阅了当前Exchange的Queue都可以收到消息;每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的

image.png

生产者

  1. while (true)
  2. {
  3. Console.WriteLine("消息发布者:模式{fanout}=>输入消息内容");
  4. string message = Console.ReadLine();
  5. if (!string.IsNullOrEmpty(message))
  6. {
  7. var factory = BasePublisher.CreateRabbitMqConnection();
  8. using var connection = factory.CreateConnection();
  9. using var channel = connection.CreateModel();
  10. // 声明交换机
  11. string exchangeName = $"test.exchange.fanout";
  12. channel.ExchangeDeclare(
  13. exchange: exchangeName,
  14. type: "fanout");
  15. // 声明队列
  16. string queue1 = "test.fanout.queue1";
  17. channel.QueueDeclare(queue1, false, false, false, null);
  18. string queue2 = "test.fanout.queue2";
  19. channel.QueueDeclare(queue2, false, false, false, null);
  20. // 将队列与交换机进行绑定
  21. channel.QueueBind(
  22. queue: queue1,
  23. exchange: exchangeName,
  24. routingKey: "fanout");
  25. channel.QueueBind(
  26. queue: queue2,
  27. exchange: exchangeName,
  28. routingKey: "");
  29. channel.BasicPublish(
  30. exchange: exchangeName,
  31. routingKey: "",
  32. basicProperties: null,
  33. body: Encoding.UTF8.GetBytes(message));
  34. }
  35. }

image.png

这里绑定q1时指定了routingkey=”fanout”但是q1/q2都正常收到了消息

消费者1

  1. Console.WriteLine($"{nameof(FanoutConsumerClient1)}:");
  2. // RabbitMQ连接工厂
  3. var factory = BaseConsumer.CreateRabbitMqConnection();
  4. // 建立连接
  5. using var connection = factory.CreateConnection();
  6. // 创建信道
  7. using var channel = connection.CreateModel();
  8. string exchangeName = $"test.exchange.fanout";
  9. //声明交换机并指定类型
  10. channel.ExchangeDeclare(
  11. exchange: exchangeName,
  12. type: "fanout");
  13. string queueName = $"test.fanout.queue1";
  14. //声明队列
  15. channel.QueueDeclare(queue: queueName,
  16. durable: false,
  17. exclusive: false,
  18. autoDelete: false,
  19. arguments: null);
  20. //将队列与交换机进行绑定
  21. channel.QueueBind(
  22. queue: queueName,
  23. exchange: exchangeName,
  24. routingKey: "fanout");
  25. EventingBasicConsumer consumer =
  26. new EventingBasicConsumer(channel);
  27. // 每次只能向消费者发送一条消息,在消费者未确认之前,不再向它发送消息
  28. channel.BasicQos(0, 1, false);
  29. // 绑定消息接收后的事件委托
  30. consumer.Received += (model, ea) =>
  31. {
  32. string message =
  33. Encoding.UTF8.GetString(ea.Body.ToArray());
  34. Console.WriteLine($"Message:{message}");
  35. channel.BasicAck(
  36. deliveryTag: ea.DeliveryTag,
  37. // 是否一次性确认多条数据
  38. multiple: false);
  39. };
  40. channel.BasicConsume(queue: queueName,
  41. autoAck: false,
  42. consumer: consumer);
  43. Console.ReadLine();

消费者2

  1. Console.WriteLine($"{nameof(FanoutConsumerClient2)}:");
  2. // RabbitMQ连接工厂
  3. var factory = BaseConsumer.CreateRabbitMqConnection();
  4. // 建立连接
  5. using var connection = factory.CreateConnection();
  6. // 创建信道
  7. using var channel = connection.CreateModel();
  8. string exchangeName = $"test.rabbitMq.fanout";
  9. //声明交换机并指定类型
  10. channel.ExchangeDeclare(
  11. exchange: exchangeName,
  12. type: "fanout");
  13. string queueName = $"test.fanout.queue2";
  14. //声明队列
  15. channel.QueueDeclare(queue: queueName,
  16. durable: false,
  17. exclusive: false,
  18. autoDelete: false,
  19. arguments: null);
  20. //将队列与交换机进行绑定
  21. channel.QueueBind(
  22. queue: queueName,
  23. exchange: exchangeName,
  24. routingKey: "");
  25. EventingBasicConsumer consumer =
  26. new EventingBasicConsumer(channel);
  27. // 每次只能向消费者发送一条消息,在消费者未确认之前,不再向它发送消息
  28. channel.BasicQos(0, 1, false);
  29. // 绑定消息接收后的事件委托
  30. consumer.Received += (model, ea) =>
  31. {
  32. string message =
  33. Encoding.UTF8.GetString(ea.Body.ToArray());
  34. Console.WriteLine($"Message:{message}");
  35. channel.BasicAck(
  36. deliveryTag: ea.DeliveryTag,
  37. // 是否一次性确认多条数据
  38. multiple: false);
  39. };
  40. channel.BasicConsume(queue: queueName,
  41. autoAck: false,
  42. consumer: consumer);
  43. Console.ReadLine();

image.png

Consumer1消费时指定了routingKey,但是两个消费端都正常收到了消息,说明ExchangeType=”fanout”时,不受routingKey影响

direct

direct跟fanout的区别在于多了routekey,消息发送到Exchange,所有订阅了当前Exchange并且routingKey完全匹配的Queue才可以收到消息;消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式

image.png

  1. static void Main(string[] args)
  2. {
  3. while (true)
  4. {
  5. Console.WriteLine("消息发布者:模式{direct}=>输入消息内容");
  6. string message = Console.ReadLine();
  7. if (!string.IsNullOrEmpty(message))
  8. {
  9. ConnectionFactory factory = BasePublisher.CreateRabbitMqConnection();
  10. using var connection = factory.CreateConnection();
  11. using var channel = connection.CreateModel();
  12. // 声明交换机
  13. string exchangeName = $"test.exchange.direct";
  14. channel.ExchangeDeclare(
  15. exchange: exchangeName,
  16. type: "direct");
  17. // 声明队列
  18. string queue1 = "test.direct.queue1";
  19. channel.QueueDeclare(queue1, false, false, false, null);
  20. string queue2 = "test.direct.queue2";
  21. channel.QueueDeclare(queue2, false, false, false, null);
  22. //将队列与交换机进行绑定
  23. channel.QueueBind(
  24. queue: queue1,
  25. exchange: exchangeName,
  26. routingKey: "fanout");
  27. channel.QueueBind(
  28. queue: queue2,
  29. exchange: exchangeName,
  30. routingKey: "");
  31. // 只有queue1可以收到消息,因为queue2的routingKey不匹配
  32. channel.BasicPublish(
  33. exchange: exchangeName,
  34. routingKey: "fanout",
  35. basicProperties: null,
  36. body: Encoding.UTF8.GetBytes(message));
  37. }
  38. }
  39. }

topic

topic符模式与路由模式一致,只不过通配符模式中的路由可以声明为模糊查询,RabbitMQ拥有两个通配符;topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词

image.png

  • “#”:匹配0-n个字符语句
  • “*”:匹配一个字符语句

注意:RabbitMQ中通配符并不像正则中的单个字符,而是一个以“.”分割的字符串,如 ”topic1.*“匹配的规则以topic1开始并且”.”后只有一段语句的路由。例:”topic1.aaa”,”topic1.bb”

  1. namespace RabbitMQPublisher
  2. {
  3. using System;
  4. using System.Text;
  5. using RabbitMQ.Client;
  6. /// <summary>
  7. /// 路由模式(topic),消息会发送到exchange
  8. /// topic与direct模式区别在于routingKey可以声明为模糊查询,RabbitMQ拥有两个通配符
  9. /// #:匹配0-n个字符语句
  10. /// *:匹配一个字符语句
  11. /// </summary>
  12. static class TopicPublisher
  13. {
  14. static void Main(string[] args)
  15. {
  16. while (true)
  17. {
  18. Console.WriteLine("消息发布者:模式{topic}=>输入消息内容");
  19. string message = Console.ReadLine();
  20. if (!string.IsNullOrEmpty(message))
  21. {
  22. ConnectionFactory factory = BasePublisher.CreateRabbitMqConnection();
  23. using var connection = factory.CreateConnection();
  24. using var channel = connection.CreateModel();
  25. // 声明交换机
  26. string exchangeName = $"test.exchange.topic";
  27. channel.ExchangeDeclare(
  28. exchange: exchangeName,
  29. type: "topic");
  30. // 声明队列
  31. string queue1 = "test.topic.queue1";
  32. channel.QueueDeclare(queue1, false, false, false, null);
  33. string queue2 = "test.topic.queue2";
  34. channel.QueueDeclare(queue2, false, false, false, null);
  35. //将队列与交换机进行绑定
  36. channel.QueueBind(
  37. queue: queue1,
  38. exchange: exchangeName,
  39. routingKey: "topic.*");
  40. channel.QueueBind(
  41. queue: queue2,
  42. exchange: exchangeName,
  43. routingKey: "topic.#");
  44. #if debug
  45. // queue1和queue2都可以收到消息
  46. channel.BasicPublish(
  47. exchange: exchangeName,
  48. routingKey: "topic.test",
  49. basicProperties: null,
  50. body: Encoding.UTF8.GetBytes(message));
  51. #endif
  52. // 只有queue2可以收到消息,因为.#可以匹配一个或者多个字符语句而.*只能匹配单个
  53. channel.BasicPublish(
  54. exchange: exchangeName,
  55. routingKey: "topic.test.test",
  56. basicProperties: null,
  57. body: Encoding.UTF8.GetBytes(message));
  58. }
  59. }
  60. }
  61. }
  62. }

发布,路由,通配符这三种模式可以算为一种模式,区别仅仅是交互机类型不同.发送者将消息发送发送到交换机,接收者创建各自的消息队列绑定到交换机

image.png