CAP
CAP docs看到更多详细资料。
CAP 视频教程,学习如何在项目中集成CAP。
●GitHub源码:https://github.com/dotnetcore/cap
●示例代码:https://github.com/dotnetcore/CAP/tree/master/samples

CAP 运输器

通过运输将数据从一个地方移动到另一个地方-在采集程序和管道之间,管道与实体数据库之间,甚至在管道与外部系统之间。

怎么选择运输器

🏳‍🌈 RabbitMQ Kafka Azure Service Bus In-Memory
定位 可靠消息传输 实时数据处理 内存型,测试
分布式
持久化
性能 Medium High Medium High

RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用 Erlang 语言编写的,而聚类和故障转移是构建在开源的通讯平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
CAP 支持使用 RabbitMQ 作为消息传输器。

RabbitMQ Options

CAP 直接对外提供的 RabbitMQ 配置参数如下:

NAME DESCRIPTION TYPE DEFAULT
HostName 宿主地址,配置集群可以使用逗号分隔,如 192.168.1.111,192.168.1.112 string localhost
UserName 用户名 string guest
Password 密码 string guest
VirtualHost 虚拟主机 string /
Port 端口号 int -1
ExchangeName CAP默认Exchange名称 string cap.default.topic
QueueArguments 创建队列额外参数 x-arguments QueueArgumentsOptions N/A
ConnectionFactoryOptions RabbitMQClient原生参数 ConnectionFactory N/A
CustomHeaders 订阅者自定义头信息 Func>> N/A

CustomHeaders Options

当需要从异构系统或者直接接收从RabbitMQ 控制台发送的消息时,由于 CAP 需要定义额外的头信息才能正常订阅,所以此时会出现异常。通过提供此参数来进行自定义头信息的设置来使订阅者正常工作。
你可以在这里找到有关 头信息 的说明。
用法如下:

  1. config.UseRabbitMQ(o =>
  2. {
  3. o.CustomHeaders = e => new List<KeyValuePair<string, string>>
  4. {
  5. new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
  6. new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
  7. };
  8. });

连接 RabbitMQ 集群

使用逗号分隔连接字符串即可,如下:

  1. x=> x.UseRabbitMQ("localhost:5672,localhost:5673,localhost:5674")

RabbitMQ.SqlServer示例

注意,需要在SqlServer数据库手工创建好表

  1. CREATE TABLE [dbo].[persons](
  2. [Id] [int] NOT NULL,
  3. [name] [varchar](55) NULL
  4. ) ON [PRIMARY]
  5. GO

NuGet

  1. Install-Package DotNetCore.CAP.RabbitMQ
  2. Install-Package DotNetCore.CAP.Dashboard
  3. Install-Package DotNetCore.CAP.SqlServer
  4. Install-Package Microsoft.EntityFrameworkCore.SqlServer
  5. Install-Package Microsoft.EntityFrameworkCore.Design
  6. Install-Package Dapper

注入CAP

  1. using CAP.Transport.RabbitMQ.SqlServer;
  2. using CAP.Transport.RabbitMQ.SqlServer.Models;
  3. using DotNetCore.CAP.Internal;
  4. using DotNetCore.CAP.Messages;
  5. var builder = WebApplication.CreateBuilder(args);
  6. // Add services to the container.
  7. builder.Services.AddControllers();
  8. builder.Services.AddEndpointsApiExplorer();
  9. builder.Services.AddSwaggerGen();
  10. var appSetting = new AppSetting();
  11. builder.Configuration.GetSection("AppSetting").Bind(appSetting);
  12. //把AppSetting实体注入到容器,方便在构造函数里使用IOptionsSnapshot<AppSetting> options
  13. builder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));
  14. //[FromServices] AppDbContext dbContext
  15. builder.Services.AddDbContext<AppDbContext>();
  16. builder.Services.AddCap(config =>
  17. {
  18. config.UseEntityFramework<AppDbContext>();
  19. config.UseRabbitMQ(o =>
  20. {
  21. o.HostName = appSetting.RabbitMQSetting.HostName;
  22. o.UserName = appSetting.RabbitMQSetting.UserName;
  23. o.Password = appSetting.RabbitMQSetting.Password;
  24. o.Port = appSetting.RabbitMQSetting.Port;
  25. o.CustomHeaders = e => new List<KeyValuePair<string, string>>
  26. {
  27. new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
  28. new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
  29. };
  30. o.ConnectionFactoryOptions = opt => {
  31. //rabbitmq client ConnectionFactory config
  32. };
  33. });//配置RabbitMQ
  34. config.UseDashboard();//配置Dashboard
  35. config.FailedRetryCount = 5;
  36. config.FailedThresholdCallback = failed =>
  37. {
  38. var logger = failed.ServiceProvider.GetService<ILogger<Program>>();
  39. logger?.LogError($@"A message of type {failed.MessageType} failed after executing {config.FailedRetryCount} several times,requiring manual troubleshooting. Message name: {failed.Message.GetName()}");
  40. };
  41. });
  42. var app = builder.Build();
  43. // Configure the HTTP request pipeline.
  44. if (app.Environment.IsDevelopment())
  45. {
  46. app.UseSwagger();
  47. app.UseSwaggerUI();
  48. }
  49. app.UseHttpsRedirection();
  50. app.UseAuthorization();
  51. app.MapControllers();
  52. app.Run();

