CAP


CAP docs看到更多详细资料。
CAP 视频教程,学习如何在项目中集成CAP。
●GitHub源码:https://github.com/dotnetcore/cap
●示例代码:https://github.com/dotnetcore/CAP/tree/master/samples

CAP 运输器

通过运输将数据从一个地方移动到另一个地方-在采集程序和管道之间,管道与实体数据库之间,甚至在管道与外部系统之间。

怎么选择运输器

🏳‍🌈 RabbitMQ Kafka Azure Service Bus In-Memory
定位 可靠消息传输 实时数据处理 内存型,测试
分布式
持久化
性能 Medium High Medium High

RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用 Erlang 语言编写的,而聚类和故障转移是构建在开源的通讯平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
CAP 支持使用 RabbitMQ 作为消息传输器。

RabbitMQ Options

CAP 直接对外提供的 RabbitMQ 配置参数如下:

NAME DESCRIPTION TYPE DEFAULT
HostName 宿主地址,配置集群可以使用逗号分隔,如 192.168.1.111,192.168.1.112 string localhost
UserName 用户名 string guest
Password 密码 string guest
VirtualHost 虚拟主机 string /
Port 端口号 int -1
ExchangeName CAP默认Exchange名称 string cap.default.topic
QueueArguments 创建队列额外参数 x-arguments QueueArgumentsOptions N/A
ConnectionFactoryOptions RabbitMQClient原生参数 ConnectionFactory N/A
CustomHeaders 订阅者自定义头信息 Func>> N/A

CustomHeaders Options

当需要从异构系统或者直接接收从RabbitMQ 控制台发送的消息时,由于 CAP 需要定义额外的头信息才能正常订阅,所以此时会出现异常。通过提供此参数来进行自定义头信息的设置来使订阅者正常工作。
你可以在这里找到有关 头信息 的说明。
用法如下:

  1. config.UseRabbitMQ(o =>
  2. {
  3. o.CustomHeaders = e => new List<KeyValuePair<string, string>>
  4. {
  5. new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
  6. new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
  7. };
  8. });

连接 RabbitMQ 集群

使用逗号分隔连接字符串即可,如下:

  1. x=> x.UseRabbitMQ("localhost:5672,localhost:5673,localhost:5674")

RabbitMQ.MySQL示例

注意,需要在mysql数据库手工创建好表

  1. CREATE TABLE `persons` (
  2. `Id` int NOT NULL,
  3. `name` varchar(55) DEFAULT NULL
  4. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

NuGet

  1. Install-Package DotNetCore.CAP.RabbitMQ
  2. Install-Package DotNetCore.CAP.Dashboard
  3. Install-Package DotNetCore.CAP.MySql
  4. Install-Package Pomelo.EntityFrameworkCore.MySql
  5. Install-Package Microsoft.EntityFrameworkCore.Design
  6. Install-Package Dapper

注入CAP

  1. using CAP.Transport.RabbitMQ.MySql;
  2. using CAP.Transport.RabbitMQ.MySql.Models;
  3. using DotNetCore.CAP.Internal;
  4. using DotNetCore.CAP.Messages;
  5. var builder = WebApplication.CreateBuilder(args);
  6. // Add services to the container.
  7. builder.Services.AddControllers();
  8. builder.Services.AddEndpointsApiExplorer();
  9. builder.Services.AddSwaggerGen();
  10. var appSetting = new AppSetting();
  11. builder.Configuration.GetSection("AppSetting").Bind(appSetting);
  12. //把AppSetting实体注入到容器,方便在构造函数里使用IOptionsSnapshot<AppSetting> options
  13. builder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));
  14. builder.Services.AddDbContext<AppDbContext>();
  15. builder.Services.AddCap(config =>
  16. {
  17. config.UseEntityFramework<AppDbContext>();
  18. config.UseRabbitMQ(o =>
  19. {
  20. o.HostName = appSetting.RabbitMQSetting.HostName;
  21. o.UserName = appSetting.RabbitMQSetting.UserName;
  22. o.Password = appSetting.RabbitMQSetting.Password;
  23. o.Port = appSetting.RabbitMQSetting.Port;
  24. o.CustomHeaders = e => new List<KeyValuePair<string, string>>
  25. {
  26. new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
  27. new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
  28. };
  29. o.ConnectionFactoryOptions = opt => {
  30. //rabbitmq client ConnectionFactory config
  31. };
  32. });//配置RabbitMQ
  33. config.UseDashboard();//配置Dashboard
  34. config.FailedRetryCount = 5;
  35. config.FailedThresholdCallback = failed =>
  36. {
  37. var logger = failed.ServiceProvider.GetService<ILogger<Program>>();
  38. logger?.LogError($@"A message of type {failed.MessageType} failed after executing {config.FailedRetryCount} several times,requiring manual troubleshooting. Message name: {failed.Message.GetName()}");
  39. };
  40. });
  41. var app = builder.Build();
  42. // Configure the HTTP request pipeline.
  43. if (app.Environment.IsDevelopment())
  44. {
  45. app.UseSwagger();
  46. app.UseSwaggerUI();
  47. }
  48. app.UseHttpsRedirection();
  49. app.UseAuthorization();
  50. app.MapControllers();
  51. app.Run();

