CAP
● See the CAP docs for more details.
● Watch the CAP video tutorial to learn how to integrate CAP into your project.
● GitHub source: https://github.com/dotnetcore/cap
● Sample code: https://github.com/dotnetcore/CAP/tree/master/samples
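CAP Transports
Transports move data from one place to another: between collectors and pipelines, between pipelines and entity databases, and even between pipelines and external systems.
How to choose a transport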
| Feature | RabbitMQ | Kafka | Azure Service Bus | In-Memory |
|---|---|---|---|---|
| Positioning | Reliable message delivery | Real-time data processing | Cloud | In-memory, for testing |
| Distributed | ✔ | ✔ | ✔ | ❌ |
| Persistence | ✔ | ✔ | ✔ | ❌ |
| Performance | Medium | High | Medium | High |
RabbitMQ
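RabbitMQ is open-source message broker software (also called message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in Erlang, and its clustering and failover are built on the Open Telecom Platform framework. Client libraries for talking to the broker are available for all major programming languages.
CAP supports RabbitMQ as a message transport.
RabbitMQ Options
CAP exposes the following RabbitMQ configuration options directly: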
| NAME | DESCRIPTION | TYPE | DEFAULT |
|---|---|---|---|
| HostName | Host address; for a cluster, separate the nodes with commas, e.g. 192.168.1.111,192.168.1.112 | string | localhost |
| UserName | User name | string | guest |
| Password | Password | string | guest |
| VirtualHost | Virtual host | string | / |
| Port | Port number | int | -1 |
| ExchangeName | Name of the default CAP exchange | string | cap.default.topic |
| QueueArguments | Extra arguments (x-arguments) used when creating queues | QueueArgumentsOptions | N/A |
| ConnectionFactoryOptions | Native RabbitMQ client options | ConnectionFactory | N/A |
| CustomHeaders | Custom header information for subscribers | Func | N/A |
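To make the table more concrete, here is a minimal sketch of setting several of these options in `Program.cs`. It assumes an ASP.NET Core project with implicit usings; the host addresses come from the table, while the exchange name `myapp.cap.exchange` is a hypothetical value chosen for illustration.

```csharp
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddCap(config =>
{
    // CAP also needs a storage provider (for example config.UseEntityFramework<T>());
    // it is left out here to keep the focus on the transport options.
    config.UseRabbitMQ(o =>
    {
        o.HostName = "192.168.1.111,192.168.1.112"; // comma-separated cluster nodes
        o.UserName = "guest";
        o.Password = "guest";
        o.VirtualHost = "/";
        o.Port = 5672;
        o.ExchangeName = "myapp.cap.exchange";      // hypothetical name

        // Native RabbitMQ.Client settings are reachable through ConnectionFactoryOptions.
        o.ConnectionFactoryOptions = factory => factory.AutomaticRecoveryEnabled = true;
    });
});
```

Any option left unset keeps the default listed in the table.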
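CustomHeaders Options
When a subscriber receives messages from a heterogeneous system, or messages sent directly from the RabbitMQ management console, it throws an exception, because CAP needs additional header information that such messages do not carry. Set this option to supply those headers so that the subscriber works normally.
The header information is described in the CAP documentation.
Usage: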

    config.UseRabbitMQ(o =>
    {
        o.CustomHeaders = e => new List<KeyValuePair<string, string>>
        {
            new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
            new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
        };
    });

Connecting to a RabbitMQ cluster
Separate the node addresses with commas in the connection string:

    x => x.UseRabbitMQ("localhost:5672,localhost:5673,localhost:5674")

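To illustrate the format of that string, the sketch below splits a comma-separated list into host and port pairs. `ClusterEndpoints` is a hypothetical helper written for this article, not part of CAP or the RabbitMQ client, which do their own parsing; the fallback port of 5672 is likewise an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ClusterEndpoints
{
    // Splits "host1:5672,host2:5673" into (host, port) pairs.
    // Entries without an explicit port fall back to defaultPort.
    public static List<(string Host, int Port)> Parse(string hosts, int defaultPort = 5672)
    {
        return hosts
            .Split(',', StringSplitOptions.RemoveEmptyEntries)
            .Select(entry => entry.Trim().Split(':'))
            .Select(parts => (Host: parts[0],
                              Port: parts.Length > 1 ? int.Parse(parts[1]) : defaultPort))
            .ToList();
    }
}
```

Both forms used in this article fit the same shape: `192.168.1.111,192.168.1.112` lists hosts only, while `localhost:5672,localhost:5673,localhost:5674` gives each node its own port.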
RabbitMQ.MySQL example
Note: the table must be created manually in the MySQL database beforehand.

    CREATE TABLE `persons` (
      `Id` int NOT NULL,
      `name` varchar(55) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

NuGet

    Install-Package DotNetCore.CAP.RabbitMQ
    Install-Package DotNetCore.CAP.Dashboard
    Install-Package DotNetCore.CAP.MySql
    Install-Package Pomelo.EntityFrameworkCore.MySql
    Install-Package Microsoft.EntityFrameworkCore.Design
    Install-Package Dapper

Registering CAP

    using CAP.Transport.RabbitMQ.MySql;
    using CAP.Transport.RabbitMQ.MySql.Models;
    using DotNetCore.CAP.Internal;
    using DotNetCore.CAP.Messages;

    var builder = WebApplication.CreateBuilder(args);

    // Add services to the container.
    builder.Services.AddControllers();
    builder.Services.AddEndpointsApiExplorer();
    builder.Services.AddSwaggerGen();

    var appSetting = new AppSetting();
    builder.Configuration.GetSection("AppSetting").Bind(appSetting);
    // Register AppSetting in the container so that constructors can take IOptionsSnapshot<AppSetting>.
    builder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));
    builder.Services.AddDbContext<AppDbContext>();

    builder.Services.AddCap(config =>
    {
        config.UseEntityFramework<AppDbContext>();

        // Configure RabbitMQ.
        config.UseRabbitMQ(o =>
        {
            o.HostName = appSetting.RabbitMQSetting.HostName;
            o.UserName = appSetting.RabbitMQSetting.UserName;
            o.Password = appSetting.RabbitMQSetting.Password;
            o.Port = appSetting.RabbitMQSetting.Port;
            o.CustomHeaders = e => new List<KeyValuePair<string, string>>
            {
                new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
                new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
            };
            o.ConnectionFactoryOptions = opt =>
            {
                // RabbitMQ client ConnectionFactory configuration goes here.
            };
        });

        // Configure the dashboard.
        config.UseDashboard();

        config.FailedRetryCount = 5;
        config.FailedThresholdCallback = failed =>
        {
            var logger = failed.ServiceProvider.GetService<ILogger<Program>>();
            logger?.LogError($@"A message of type {failed.MessageType} failed after {config.FailedRetryCount} retries and requires manual troubleshooting. Message name: {failed.Message.GetName()}");
        };
    });

    var app = builder.Build();

    // Configure the HTTP request pipeline.
    if (app.Environment.IsDevelopment())
    {
        app.UseSwagger();
        app.UseSwaggerUI();
    }

    app.UseHttpsRedirection();
    app.UseAuthorization();
    app.MapControllers();
    app.Run();

Shared code
Configuration file (appsettings.json):

    {
      "Logging": {
        "LogLevel": {
          "Default": "Information",
          "Microsoft.AspNetCore": "Warning"
        }
      },
      "AllowedHosts": "*",
      "AppSetting": {
        "SqlServerSetting": {
          "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
        },
        "MysqlSetting": {
          "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
        },
        "RedisSetting": {
          "Connection": "192.168.3.40:6379,password=",
          "Database": "test"
        },
        "RabbitMQSetting": {
          "HostName": "localhost",
          "UserName": "admin",
          "Password": "admin",
          "Port": 5672
        },
        "MongoDbSetting": {
          "Connection": "mongodb://192.168.3.40:2717", // /?replicaSet=rs0
          "Database": "test"
        }
      }
    }

Settings classes:

    namespace CAP.Transport.RabbitMQ.MySql.Models;

    /// <summary>Application settings bound from the "AppSetting" configuration section.</summary>
    public class AppSetting
    {
        /// <summary>SqlServerSetting</summary>
        public SqlServerSetting SqlServerSetting { get; set; }

        /// <summary>MysqlSetting</summary>
        public MysqlSetting MysqlSetting { get; set; }

        /// <summary>RedisSetting</summary>
        public RedisSetting RedisSetting { get; set; }

        /// <summary>RabbitMQSetting</summary>
        public RabbitMQSetting RabbitMQSetting { get; set; }

        /// <summary>MongoDbSetting</summary>
        public MongoDbSetting MongoDbSetting { get; set; }

        /// <summary>KafkaSetting</summary>
        public KafkaSetting KafkaSetting { get; set; }
    }

    public class RabbitMQSetting
    {
        /// <summary>HostName</summary>
        public string HostName { get; set; } = "localhost";

        /// <summary>Password</summary>
        public string Password { get; set; }

        /// <summary>Username</summary>
        public string UserName { get; set; }

        /// <summary>The port to connect on.</summary>
        public int Port { get; set; } = 5672;
    }

    public class RedisSetting
    {
        /// <summary>Connection string</summary>
        public string Connection { get; set; }

        /// <summary>Database name</summary>
        public string Database { get; set; }
    }

    public class MongoDbSetting
    {
        /// <summary>Connection string</summary>
        public string Connection { get; set; }

        /// <summary>Database name</summary>
        public string Database { get; set; }
    }

    public class SqlServerSetting
    {
        /// <summary>Connection string</summary>
        public string Connection { get; set; }
    }

    public class MysqlSetting
    {
        /// <summary>Connection string</summary>
        public string Connection { get; set; }
    }

    public class KafkaSetting
    {
        /// <summary>Servers</summary>
        public string Servers { get; set; }
    }

Database context:

    using CAP.Transport.RabbitMQ.MySql.Models;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.Options;

    namespace CAP.Transport.RabbitMQ.MySql;

    public class AppDbContext : DbContext
    {
        // Requires AppSetting to be registered in Program.cs:
        // builder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));
        public AppDbContext(IOptionsSnapshot<AppSetting> options)
        {
            // Read the MySQL connection string, since OnConfiguring calls UseMySql.
            ConnectionString = options != null ? options.Value.MysqlSetting.Connection : "";
        }

        // Database connection string; PublishController also reads it.
        public static string ConnectionString;

        public DbSet<Person> Persons { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseMySql(ConnectionString, ServerVersion.AutoDetect(ConnectionString));
        }
    }

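The `Person` entity used by `AppDbContext` and the controllers is not shown in this article. A minimal sketch follows, assuming it simply mirrors the two columns of the `persons` table and lives in the `Models` namespace that the other files import; both points are assumptions rather than the author's actual class.

```csharp
namespace CAP.Transport.RabbitMQ.MySql.Models
{
    // Assumed shape: one property per column of the persons table.
    public class Person
    {
        // Maps to the Id column (int NOT NULL).
        public int Id { get; set; }

        // Maps to the name column (varchar(55), nullable).
        public string Name { get; set; }
    }
}
```

Entity Framework Core treats a property named `Id` as the key by convention, so no extra mapping is needed for this sketch.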
Base controllers:

    using Microsoft.AspNetCore.Mvc;

    namespace CAP.Transport.RabbitMQDemo.Controllers;

    /// <summary>
    /// Base controller
    /// </summary>
    [Route("api/[area]/[controller]/[action]")]
    [ApiController]
    public abstract class BaseController : ControllerBase
    {
    }

    using Microsoft.AspNetCore.Mvc;

    namespace CAP.Transport.RabbitMQDemo.Controllers.MQ;

    /// <summary>
    /// Area controller
    /// </summary>
    [Area("Order")]
    public abstract class AreaController : BaseController
    {
    }

Publishing messages

    using CAP.Transport.RabbitMQ.MySql.Models;
    using CAP.Transport.RabbitMQDemo.Controllers.MQ; // namespace that declares AreaController
    using DotNetCore.CAP;
    using Microsoft.AspNetCore.Mvc;
    using MySqlConnector;
    using System;

    namespace CAP.Transport.RabbitMQ.MySql.Controllers.MQ;

    /// <summary>
    /// Publishes messages
    /// </summary>
    public class PublishController : AreaController
    {
        private readonly ICapPublisher _capBus;

        public PublishController(ICapPublisher capBus)
        {
            _capBus = capBus;
        }

        /// <summary>
        /// Publishes without a transaction
        /// </summary>
        [HttpGet]
        public async Task<IActionResult> WithoutTransaction()
        {
            Console.WriteLine("Publish send message: " + DateTime.Now);
            await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now);
            return Ok();
        }

        /// <summary>
        /// Publishes inside an ADO.NET (MySqlConnection) transaction
        /// </summary>
        [HttpPost]
        public IActionResult AdonetWithTransaction(Person person)
        {
            if (person == null || person.Id <= 0) return Ok();

            using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
            {
                // autoCommit: true lets CAP commit the transaction once the message is published.
                using (var transaction = connection.BeginTransaction(_capBus, true))
                {
                    // Parameterized instead of string.Format to avoid SQL injection.
                    const string sql = "insert into persons(Id,name) values(@id,@name)";
                    using (MySqlCommand cmd = new(sql, connection))
                    {
                        // Enlist the command in the transaction that CAP started.
                        cmd.Transaction = (MySqlTransaction)transaction.DbTransaction;
                        cmd.Parameters.AddWithValue("@id", person.Id);
                        cmd.Parameters.AddWithValue("@name", person.Name);
                        Console.WriteLine("About to execute SQL: " + sql);
                        cmd.ExecuteNonQuery();
                    }

                    _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
                }
            }

            return Ok();
        }

        /// <summary>
        /// Publishes inside an Entity Framework transaction
        /// </summary>
        [HttpPost]
        public IActionResult EntityFrameworkWithTransaction([FromServices] AppDbContext dbContext, Person person)
        {
            using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))
            {
                dbContext.Persons.Add(new Person { Id = person.Id, Name = person.Name + "ef" });
                _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);

                dbContext.SaveChanges();
                trans.Commit();
            }

            return Ok();
        }
    }

Handling messages

    using DotNetCore.CAP;
    using Microsoft.AspNetCore.Mvc;

    namespace CAP.Transport.RabbitMQ.MySql.Controllers.MQ;

    /// <summary>
    /// Handles messages
    /// </summary>
    [ApiController]
    public class ConsumerController : ControllerBase
    {
        /// <summary>
        /// Subscriber in the default group
        /// </summary>
        /// <param name="p">The published timestamp</param>
        [NonAction]
        [CapSubscribe("sample.rabbitmq.mysql")]
        public void Subscriber(DateTime p)
        {
            Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
        }

        /// <summary>
        /// Subscriber in the group "group.test2"
        /// </summary>
        /// <param name="p">The published timestamp</param>
        /// <param name="header">The CAP message headers</param>
        [NonAction]
        [CapSubscribe("sample.rabbitmq.mysql", Group = "group.test2")]
        public void Subscriber2(DateTime p, [FromCap] CapHeader header)
        {
            Console.WriteLine($@"{DateTime.Now} Subscriber2 invoked, Info: {p}");
        }
    }

Testing
https://localhost:7285/api/Order/Publish/EntityFrameworkWithTransaction
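One way to exercise this endpoint is a small HTTP client. The sketch below is only an illustration: `PersonPayload` and `PublishEndpointClient` are hypothetical names, the payload assumes the controller needs nothing beyond `Id` and `Name`, and the port depends on your launch settings.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical DTO mirroring the Id and Name fields the controller reads.
public record PersonPayload(int Id, string Name);

public static class PublishEndpointClient
{
    // URL taken from this article; adjust the port to your environment.
    public const string Url =
        "https://localhost:7285/api/Order/Publish/EntityFrameworkWithTransaction";

    // Serializes the payload as the JSON request body.
    public static string BuildBody(PersonPayload person) =>
        JsonSerializer.Serialize(person);

    // Posts the payload and returns the HTTP status code.
    public static async Task<int> SendAsync(HttpClient client, PersonPayload person)
    {
        using var content = new StringContent(BuildBody(person), Encoding.UTF8, "application/json");
        using var response = await client.PostAsync(Url, content);
        return (int)response.StatusCode;
    }
}
```

Calling `await PublishEndpointClient.SendAsync(new HttpClient(), new PersonPayload(1, "test"))` while the application is running should insert a row and publish a message, which both subscribers then print to the console. The Swagger UI enabled in development offers the same test without extra code.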