公共部分

  1. {
  2. "Logging": {
  3. "LogLevel": {
  4. "Default": "Information",
  5. "Microsoft.AspNetCore": "Warning"
  6. }
  7. },
  8. "AllowedHosts": "*",
  9. "AppSetting": {
  10. "SqlServerSetting": {
  11. "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
  12. },
  13. "MysqlSetting": {
  14. "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
  15. },
  16. "RedisSetting": {
  17. "Connection": "192.168.3.40:6379,password=",
  18. "Database": "test"
  19. },
  20. "RabbitMQSetting": {
  21. "HostName": "localhost",
  22. "UserName": "admin",
  23. "Password": "admin",
  24. "Port": 5672
  25. },
  26. "MongoDbSetting": {
  27. "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
  28. "Database": "test"
  29. }
  30. }
  31. }
  1. namespace CAP.Transport.RabbitMQ.SqlServer.Models;
  2. /// <summary>
  3. /// mongodb setting
  4. /// </summary>
  5. public class AppSetting
  6. {
  7. /// <summary>
  8. /// SqlServerSetting
  9. /// </summary>
  10. public SqlServerSetting SqlServerSetting { get; set; }
  11. /// <summary>
  12. /// MysqlSetting
  13. /// </summary>
  14. public MysqlSetting MysqlSetting { get; set; }
  15. /// <summary>
  16. /// RedisSetting
  17. /// </summary>
  18. public RedisSetting RedisSetting { get; set; }
  19. /// <summary>
  20. /// RabbitMQSetting
  21. /// </summary>
  22. public RabbitMQSetting RabbitMQSetting { get; set; }
  23. /// <summary>
  24. /// MongoDbSetting
  25. /// </summary>
  26. public MongoDbSetting MongoDbSetting { get; set; }
  27. /// <summary>
  28. /// KafkaSetting
  29. /// </summary>
  30. public KafkaSetting KafkaSetting { get; set; }
  31. }
  32. public class RabbitMQSetting
  33. {
  34. /// <summary>
  35. /// HostName
  36. /// </summary>
  37. public string HostName { get; set; } = "localhost";
  38. /// <summary>
  39. /// Password
  40. /// </summary>
  41. public string Password { get; set; }
  42. /// <summary>
  43. /// Username
  44. /// </summary>
  45. public string UserName { get; set; }
  46. /// <summary>
  47. /// The port to connect on.
  48. /// </summary>
  49. public int Port { get; set; } = 5672;
  50. }
  51. public class RedisSetting
  52. {
  53. /// <summary>
  54. /// connection string
  55. /// </summary>
  56. public string Connection { get; set; }
  57. /// <summary>
  58. /// database name
  59. /// </summary>
  60. public string Database { get; set; }
  61. }
  62. public class MongoDbSetting
  63. {
  64. /// <summary>
  65. /// connection string
  66. /// </summary>
  67. public string Connection { get; set; }
  68. /// <summary>
  69. /// database name
  70. /// </summary>
  71. public string Database { get; set; }
  72. }
  73. public class SqlServerSetting
  74. {
  75. /// <summary>
  76. /// connection string
  77. /// </summary>
  78. public string Connection { get; set; }
  79. }
  80. public class MysqlSetting
  81. {
  82. /// <summary>
  83. /// connection string
  84. /// </summary>
  85. public string Connection { get; set; }
  86. }
  87. public class KafkaSetting
  88. {
  89. /// <summary>
  90. /// Servers
  91. /// </summary>
  92. public string Servers { get; set; }
  93. }
  1. using CAP.Transport.RabbitMQ.SqlServer.Models;
  2. using DotNetCore.CAP;
  3. using Microsoft.EntityFrameworkCore;
  4. using Microsoft.Extensions.Configuration;
  5. using Microsoft.Extensions.Options;
  6. namespace CAP.Transport.RabbitMQ.SqlServer;
  7. public class AppDbContext : DbContext
  8. {
  9. /// <summary>
  10. /// AppDbContext
  11. /// builder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));
  12. /// </summary>
  13. /// <param name="options"></param>
  14. public AppDbContext(IOptionsSnapshot<AppSetting> options)
  15. {
  16. ConnectionString = options != null ? options.Value.SqlServerSetting.Connection : "";
  17. }
  18. public static string ConnectionString;
  19. public DbSet<Person> Persons { get; set; }
  20. protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
  21. {
  22. optionsBuilder.UseSqlServer(ConnectionString);
  23. }
  24. }
  1. using Microsoft.AspNetCore.Mvc;
  2. namespace CAP.Transport.RabbitMQ.SqlServer.Controllers;
  3. /// <summary>
  4. /// 基础控制器
  5. /// </summary>
  6. [Route("api/[area]/[controller]/[action]")]
  7. [ApiController]
  8. public abstract class BaseController : ControllerBase
  9. {
  10. }
  1. using Microsoft.AspNetCore.Mvc;
  2. namespace CAP.Transport.RabbitMQ.SqlServer.Controllers.MQ;
  3. /// <summary>
  4. /// 域控制器
  5. /// </summary>
  6. [Area("MQ")]
  7. public abstract class AreaController : BaseController
  8. {
  9. }

