安装包-NuGet
说明
RabbitMq官网https://www.rabbitmq.com/
RabbitMq有很多的消息分发机制和模式,这里不做介绍请自行百度,这里是最简单的模式,一个生产者、多个个消费者
正文
接口约束
生产者接口
/// <summary>/// 生产者接口/// </summary>public interface IProducer{/// <summary>/// 注册并初始化生产者/// </summary>void Register();/// <summary>/// 发送消息/// </summary>/// <param name="message">消息内容</param>void SendMsg(string message);}
消费者接口
/// <summary>
/// 消费者接口
/// </summary>
public interface IConsumer : IDisposable
{
/// <summary>
/// 注册并初始化消费者
/// </summary>
/// <param name="Process">对消息处理的委托方法,传入消息,返回是否处理成功</param>
void Register(Func<string, bool> Process);
}
接口实现
RabbitMQ生产者
/// <summary>
/// RabbitMQ生产者
/// </summary>
public class RabbitProducer : IProducer
{
private readonly ILogHelper _logHelper;
private IModel _channel;
string _quenname = AppsettingConfig.RabbitMQQueueName; //队列名称
/// <summary>
/// RabbitMq发送
/// </summary>
/// <param name="logHelper"></param>
public RabbitProducer(ILogHelper logHelper)
{
_logHelper = logHelper;
Register();
}
public void Register()
{
try
{
var factory = new ConnectionFactory()
{
HostName = AppsettingConfig.RabbitMQHost,//rabbitmq地址不带端口号
UserName = AppsettingConfig.RabbitMQUserName,//用户名
Password = AppsettingConfig.RabbitMQPassword,//密码
Port = AppsettingConfig.RabbitMQPort.ObjToInt(5672),//端口号
VirtualHost = AppsettingConfig.RabbitMQVirtualHost//目录
};
var connection = factory.CreateConnection();
_channel = connection.CreateModel();
//声明交换机
//_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
//声明一个队列
_channel.QueueDeclare(_quenname, false, false, false, null);
}
catch (Exception ex)
{
_logHelper.LogErr("RabbitProducer-初始化", ex);
//throw;
}
}
public void SendMsg(string message)
{
_logHelper.LogInfo($"PushMessage:{message}");
try
{
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish("", _quenname, null, body);
}
catch (Exception ex)
{
_logHelper.LogErr("RabbitProducer-SendMsg", ex);
}
}
}
RabbitMQ消费者
/// <summary>
/// Rabbit消息消费者的实现
/// </summary>
public class RabbitConsumer : IConsumer
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly ILogHelper _logHelper;
//string exchangeName = "MyExchange10";
string _quenname = AppsettingConfig.RabbitMQQueueName;
public RabbitConsumer(ILogHelper logHelper)
{
_logHelper = logHelper;
try
{
var factory = new ConnectionFactory()
{
HostName = AppsettingConfig.RabbitMQHost,
UserName = AppsettingConfig.RabbitMQUserName,
Password = AppsettingConfig.RabbitMQPassword,
Port = AppsettingConfig.RabbitMQPort.ObjToInt(5672),
VirtualHost = AppsettingConfig.RabbitMQVirtualHost
};
this._connection = factory.CreateConnection();
this._channel = _connection.CreateModel();
}
catch (Exception ex)
{
_logHelper.LogErr("RabbitConsumer-初始化", ex);
//Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
}
}
/// <summary>
/// 注册消费者
/// </summary>
/// <param name="Process"></param>
public void Register(Func<string,bool> Process)
{
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
if (Process(message))
{
//确认该消息已经被消费
_channel.BasicAck(ea.DeliveryTag, false);
}
else
{
//拒绝消息 并进入为false并进入死信队列
_channel.BasicReject(ea.DeliveryTag, false);
}
//
Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
};
//启动消费者 设置为手动应答消息
_channel.BasicConsume(_quenname, false, consumer);
}
public void Dispose()
{
if (_connection != null) _connection.Close();
if (_channel != null) _channel.Close();
}
}
实现完成,下来可是调用。消费者只是实现了功能,但未有载体,比如宿主服务,windows服务,这里使用宿主服务的方式,需要集成IHostedService
测试
宿主服务消费者
/// <summary>
/// 这是适合api站点宿主服务的消费者
/// </summary>
public class RabbitWebHostConsumer : IHostedService
{
private readonly IConsumer _listener;
private readonly ILogHelper _logHelper;
public RabbitWebHostConsumer(IConsumer listener,ILogHelper logHelper)
{
_listener = listener;
_logHelper = logHelper;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_listener.Register(Process);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_listener.Dispose();
return Task.CompletedTask;
}
public bool Process(string message)
{
//return base.Process(message);
var re = false;
try
{
//假设里用来处理消息 比如 写入数据库
Console.WriteLine("来自继承者的消息:"+message);
re = true;
}
catch (Exception ex)
{
_logHelper.LogErr("RabbitWebHostListener.Process",ex);
}
return re;
}
}
开始注册
1、注册RabbitMq对象
services.AddSingleton<IProducer, RabbitProducer>();
services.AddSingleton<IConsumer, RabbitConsumer>();
2、注册宿主消费者
//开启RabbitMQ消费宿主服务
services.AddHostedService<RabbitWebHostConsumer>();
3、Controller中引用生产者,方法中调用
private readonly IProducer _producer;
public WeatherForecastController(IProducer producer)
{
_producer = producer;
}
public async Task<IActionResult> Get()
{
_producer.SendMsg(DateTime.Now.ToString());
}
结果

