CAP
CAP docs看到更多详细资料。
CAP 视频教程,学习如何在项目中集成CAP。
●GitHub源码:https://github.com/dotnetcore/cap
●示例代码:https://github.com/dotnetcore/CAP/tree/master/samples

CAP 运输器

通过运输将数据从一个地方移动到另一个地方-在采集程序和管道之间,管道与实体数据库之间,甚至在管道与外部系统之间。

怎么选择运输器

🏳‍🌈 RabbitMQ Kafka Azure Service Bus In-Memory
定位 可靠消息传输 实时数据处理 内存型,测试
分布式
持久化
性能 Medium High Medium High

RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用 Erlang 语言编写的,而聚类和故障转移是构建在开源的通讯平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
CAP 支持使用 RabbitMQ 作为消息传输器。

RabbitMQ Options

CAP 直接对外提供的 RabbitMQ 配置参数如下:

NAME DESCRIPTION TYPE DEFAULT
HostName 宿主地址,配置集群可以使用逗号分隔,如 192.168.1.111,192.168.1.112 string localhost
UserName 用户名 string guest
Password 密码 string guest
VirtualHost 虚拟主机 string /
Port 端口号 int -1
ExchangeName CAP默认Exchange名称 string cap.default.topic
QueueArguments 创建队列额外参数 x-arguments QueueArgumentsOptions N/A
ConnectionFactoryOptions RabbitMQClient原生参数 ConnectionFactory N/A
CustomHeaders 订阅者自定义头信息 Func>> N/A

CustomHeaders Options

当需要从异构系统或者直接接收从RabbitMQ 控制台发送的消息时,由于 CAP 需要定义额外的头信息才能正常订阅,所以此时会出现异常。通过提供此参数来进行自定义头信息的设置来使订阅者正常工作。
你可以在这里找到有关 头信息 的说明。
用法如下:

  1. config.UseRabbitMQ(o =>
  2. {
  3. o.CustomHeaders = e => new List<KeyValuePair<string, string>>
  4. {
  5. new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
  6. new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
  7. };
  8. });

连接 RabbitMQ 集群

使用逗号分隔连接字符串即可,如下:

  1. x=> x.UseRabbitMQ("localhost:5672,localhost:5673,localhost:5674")

MongoDB

MongoDB 是一个跨平台的面向文档型的数据库程序,它被归为 NOSQL 数据库,CAP 从 2.3 版本开始支持 MongoDB 作为消息存储。
MongoDB 从 4.0 版本开始支持 ACID 事务,所以 CAP 也只支持 4.0 以上的 MongoDB,并且 MongoDB 需要部署为集群,因为 MongoDB 的 ACID 事务需要集群才可以使用。
有关开发环境如何快速搭建 MongoDB 4.0+ 集群,你可以我的参考 这篇文章

UseMongoDB配置项

NAME DESCRIPTION TYPE DEFAULT
DatabaseName 数据库名称 string cap
DatabaseConnection 数据库连接字符串 string mongodb://localhost:27017
ReceivedCollection 接收消息集合名称 string cap.received
PublishedCollection 发送消息集合名称 string cap.published

RabbitMQ.MongoDB示例

NuGet

  1. Install-Package DotNetCore.CAP.RabbitMQ
  2. Install-Package DotNetCore.CAP.Dashboard
  3. Install-Package DotNetCore.CAP.MongoDB
  4. Install-Package Microsoft.Extensions.Configuration

