RabbitMQ安装请自行百度,这里不再概述。

安装包-NuGet

RabbitMQ.Client

说明

RabbitMq官网https://www.rabbitmq.com/
RabbitMq有很多的消息分发机制和模式,这里不做介绍请自行百度,这里是最简单的模式,一个生产者、多个个消费者

正文

接口约束

分别设计生产者和消费者接口用来作为消息处理的约束。

生产者接口

  1. /// <summary>
  2. /// 生产者接口
  3. /// </summary>
  4. public interface IProducer
  5. {
  6. /// <summary>
  7. /// 注册并初始化生产者
  8. /// </summary>
  9. void Register();
  10. /// <summary>
  11. /// 发送消息
  12. /// </summary>
  13. /// <param name="message">消息内容</param>
  14. void SendMsg(string message);
  15. }

消费者接口

/// <summary>
    /// 消费者接口
    /// </summary>
    public interface IConsumer : IDisposable
    {
        /// <summary>
        /// 注册并初始化消费者
        /// </summary>
        /// <param name="Process">对消息处理的委托方法,传入消息,返回是否处理成功</param>
        void Register(Func<string, bool> Process);
    }

接口实现

RabbitMQ生产者

/// <summary>
/// RabbitMQ生产者
/// </summary>
public class RabbitProducer : IProducer
    {
        private readonly ILogHelper _logHelper;
        private IModel _channel;
        string _quenname = AppsettingConfig.RabbitMQQueueName; //队列名称
        /// <summary>
        /// RabbitMq发送
        /// </summary>
        /// <param name="logHelper"></param>
        public RabbitProducer(ILogHelper logHelper)
        {
            _logHelper = logHelper;
            Register();
        }

        public void Register()
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = AppsettingConfig.RabbitMQHost,//rabbitmq地址不带端口号
                    UserName = AppsettingConfig.RabbitMQUserName,//用户名
                    Password = AppsettingConfig.RabbitMQPassword,//密码
                    Port = AppsettingConfig.RabbitMQPort.ObjToInt(5672),//端口号
                    VirtualHost = AppsettingConfig.RabbitMQVirtualHost//目录
                };
                var connection = factory.CreateConnection();
                _channel = connection.CreateModel();
                //声明交换机
                //_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                //声明一个队列
                _channel.QueueDeclare(_quenname, false, false, false, null);
            }
            catch (Exception ex)
            {
                _logHelper.LogErr("RabbitProducer-初始化", ex);
                //throw;
            }
        }

        public void SendMsg(string message)
        {
            _logHelper.LogInfo($"PushMessage:{message}");
            try
            {
                var body = Encoding.UTF8.GetBytes(message);
                _channel.BasicPublish("", _quenname, null, body);
            }
            catch (Exception ex)
            {

                _logHelper.LogErr("RabbitProducer-SendMsg", ex);
            }
        }

    }

RabbitMQ消费者

/// <summary>
    /// Rabbit消息消费者的实现
    /// </summary>
    public class RabbitConsumer : IConsumer
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly ILogHelper _logHelper;

        //string exchangeName = "MyExchange10";
        string _quenname = AppsettingConfig.RabbitMQQueueName;
        public RabbitConsumer(ILogHelper logHelper)
        {
            _logHelper = logHelper;
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = AppsettingConfig.RabbitMQHost,
                    UserName = AppsettingConfig.RabbitMQUserName,
                    Password = AppsettingConfig.RabbitMQPassword,
                    Port = AppsettingConfig.RabbitMQPort.ObjToInt(5672),
                    VirtualHost = AppsettingConfig.RabbitMQVirtualHost
                };
                this._connection = factory.CreateConnection();
                this._channel = _connection.CreateModel();
            }
            catch (Exception ex)
            {
                _logHelper.LogErr("RabbitConsumer-初始化", ex);
                //Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
            }

        }
        /// <summary>
        /// 注册消费者
        /// </summary>
        /// <param name="Process"></param>
        public void Register(Func<string,bool> Process)
        {
            //事件基本消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                if (Process(message))
                {
                    //确认该消息已经被消费
                    _channel.BasicAck(ea.DeliveryTag, false);
                }
                else
                {
                    //拒绝消息 并进入为false并进入死信队列
                    _channel.BasicReject(ea.DeliveryTag, false);
                }
                //
                Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
            };
            //启动消费者 设置为手动应答消息
            _channel.BasicConsume(_quenname, false, consumer);
        }

        public void Dispose()
        {
            if (_connection != null) _connection.Close();
            if (_channel != null) _channel.Close();
        }
    }

实现完成,下来可是调用。消费者只是实现了功能,但未有载体,比如宿主服务,windows服务,这里使用宿主服务的方式,需要集成IHostedService

测试

宿主服务消费者

/// <summary>
/// 这是适合api站点宿主服务的消费者
/// </summary>
public class RabbitWebHostConsumer : IHostedService
    {
        private readonly IConsumer _listener;
        private readonly ILogHelper _logHelper;

        public RabbitWebHostConsumer(IConsumer listener,ILogHelper logHelper)
        {
            _listener = listener;
            _logHelper = logHelper;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _listener.Register(Process);
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _listener.Dispose();
            return Task.CompletedTask;
        }
        public bool Process(string message)
        {
            //return base.Process(message);
            var re = false;
            try
            {
                //假设里用来处理消息 比如 写入数据库
                Console.WriteLine("来自继承者的消息:"+message);
                re = true;
            }
            catch (Exception ex)
            {
                _logHelper.LogErr("RabbitWebHostListener.Process",ex);

            }
            return re;
        }

    }

开始注册

1、注册RabbitMq对象

services.AddSingleton<IProducer, RabbitProducer>();
services.AddSingleton<IConsumer, RabbitConsumer>();

2、注册宿主消费者

//开启RabbitMQ消费宿主服务
services.AddHostedService<RabbitWebHostConsumer>();

3、Controller中引用生产者,方法中调用

private readonly IProducer _producer;

public WeatherForecastController(IProducer producer)
        {
            _producer = producer;
        }

public async Task<IActionResult> Get()
{
    _producer.SendMsg(DateTime.Now.ToString());
}

结果

image.png