CAP
●CAP docs看到更多详细资料。
●CAP 视频教程,学习如何在项目中集成CAP。
●GitHub源码:https://github.com/dotnetcore/cap
●示例代码:https://github.com/dotnetcore/CAP/tree/master/samples
CAP 运输器
通过运输将数据从一个地方移动到另一个地方-在采集程序和管道之间,管道与实体数据库之间,甚至在管道与外部系统之间。
怎么选择运输器
| 🏳🌈 | RabbitMQ | Kafka | Azure Service Bus | In-Memory |
|---|---|---|---|---|
| 定位 | 可靠消息传输 | 实时数据处理 | 云 | 内存型,测试 |
| 分布式 | ✔ | ✔ | ✔ | ❌ |
| 持久化 | ✔ | ✔ | ✔ | ❌ |
| 性能 | Medium | High | Medium | High |
Redis Streams
Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。
Redis Stream 是 Redis 5.0 引入的一种新数据类型,它用一种仅附加的数据结构以更抽象的方式模拟日志数据结构。
Redis Streams 可以在 CAP 中用作消息传输器。
Redis Streams Options
CAP 直接对外提供的 Redis Stream 配置参数如下:
| NAME | DESCRIPTION | TYPE | DEFAULT |
|---|---|---|---|
| Configuration | redis连接配置 (StackExchange.Redis) | ConfigurationOptions | ConfigurationOptions |
| StreamEntriesCount | 读取时从 stream 返回的条目数 | uint | 10 |
| ConnectionPoolSize | 连接池数 | uint | 10 |
Redis ConfigurationOptions
如果需要更多原生Redis相关配置选项,您可以在 Configuration 选项中进行设置 :
services.AddCap(capOptions =>{capOptions.UseRedis(redisOptions=>{// redis options.redisOptions.Configuration.EndPoints.Add(IPAddress.Loopback, 0);});});
Redis.SqlServer示例
- 已发布消息和接收的消息存储在SqlServer里
- 订阅方法的主题名称或交换路由器密钥存储在Redis
NuGet
Install-Package DotNetCore.CAP.DashboardInstall-Package DotNetCore.CAP.RedisStreamsInstall-Package DotNetCore.CAP.SqlServerInstall-Package Microsoft.VisualStudio.Azure.Containers.Tools.Targets
注入CAP
```csharp
//已发布消息和接收的消息存储在SqlServer里 //订阅方法的主题名称或交换路由器密钥存储在Redis using CAP.Redis.SqlServer.Models; using DotNetCore.CAP.Messages;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers(); builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen();
var appSetting = new AppSetting(); builder.Configuration.GetSection(“AppSetting”).Bind(appSetting);
//把AppSetting实体注入到容器,方便在构造函数里使用IOptionsSnapshot
builder.Services.AddCap(config => { config.UseRedis(appSetting.RedisSetting.Connection);
config.UseSqlServer(appSetting.SqlServerSetting.Connection);config.UseDashboard();//配置Dashboardconfig.FailedRetryCount = 5;config.FailedThresholdCallback = failed =>{var logger = failed.ServiceProvider.GetService<ILogger<Program>>();logger?.LogError($@"A message of type {failed.MessageType} failed after executing {config.FailedRetryCount} several times,requiring manual troubleshooting. Message name: {failed.Message.GetName()}");};
});
var app = builder.Build();
// Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); }
app.UseAuthorization();
app.MapControllers();
app.Run();
<a name="JCxZD"></a>### 公共部分```csharp{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","AppSetting": {"SqlServerSetting": {"Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"},"MysqlSetting": {"Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"},"RedisSetting": {"Connection": "192.168.3.40:6379,password=","Database": "test"},"RabbitMQSetting": {"HostName": "localhost","UserName": "admin","Password": "admin","Port": 5672},"MongoDbSetting": {"Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0"Database": "test"},"KafkaSetting": {"Servers": "kafka1:9091,kafka2:9092,kafka3:9093"}}}
namespace CAP.Redis.SqlServer.Models;/// <summary>/// mongodb setting/// </summary>public class AppSetting{/// <summary>/// SqlServerSetting/// </summary>public SqlServerSetting SqlServerSetting { get; set; }/// <summary>/// MysqlSetting/// </summary>public MysqlSetting MysqlSetting { get; set; }/// <summary>/// RedisSetting/// </summary>public RedisSetting RedisSetting { get; set; }/// <summary>/// RabbitMQSetting/// </summary>public RabbitMQSetting RabbitMQSetting { get; set; }/// <summary>/// MongoDbSetting/// </summary>public MongoDbSetting MongoDbSetting { get; set; }/// <summary>/// KafkaSetting/// </summary>public KafkaSetting KafkaSetting { get; set; }}public class RabbitMQSetting{/// <summary>/// HostName/// </summary>public string HostName { get; set; } = "localhost";/// <summary>/// Password/// </summary>public string Password { get; set; }/// <summary>/// Username/// </summary>public string UserName { get; set; }/// <summary>/// The port to connect on./// </summary>public int Port { get; set; } = 5672;}public class RedisSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }/// <summary>/// database name/// </summary>public string Database { get; set; }}public class MongoDbSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }/// <summary>/// database name/// </summary>public string Database { get; set; }}public class SqlServerSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }}public class MysqlSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }}public class KafkaSetting{/// <summary>/// Servers/// </summary>public string Servers { get; set; }}
namespace CAP.Redis.SqlServer.Models;public class Person{public string Name { get; set; }public int Age { get; set; }public override string ToString(){return "Name:" + Name + ", Age:" + Age;}}
using Microsoft.AspNetCore.Mvc;namespace CAP.Redis.SqlServer.Controllers;/// <summary>/// 基础控制器/// </summary>[Route("api/[area]/[controller]/[action]")][ApiController]public abstract class BaseController : ControllerBase{}
using Microsoft.AspNetCore.Mvc;namespace CAP.Redis.SqlServer.Controllers.Redis;/// <summary>/// 域控制器/// </summary>[Area("Redis")]public abstract class AreaController : BaseController{}
发布消息
using CAP.Redis.SqlServer.Models;using DotNetCore.CAP;using Microsoft.AspNetCore.Mvc;namespace CAP.Redis.SqlServer.Controllers.Redis;/// <summary>/// 发送消息/// </summary>public class PublishController : AreaController{private readonly ICapPublisher _capBus;private readonly ILogger<PublishController> _logger;public PublishController(ILogger<PublishController> logger, ICapPublisher capBus){_logger = logger;_capBus = capBus;}/// <summary>/// 发布者/// </summary>/// <param name="message"></param>/// <returns></returns>[HttpGet]public async Task Publish([FromQuery] string message = "test-message"){await _capBus.PublishAsync(message, new Person() { Age = 11, Name = "James" });}}
处理消息
using CAP.Redis.SqlServer.Models;using DotNetCore.CAP;using DotNetCore.CAP.Messages;using Microsoft.AspNetCore.Mvc;namespace CAP.Redis.SqlServer.Controllers.Redis;/// <summary>/// 处理消息/// </summary>[ApiController]public class ConsumerController : ControllerBase{private readonly ILogger<ConsumerController> _logger;public ConsumerController(ILogger<ConsumerController> logger){_logger = logger;}/// <summary>/// 订阅消费者-/// </summary>/// <param name="value"></param>[NonAction][CapSubscribe("test-message")][CapSubscribe("test-message-1")][CapSubscribe("test-message-2")][CapSubscribe("test-message-3")]public void Subscribe(Person p, [FromCap] CapHeader header){_logger.LogInformation($"{header[Headers.MessageName]} subscribed with value --> " + p);}}
测试