注入CAP

  1. using CAP.Transport.RabbitMQ.MongoDB;
  2. using CAP.Transport.RabbitMQ.MongoDB.Models;
  3. using DotNetCore.CAP.Internal;
  4. using DotNetCore.CAP.Messages;
  5. using MongoDB.Driver;
  6. var builder = WebApplication.CreateBuilder(args);
  7. // Add services to the container.
  8. builder.Services.AddControllers();
  9. builder.Services.AddEndpointsApiExplorer();
  10. builder.Services.AddSwaggerGen();
  11. var appSetting = new AppSetting();
  12. builder.Configuration.GetSection("AppSetting").Bind(appSetting);
  13. //把AppSetting实体注入到容器,方便在构造函数里使用IOptionsSnapshot<AppSetting> options
  14. builder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));
  15. //MongoDB 的客户端接口
  16. builder.Services.AddSingleton<IMongoClient>(new MongoClient(appSetting.MongoDbSetting.Connection));
  17. builder.Services.AddCap(config =>
  18. {
  19. config.UseMongoDB(appSetting.MongoDbSetting.Connection);
  20. config.UseRabbitMQ(o =>
  21. {
  22. o.HostName = appSetting.RabbitMQSetting.HostName;
  23. o.UserName = appSetting.RabbitMQSetting.UserName;
  24. o.Password = appSetting.RabbitMQSetting.Password;
  25. o.Port = appSetting.RabbitMQSetting.Port;
  26. o.CustomHeaders = e => new List<KeyValuePair<string, string>>
  27. {
  28. new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
  29. new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
  30. };
  31. o.ConnectionFactoryOptions = opt => {
  32. //rabbitmq client ConnectionFactory config
  33. };
  34. });//配置RabbitMQ
  35. config.UseDashboard();//配置Dashboard
  36. config.FailedRetryCount = 5;
  37. config.FailedThresholdCallback = failed =>
  38. {
  39. var logger = failed.ServiceProvider.GetService<ILogger<Program>>();
  40. logger?.LogError($@"A message of type {failed.MessageType} failed after executing {config.FailedRetryCount} several times,requiring manual troubleshooting. Message name: {failed.Message.GetName()}");
  41. };
  42. });
  43. var app = builder.Build();
  44. // Configure the HTTP request pipeline.
  45. if (app.Environment.IsDevelopment())
  46. {
  47. app.UseSwagger();
  48. app.UseSwaggerUI();
  49. }
  50. app.UseHttpsRedirection();
  51. app.UseAuthorization();
  52. app.MapControllers();
  53. app.Run();

公共部分

  1. {
  2. "Logging": {
  3. "LogLevel": {
  4. "Default": "Information",
  5. "Microsoft.AspNetCore": "Warning"
  6. }
  7. },
  8. "AllowedHosts": "*",
  9. "AppSetting": {
  10. "SqlServerSetting": {
  11. "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
  12. },
  13. "MysqlSetting": {
  14. "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
  15. },
  16. "RedisSetting": {
  17. "Connection": "192.168.3.40:6379,password=",
  18. "Database": "test"
  19. },
  20. "RabbitMQSetting": {
  21. "HostName": "localhost",
  22. "UserName": "admin",
  23. "Password": "admin",
  24. "Port": 5672
  25. },
  26. "MongoDbSetting": {
  27. "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
  28. "Database": "test"
  29. }
  30. }
  31. }
  1. namespace CAP.Transport.RabbitMQ.MongoDB.Models;
  2. /// <summary>
  3. /// mongodb setting
  4. /// </summary>
  5. public class AppSetting
  6. {
  7. /// <summary>
  8. /// SqlServerSetting
  9. /// </summary>
  10. public SqlServerSetting SqlServerSetting { get; set; }
  11. /// <summary>
  12. /// MysqlSetting
  13. /// </summary>
  14. public MysqlSetting MysqlSetting { get; set; }
  15. /// <summary>
  16. /// RedisSetting
  17. /// </summary>
  18. public RedisSetting RedisSetting { get; set; }
  19. /// <summary>
  20. /// RabbitMQSetting
  21. /// </summary>
  22. public RabbitMQSetting RabbitMQSetting { get; set; }
  23. /// <summary>
  24. /// MongoDbSetting
  25. /// </summary>
  26. public MongoDbSetting MongoDbSetting { get; set; }
  27. /// <summary>
  28. /// KafkaSetting
  29. /// </summary>
  30. public KafkaSetting KafkaSetting { get; set; }
  31. }
  32. public class RabbitMQSetting
  33. {
  34. /// <summary>
  35. /// HostName
  36. /// </summary>
  37. public string HostName { get; set; } = "localhost";
  38. /// <summary>
  39. /// Password
  40. /// </summary>
  41. public string Password { get; set; }
  42. /// <summary>
  43. /// Username
  44. /// </summary>
  45. public string UserName { get; set; }
  46. /// <summary>
  47. /// The port to connect on.
  48. /// </summary>
  49. public int Port { get; set; } = 5672;
  50. }
  51. public class RedisSetting
  52. {
  53. /// <summary>
  54. /// connection string
  55. /// </summary>
  56. public string Connection { get; set; }
  57. /// <summary>
  58. /// database name
  59. /// </summary>
  60. public string Database { get; set; }
  61. }
  62. public class MongoDbSetting
  63. {
  64. /// <summary>
  65. /// connection string
  66. /// </summary>
  67. public string Connection { get; set; }
  68. /// <summary>
  69. /// database name
  70. /// </summary>
  71. public string Database { get; set; }
  72. }
  73. public class SqlServerSetting
  74. {
  75. /// <summary>
  76. /// connection string
  77. /// </summary>
  78. public string Connection { get; set; }
  79. }
  80. public class MysqlSetting
  81. {
  82. /// <summary>
  83. /// connection string
  84. /// </summary>
  85. public string Connection { get; set; }
  86. }
  87. public class KafkaSetting
  88. {
  89. /// <summary>
  90. /// Servers
  91. /// </summary>
  92. public string Servers { get; set; }
  93. }
  1. using Microsoft.AspNetCore.Mvc;
  2. namespace CAP.Transport.RabbitMQ.MongoDB.Controllers;
  3. /// <summary>
  4. /// 基础控制器
  5. /// </summary>
  6. [Route("api/[area]/[controller]/[action]")]
  7. [ApiController]
  8. public abstract class BaseController : ControllerBase
  9. {
  10. }
  1. using Microsoft.AspNetCore.Mvc;
  2. namespace CAP.Transport.RabbitMQ.MongoDB.Controllers.MongoDB;
  3. /// <summary>
  4. /// 域控制器
  5. /// </summary>
  6. [Area("MongoDB")]
  7. public abstract class AreaController : BaseController
  8. {
  9. }