公共部分

  1. {
  2. "Logging": {
  3. "LogLevel": {
  4. "Default": "Information",
  5. "Microsoft.AspNetCore": "Warning"
  6. }
  7. },
  8. "AllowedHosts": "*",
  9. "AppSetting": {
  10. "SqlServerSetting": {
  11. "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
  12. },
  13. "MysqlSetting": {
  14. "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
  15. },
  16. "RedisSetting": {
  17. "Connection": "192.168.3.40:6379,password=",
  18. "Database": "test"
  19. },
  20. "RabbitMQSetting": {
  21. "HostName": "localhost",
  22. "UserName": "admin",
  23. "Password": "admin",
  24. "Port": 5672
  25. },
  26. "MongoDbSetting": {
  27. "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
  28. "Database": "test"
  29. }
  30. }
  31. }
  1. namespace CAP.Transport.RabbitMQ.MySql.Models;
  2. /// <summary>
  3. /// mongodb setting
  4. /// </summary>
  5. public class AppSetting
  6. {
  7. /// <summary>
  8. /// SqlServerSetting
  9. /// </summary>
  10. public SqlServerSetting SqlServerSetting { get; set; }
  11. /// <summary>
  12. /// MysqlSetting
  13. /// </summary>
  14. public MysqlSetting MysqlSetting { get; set; }
  15. /// <summary>
  16. /// RedisSetting
  17. /// </summary>
  18. public RedisSetting RedisSetting { get; set; }
  19. /// <summary>
  20. /// RabbitMQSetting
  21. /// </summary>
  22. public RabbitMQSetting RabbitMQSetting { get; set; }
  23. /// <summary>
  24. /// MongoDbSetting
  25. /// </summary>
  26. public MongoDbSetting MongoDbSetting { get; set; }
  27. /// <summary>
  28. /// KafkaSetting
  29. /// </summary>
  30. public KafkaSetting KafkaSetting { get; set; }
  31. }
  32. public class RabbitMQSetting
  33. {
  34. /// <summary>
  35. /// HostName
  36. /// </summary>
  37. public string HostName { get; set; } = "localhost";
  38. /// <summary>
  39. /// Password
  40. /// </summary>
  41. public string Password { get; set; }
  42. /// <summary>
  43. /// Username
  44. /// </summary>
  45. public string UserName { get; set; }
  46. /// <summary>
  47. /// The port to connect on.
  48. /// </summary>
  49. public int Port { get; set; } = 5672;
  50. }
  51. public class RedisSetting
  52. {
  53. /// <summary>
  54. /// connection string
  55. /// </summary>
  56. public string Connection { get; set; }
  57. /// <summary>
  58. /// database name
  59. /// </summary>
  60. public string Database { get; set; }
  61. }
  62. public class MongoDbSetting
  63. {
  64. /// <summary>
  65. /// connection string
  66. /// </summary>
  67. public string Connection { get; set; }
  68. /// <summary>
  69. /// database name
  70. /// </summary>
  71. public string Database { get; set; }
  72. }
  73. public class SqlServerSetting
  74. {
  75. /// <summary>
  76. /// connection string
  77. /// </summary>
  78. public string Connection { get; set; }
  79. }
  80. public class MysqlSetting
  81. {
  82. /// <summary>
  83. /// connection string
  84. /// </summary>
  85. public string Connection { get; set; }
  86. }
  87. public class KafkaSetting
  88. {
  89. /// <summary>
  90. /// Servers
  91. /// </summary>
  92. public string Servers { get; set; }
  93. }
  1. using CAP.Transport.RabbitMQ.MySql.Models;
  2. using Microsoft.EntityFrameworkCore;
  3. using Microsoft.Extensions.Options;
  4. namespace CAP.Transport.RabbitMQ.MySql;
  5. public class AppDbContext : DbContext
  6. {
  7. /// <summary>
  8. /// AppDbContext
  9. /// builder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));
  10. /// </summary>
  11. /// <param name="options"></param>
  12. public AppDbContext(IOptionsSnapshot<AppSetting> options)
  13. {
  14. ConnectionString = options != null ? options.Value.SqlServerSetting.Connection : "";
  15. }
  16. //配置数据库连接
  17. public static string ConnectionString;
  18. public DbSet<Person> Persons { get; set; }
  19. protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
  20. {
  21. optionsBuilder.UseMySql(ConnectionString, ServerVersion.AutoDetect(ConnectionString));
  22. }
  23. }
  1. using Microsoft.AspNetCore.Mvc;
  2. namespace CAP.Transport.RabbitMQDemo.Controllers;
  3. /// <summary>
  4. /// 基础控制器
  5. /// </summary>
  6. [Route("api/[area]/[controller]/[action]")]
  7. [ApiController]
  8. public abstract class BaseController : ControllerBase
  9. {
  10. }
  1. using Microsoft.AspNetCore.Mvc;
  2. namespace CAP.Transport.RabbitMQDemo.Controllers.MQ;
  3. /// <summary>
  4. /// 域控制器
  5. /// </summary>
  6. [Area("Order")]
  7. public abstract class AreaController : BaseController
  8. {
  9. }