发布消息

  1. using CAP.Transport.RabbitMQ.SqlServer.Models;
  2. using DotNetCore.CAP;
  3. using Microsoft.AspNetCore.Mvc;
  4. using Microsoft.Data.SqlClient;
  5. namespace CAP.Transport.RabbitMQ.SqlServer.Controllers.MQ;
  6. /// <summary>
  7. /// 发送消息
  8. /// </summary>
  9. public class PublishController : AreaController
  10. {
  11. ICapPublisher _capBus;
  12. public PublishController(ICapPublisher capBus)
  13. {
  14. _capBus = capBus;
  15. }
  16. /// <summary>
  17. /// 发布者-
  18. /// </summary>
  19. /// <returns></returns>
  20. [HttpGet]
  21. public async Task<IActionResult> WithoutTransaction()
  22. {
  23. await _capBus.PublishAsync("sample.rabbitmq.sqlserver", new Person()
  24. {
  25. Id = 123,
  26. Name = "Bar"
  27. });
  28. return Ok();
  29. }
  30. /// <summary>
  31. /// 发布者-SqlConnection
  32. /// </summary>
  33. /// <returns></returns>
  34. [HttpPost]
  35. public IActionResult AdonetWithTransaction(Person person)
  36. {
  37. using (var connection = new SqlConnection(AppDbContext.ConnectionString))
  38. {
  39. using (var transaction = connection.BeginTransaction(_capBus, true))
  40. {
  41. //your business code
  42. string sqlstr = string.Format("insert into persons(Id,name) values({0},'{1}')", person.Id, person.Name);
  43. using (SqlCommand cmd = new(sqlstr, connection))
  44. {
  45. Console.WriteLine("即将执行SQL语句: " + sqlstr);
  46. int resut = cmd.ExecuteNonQuery();
  47. }
  48. _capBus.Publish("sample.rabbitmq.sqlserver", new Person()
  49. {
  50. Id = 123,
  51. Name = "Bar"
  52. });
  53. }
  54. }
  55. return Ok();
  56. }
  57. /// <summary>
  58. /// 发布者-EF
  59. /// </summary>
  60. /// <returns></returns>
  61. [HttpPost]
  62. public IActionResult EntityFrameworkWithTransaction([FromServices] AppDbContext dbContext, Person person)
  63. {
  64. using (dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
  65. {
  66. dbContext.Persons.Add(new Person() { Id = person.Id, Name = person.Name + "ef" });
  67. dbContext.SaveChanges();
  68. _capBus.Publish("sample.rabbitmq.sqlserver", new Person()
  69. {
  70. Id = 123,
  71. Name = "Bar"
  72. });
  73. }
  74. return Ok();
  75. }
  76. }

处理消息

  1. using CAP.Transport.RabbitMQ.SqlServer.Models;
  2. using DotNetCore.CAP;
  3. using DotNetCore.CAP.Messages;
  4. using Microsoft.AspNetCore.Mvc;
  5. namespace CAP.Transport.RabbitMQ.SqlServer.Controllers.MQ;
  6. /// <summary>
  7. /// 处理消息
  8. /// </summary>
  9. [ApiController]
  10. public class ConsumerController : ControllerBase
  11. {
  12. /// <summary>
  13. /// 订阅消费者-
  14. /// </summary>
  15. /// <param name="value"></param>
  16. [NonAction]
  17. [CapSubscribe("sample.rabbitmq.sqlserver")]
  18. public void Subscriber(Person p)
  19. {
  20. Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
  21. }
  22. /// <summary>
  23. /// 订阅消费者-EF
  24. /// </summary>
  25. /// <param name="p"></param>
  26. /// <param name="header"></param>
  27. [NonAction]
  28. [CapSubscribe("sample.rabbitmq.sqlserver", Group = "group.test2")]
  29. public void Subscriber2(Person p, [FromCap] CapHeader header)
  30. {
  31. var id = header[Headers.MessageId];
  32. Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
  33. }
  34. }

测试