发布消息

  1. using CAP.Transport.RabbitMQ.MongoDB.Models;
  2. using DotNetCore.CAP;
  3. using Microsoft.AspNetCore.Mvc;
  4. using Microsoft.Extensions.Options;
  5. using MongoDB.Bson;
  6. using MongoDB.Driver;
  7. using MongoDB.Driver.Core.Configuration;
  8. namespace CAP.Transport.RabbitMQ.MongoDB.Controllers.MongoDB;
  9. /// <summary>
  10. /// 发送消息
  11. /// </summary>
  12. public class PublishController : AreaController
  13. {
  14. private readonly IMongoClient _client;
  15. private readonly ICapPublisher _capBus;
  16. private readonly MongoDbSetting _mongoDbSetting;
  17. public PublishController(IMongoClient client, ICapPublisher capBus, IOptionsSnapshot<AppSetting> options)
  18. {
  19. _client = client;
  20. _capBus = capBus;
  21. _mongoDbSetting = options != null ? options.Value.MongoDbSetting:new MongoDbSetting();
  22. }
  23. /// <summary>
  24. /// 发布者-无事务
  25. /// </summary>
  26. /// <returns></returns>
  27. [HttpGet]
  28. public IActionResult WithoutTransaction()
  29. {
  30. _capBus.PublishAsync("sample.rabbitmq.mongodb", DateTime.Now);
  31. return Ok();
  32. }
  33. /// <summary>
  34. /// 发布者-发布不自动提交
  35. /// </summary>
  36. /// <returns></returns>
  37. [HttpGet]
  38. public IActionResult PublishNotAutoCommit()
  39. {
  40. //注意:MongoDB 不能在事务中创建数据库和集合,所以你需要单独创建它们,模拟一条记录插入则会自动创建
  41. var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
  42. mycollection.InsertOne(new BsonDocument { { "test", "test" } });
  43. using (var session = _client.StartTransaction(_capBus, autoCommit: false))
  44. {
  45. //var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
  46. //collection.InsertOne(session, new BsonDocument { { "hello", "world" } });
  47. _capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now);
  48. session.CommitTransaction();
  49. }
  50. return Ok();
  51. }
  52. /// <summary>
  53. /// 发布者--
  54. /// </summary>
  55. /// <returns></returns>
  56. [HttpGet]
  57. public IActionResult PublishWithoutTrans()
  58. {
  59. //注意:MongoDB 不能在事务中创建数据库和集合,所以你需要单独创建它们,模拟一条记录插入则会自动创建
  60. var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
  61. mycollection.InsertOne(new BsonDocument { { "test", "test" } });
  62. using (var session = _client.StartTransaction(_capBus, autoCommit: true))
  63. {
  64. //var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
  65. //collection.InsertOne(session, new BsonDocument { { "hello", "world" } });
  66. _capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now);
  67. }
  68. return Ok();
  69. }
  70. }

处理消息

  1. using DotNetCore.CAP;
  2. using DotNetCore.CAP.Messages;
  3. using Microsoft.AspNetCore.Mvc;
  4. namespace CAP.Transport.RabbitMQ.MongoDB.Controllers.MongoDB;
  5. /// <summary>
  6. /// 处理消息
  7. /// </summary>
  8. [ApiController]
  9. public class ConsumerController : ControllerBase
  10. {
  11. /// <summary>
  12. /// 订阅消费者-
  13. /// </summary>
  14. /// <param name="value"></param>
  15. [NonAction]
  16. [CapSubscribe("sample.rabbitmq.mongodb")]
  17. public void ReceiveMessage(DateTime time)
  18. {
  19. Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}");
  20. }
  21. }

测试

测试报错没信息:”Standalone servers do not support transactions.”