发布消息

  1. using CAP.Transport.RabbitMQ.MySql.Models;
  2. using DotNetCore.CAP;
  3. using Microsoft.AspNetCore.Mvc;
  4. using MySqlConnector;
  5. using System;
  6. namespace CAP.Transport.RabbitMQ.MySql.Controllers.MQ;
  7. /// <summary>
  8. /// 发送消息
  9. /// </summary>
  10. public class PublishController : AreaController
  11. {
  12. ICapPublisher _capBus;
  13. public PublishController(ICapPublisher capBus)
  14. {
  15. _capBus = capBus;
  16. }
  17. /// <summary>
  18. /// 发布者
  19. /// </summary>
  20. /// <returns></returns>
  21. [HttpGet]
  22. public async Task<IActionResult> WithoutTransaction()
  23. {
  24. Console.WriteLine("Publish send message: " + DateTime.Now);
  25. await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now);
  26. return Ok();
  27. }
  28. /// <summary>
  29. /// 发布者-MySqlConnection
  30. /// </summary>
  31. /// <returns></returns>
  32. [HttpPost]
  33. public IActionResult AdonetWithTransaction(Person person)
  34. {
  35. if (person == null || person.Id <= 0) return Ok();
  36. using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
  37. {
  38. using (var transaction = connection.BeginTransaction(_capBus, true))
  39. {
  40. string sqlstr = string.Format("insert into persons(Id,name) values({0},'{1}')", person.Id, person.Name);
  41. using (MySqlCommand cmd = new(sqlstr, connection))
  42. {
  43. Console.WriteLine("即将执行SQL语句: " + sqlstr);
  44. int resut = cmd.ExecuteNonQuery();
  45. }
  46. //for (int i = 0; i < 5; i++)
  47. //{
  48. _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
  49. //}
  50. }
  51. }
  52. return Ok();
  53. }
  54. /// <summary>
  55. /// 发布者-EF
  56. /// </summary>
  57. /// <returns></returns>
  58. [HttpPost]
  59. public IActionResult EntityFrameworkWithTransaction([FromServices] AppDbContext dbContext, Person person)
  60. {
  61. using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))
  62. {
  63. dbContext.Persons.Add(new Person() { Id=person.Id,Name = person.Name+"ef" }) ;
  64. for (int i = 0; i < 1; i++)
  65. {
  66. _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
  67. }
  68. dbContext.SaveChanges();
  69. trans.Commit();
  70. }
  71. return Ok();
  72. }
  73. }

处理消息

  1. using DotNetCore.CAP;
  2. using Microsoft.AspNetCore.Mvc;
  3. namespace CAP.Transport.RabbitMQ.MySql.Controllers.MQ;
  4. /// <summary>
  5. /// 处理消息
  6. /// </summary>
  7. [ApiController]
  8. public class ConsumerController : ControllerBase
  9. {
  10. /// <summary>
  11. /// 订阅消费者-MySqlConnection
  12. /// </summary>
  13. /// <param name="value"></param>
  14. [NonAction]
  15. [CapSubscribe("sample.rabbitmq.mysql")]
  16. public void Subscriber(DateTime p)
  17. {
  18. Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
  19. }
  20. /// <summary>
  21. /// 订阅消费者-EF
  22. /// </summary>
  23. /// <param name="p"></param>
  24. /// <param name="header"></param>
  25. [NonAction]
  26. [CapSubscribe("sample.rabbitmq.mysql", Group = "group.test2")]
  27. public void Subscriber2(DateTime p, [FromCap] CapHeader header)
  28. {
  29. Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
  30. }
  31. }

测试

https://localhost:7285/api/Order/Publish/EntityFrameworkWithTransaction
1663665077188.png
1663665097746.png
1663665352886.png
1663665373765.png