CAP


请查看 CAP docs 了解更多详细资料。
观看 CAP 视频教程,学习如何在项目中集成 CAP。
●GitHub源码:https://github.com/dotnetcore/cap
●示例代码:https://github.com/dotnetcore/CAP/tree/master/samples

CAP 运输器

通过运输将数据从一个地方移动到另一个地方-在采集程序和管道之间,管道与实体数据库之间,甚至在管道与外部系统之间。

怎么选择运输器

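| 🏳‍🌈 | RabbitMQ | Kafka | Azure Service Bus | In-Memory |
|:---:|:---:|:---:|:---:|:---:|
| 定位 | 可靠消息传输 | 实时数据处理 | 云 | 内存型,测试 |
| 分布式 | ✔ | ✔ | ✔ | ❌ |
| 持久化 | ✔ | ✔ | ✔ | ❌ |
| 性能 | Medium | High | Medium | High |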

Redis Streams

Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。
Redis Stream 是 Redis 5.0 引入的一种新数据类型,它用一种仅附加的数据结构以更抽象的方式模拟日志数据结构。
Redis Streams 可以在 CAP 中用作消息传输器。

Redis Streams Options

CAP 直接对外提供的 Redis Stream 配置参数如下:

| NAME | DESCRIPTION | TYPE | DEFAULT |
|:---|:---|:---|:---|
| Configuration | Redis 连接配置 (StackExchange.Redis) | ConfigurationOptions | ConfigurationOptions |
| StreamEntriesCount | 读取时从 stream 返回的条目数 | uint | 10 |
| ConnectionPoolSize | 连接池数 | uint | 10 |

Redis ConfigurationOptions

如果需要更多原生Redis相关配置选项,您可以在 Configuration 选项中进行设置 :

  services.AddCap(cap =>
  {
      cap.UseRedis(redis =>
      {
          // Native StackExchange.Redis settings are reached through redis.Configuration.
          redis.Configuration.EndPoints.Add(IPAddress.Loopback, 0);
      });
  });

Redis.SqlServer示例

  • 已发布消息和接收的消息存储在SqlServer里
  • 订阅方法的主题名称或交换路由器密钥存储在Redis

    NuGet

    1. Install-Package DotNetCore.CAP.Dashboard
    2. Install-Package DotNetCore.CAP.RedisStreams
    3. Install-Package DotNetCore.CAP.SqlServer
    4. Install-Package Microsoft.VisualStudio.Azure.Containers.Tools.Targets

    注入CAP

    ```csharp

// Published and received messages are stored in SQL Server.
// Topic names / exchange routing keys of the subscriber methods are stored in Redis.
using CAP.Redis.SqlServer.Models;
using DotNetCore.CAP.Messages;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Bind the "AppSetting" section eagerly: the connection strings are needed
// below while CAP is being configured, before the container is built.
var appSetting = new AppSetting();
builder.Configuration.GetSection("AppSetting").Bind(appSetting);

// Also register AppSetting with the container so constructors can take
// IOptionsSnapshot<AppSetting>.
builder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));

builder.Services.AddCap(config =>
{
    config.UseRedis(appSetting.RedisSetting.Connection);
    config.UseSqlServer(appSetting.SqlServerSetting.Connection);
    config.UseDashboard(); // enable the CAP dashboard
    config.FailedRetryCount = 5;
    config.FailedThresholdCallback = failed =>
    {
        // The logger is resolved from the provider carried by the callback argument;
        // GetService may return null, hence the null-conditional call.
        var logger = failed.ServiceProvider.GetService<ILogger<Program>>();
        logger?.LogError($@"A message of type {failed.MessageType} failed after executing {config.FailedRetryCount} several times,requiring manual troubleshooting. Message name: {failed.Message.GetName()}");
    };
});

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseAuthorization();

app.MapControllers();

app.Run();

    ```

<a name="JCxZD"></a>
### 公共部分

```json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "AppSetting": {
    "SqlServerSetting": {
      "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
    },
    "MysqlSetting": {
      "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
    },
    "RedisSetting": {
      "Connection": "192.168.3.40:6379,password=",
      "Database": "test"
    },
    "RabbitMQSetting": {
      "HostName": "localhost",
      "UserName": "admin",
      "Password": "admin",
      "Port": 5672
    },
    "MongoDbSetting": {
      "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
      "Database": "test"
    },
    "KafkaSetting": {
      "Servers": "kafka1:9091,kafka2:9092,kafka3:9093"
    }
  }
}
```

  namespace CAP.Redis.SqlServer.Models;

  /// <summary>
  /// Root settings object for the "AppSetting" configuration section.
  /// Every nested settings object starts as an empty instance, so reading a
  /// value from a section that is absent yields an empty/default value rather
  /// than a <see cref="System.NullReferenceException"/>.
  /// </summary>
  public class AppSetting
  {
      /// <summary>
      /// SQL Server settings.
      /// </summary>
      public SqlServerSetting SqlServerSetting { get; set; } = new();

      /// <summary>
      /// MySQL settings.
      /// </summary>
      public MysqlSetting MysqlSetting { get; set; } = new();

      /// <summary>
      /// Redis settings.
      /// </summary>
      public RedisSetting RedisSetting { get; set; } = new();

      /// <summary>
      /// RabbitMQ settings.
      /// </summary>
      public RabbitMQSetting RabbitMQSetting { get; set; } = new();

      /// <summary>
      /// MongoDB settings.
      /// </summary>
      public MongoDbSetting MongoDbSetting { get; set; } = new();

      /// <summary>
      /// Kafka settings.
      /// </summary>
      public KafkaSetting KafkaSetting { get; set; } = new();
  }

  /// <summary>
  /// RabbitMQ connection settings.
  /// </summary>
  public class RabbitMQSetting
  {
      /// <summary>
      /// Host name; defaults to "localhost".
      /// </summary>
      public string HostName { get; set; } = "localhost";

      /// <summary>
      /// Password; defaults to an empty string.
      /// </summary>
      public string Password { get; set; } = string.Empty;

      /// <summary>
      /// User name; defaults to an empty string.
      /// </summary>
      public string UserName { get; set; } = string.Empty;

      /// <summary>
      /// The port to connect on; defaults to 5672.
      /// </summary>
      public int Port { get; set; } = 5672;
  }

  /// <summary>
  /// Redis connection settings.
  /// </summary>
  public class RedisSetting
  {
      /// <summary>
      /// Connection string; defaults to an empty string.
      /// </summary>
      public string Connection { get; set; } = string.Empty;

      /// <summary>
      /// Database name; defaults to an empty string.
      /// </summary>
      public string Database { get; set; } = string.Empty;
  }

  /// <summary>
  /// MongoDB connection settings.
  /// </summary>
  public class MongoDbSetting
  {
      /// <summary>
      /// Connection string; defaults to an empty string.
      /// </summary>
      public string Connection { get; set; } = string.Empty;

      /// <summary>
      /// Database name; defaults to an empty string.
      /// </summary>
      public string Database { get; set; } = string.Empty;
  }

  /// <summary>
  /// SQL Server connection settings.
  /// </summary>
  public class SqlServerSetting
  {
      /// <summary>
      /// Connection string; defaults to an empty string.
      /// </summary>
      public string Connection { get; set; } = string.Empty;
  }

  /// <summary>
  /// MySQL connection settings.
  /// </summary>
  public class MysqlSetting
  {
      /// <summary>
      /// Connection string; defaults to an empty string.
      /// </summary>
      public string Connection { get; set; } = string.Empty;
  }

  /// <summary>
  /// Kafka connection settings.
  /// </summary>
  public class KafkaSetting
  {
      /// <summary>
      /// Server list; defaults to an empty string.
      /// </summary>
      public string Servers { get; set; } = string.Empty;
  }
  namespace CAP.Redis.SqlServer.Models;

  /// <summary>
  /// Sample message payload carrying a name and an age.
  /// </summary>
  public class Person
  {
      /// <summary>
      /// Person's name; defaults to an empty string.
      /// </summary>
      public string Name { get; set; } = string.Empty;

      /// <summary>
      /// Person's age.
      /// </summary>
      public int Age { get; set; }

      /// <summary>
      /// Formats the person as <c>Name:{Name}, Age:{Age}</c>.
      /// </summary>
      /// <returns>The formatted text.</returns>
      public override string ToString() => $"Name:{Name}, Age:{Age}";
  }
  using Microsoft.AspNetCore.Mvc;

  namespace CAP.Redis.SqlServer.Controllers;

  /// <summary>
  /// Base controller: declares the shared
  /// <c>api/[area]/[controller]/[action]</c> route template and the
  /// <see cref="ApiControllerAttribute"/> for derived controllers.
  /// </summary>
  [ApiController, Route("api/[area]/[controller]/[action]")]
  public abstract class BaseController : ControllerBase { }
  using Microsoft.AspNetCore.Mvc;

  namespace CAP.Redis.SqlServer.Controllers.Redis;

  /// <summary>
  /// Area controller: base class that places derived controllers in the
  /// "Redis" area.
  /// </summary>
  [Area("Redis")]
  public abstract class AreaController : BaseController { }

发布消息

  using CAP.Redis.SqlServer.Models;
  using DotNetCore.CAP;
  using Microsoft.AspNetCore.Mvc;

  namespace CAP.Redis.SqlServer.Controllers.Redis;

  /// <summary>
  /// Publishes a sample <see cref="Person"/> message through CAP.
  /// </summary>
  public class PublishController : AreaController
  {
      private readonly ICapPublisher _capBus;
      private readonly ILogger<PublishController> _logger;

      /// <summary>
      /// Creates the controller.
      /// </summary>
      /// <param name="logger">Logger used to record each publish.</param>
      /// <param name="capBus">CAP publisher used to send the message.</param>
      public PublishController(ILogger<PublishController> logger, ICapPublisher capBus)
      {
          _logger = logger;
          _capBus = capBus;
      }

      /// <summary>
      /// Publishes a fixed sample person (James, 11).
      /// </summary>
      /// <param name="message">
      /// Passed to CAP as the message name (topic), not as the payload;
      /// defaults to "test-message".
      /// </param>
      /// <param name="cancellationToken">Token forwarded to the publish call.</param>
      /// <returns>A task that completes when the publish call has returned.</returns>
      [HttpGet]
      public async Task Publish([FromQuery] string message = "test-message", CancellationToken cancellationToken = default)
      {
          var person = new Person { Age = 11, Name = "James" };

          // The named argument selects the overload without custom headers.
          await _capBus.PublishAsync(message, person, cancellationToken: cancellationToken);

          _logger.LogInformation("Published {Person} to {MessageName}", person, message);
      }
  }

处理消息

  using CAP.Redis.SqlServer.Models;
  using DotNetCore.CAP;
  using DotNetCore.CAP.Messages;
  using Microsoft.AspNetCore.Mvc;

  namespace CAP.Redis.SqlServer.Controllers.Redis;

  /// <summary>
  /// Handles messages received through CAP subscriptions.
  /// </summary>
  [ApiController]
  public class ConsumerController : ControllerBase
  {
      private readonly ILogger<ConsumerController> _logger;

      /// <summary>
      /// Creates the controller.
      /// </summary>
      /// <param name="logger">Logger used to record each received message.</param>
      public ConsumerController(ILogger<ConsumerController> logger)
      {
          _logger = logger;
      }

      /// <summary>
      /// Subscriber for the four test topics; logs the message name together
      /// with the received payload.
      /// </summary>
      /// <param name="p">The received message payload.</param>
      /// <param name="header">CAP headers of the received message.</param>
      [NonAction]
      [CapSubscribe("test-message")]
      [CapSubscribe("test-message-1")]
      [CapSubscribe("test-message-2")]
      [CapSubscribe("test-message-3")]
      public void Subscribe(Person p, [FromCap] CapHeader header)
      {
          // Constant message template with arguments (structured logging) instead
          // of building the text by interpolation and concatenation.
          _logger.LogInformation("{MessageName} subscribed with value --> {Person}", header[Headers.MessageName], p);
      }
  }

测试

1663729555932.png
1663729594344.png