CAP
请查看 CAP docs 了解更多详细资料。
CAP 视频教程,学习如何在项目中集成CAP。
●GitHub源码:https://github.com/dotnetcore/cap
●示例代码:https://github.com/dotnetcore/CAP/tree/master/samples

CAP 运输器

通过运输将数据从一个地方移动到另一个地方-在采集程序和管道之间,管道与实体数据库之间,甚至在管道与外部系统之间。

怎么选择运输器

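| 🏳‍🌈 | RabbitMQ | Kafka | Azure Service Bus | In-Memory |
| :--- | :---: | :---: | :---: | :---: |
| 定位 | 可靠消息传输 | 实时数据处理 | 云 | 内存型,测试 |
| 分布式 | ✔ | ✔ | ✔ | ❌ |
| 持久化 | ✔ | ✔ | ✔ | ❌ |
| 性能 | Medium | High | Medium | High |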

Apache Kafka®

Apache Kafka® 是一个开源流处理软件平台,由 LinkedIn 开发并捐赠给 Apache Software Foundation,用 Scala 和 Java 编写。
CAP 支持使用 Apache Kafka® 作为消息传输器。

Kafka Option

CAP 直接对外提供的 Kafka 配置参数如下:

NAME DESCRIPTION TYPE DEFAULT
Servers Broker 地址 string
ConnectionPoolSize 连接池大小 int 10
CustomHeaders 设置自定义头 Function

Kafka示例

NuGet

  1. Install-Package DotNetCore.CAP.Kafka
  2. Install-Package DotNetCore.CAP.Dashboard
  3. Install-Package DotNetCore.CAP.PostgreSql

注入CAP

  using CAP.Transport.Kafka.PostgreSql.Models;

  var builder = WebApplication.CreateBuilder(args);

  // Register MVC controllers and the Swagger/OpenAPI generators.
  // Swagger/OpenAPI configuration guide: https://aka.ms/aspnetcore/swashbuckle
  builder.Services.AddControllers();
  builder.Services.AddEndpointsApiExplorer();
  builder.Services.AddSwaggerGen();

  // Read the "AppSetting" section once: bind it to a local instance (needed
  // below while CAP is being configured) and also register it in the container
  // so constructors can take IOptionsSnapshot<AppSetting>.
  var appSettingSection = builder.Configuration.GetSection("AppSetting");
  var appSetting = new AppSetting();
  appSettingSection.Bind(appSetting);
  builder.Services.Configure<AppSetting>(appSettingSection);

  builder.Services.AddCap(config =>
  {
      // Configure PostgreSQL for CAP.
      config.UsePostgreSql(opt => opt.ConnectionString = appSetting.PostgreSqlSetting.Connection);

      // Configure the Kafka transport; other KafkaOptions members can be set in this callback.
      config.UseKafka(opt => opt.Servers = appSetting.KafkaSetting.Servers);

      // Enable the CAP dashboard.
      config.UseDashboard();
  });

  var app = builder.Build();

  // HTTP request pipeline: Swagger and its UI are only exposed in Development.
  if (app.Environment.IsDevelopment())
  {
      app.UseSwagger();
      app.UseSwaggerUI();
  }

  app.UseHttpsRedirection();
  app.UseAuthorization();
  app.MapControllers();
  app.Run();

公共部分

  {
    "Logging": {
      "LogLevel": {
        "Default": "Information",
        "Microsoft.AspNetCore": "Warning"
      }
    },
    "AllowedHosts": "*",
    // Every *Setting section below must stay inside "AppSetting": the application
    // binds only this section to the AppSetting class, so a section placed at the
    // root would leave the matching property null.
    "AppSetting": {
      "SqlServerSetting": {
        "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
      },
      "MysqlSetting": {
        "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
      },
      "PostgreSqlSetting": {
        "Connection": "HOST=192.168.3.40;PORT=5432;DATABASE=test_db;USER ID=postgres;PASSWORD=123456;Pooling = false"
      },
      "RedisSetting": {
        "Connection": "192.168.3.40:6379,password=",
        "Database": "test"
      },
      "RabbitMQSetting": {
        "HostName": "localhost",
        "UserName": "admin",
        "Password": "admin",
        "Port": 5672
      },
      "MongoDbSetting": {
        "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
        "Database": "test"
      },
      "KafkaSetting": {
        "Servers": "kafka1:9091,kafka2:9092,kafka3:9093"
      }
    }
  }
  namespace CAP.Transport.Kafka.PostgreSql.Models;

  /// <summary>
  /// Root settings object; one property per infrastructure settings group.
  /// </summary>
  public class AppSetting
  {
      /// <summary>SQL Server settings.</summary>
      public SqlServerSetting SqlServerSetting { get; set; }

      /// <summary>MySQL settings.</summary>
      public MysqlSetting MysqlSetting { get; set; }

      /// <summary>PostgreSQL settings.</summary>
      public PostgreSqlSetting PostgreSqlSetting { get; set; }

      /// <summary>Redis settings.</summary>
      public RedisSetting RedisSetting { get; set; }

      /// <summary>RabbitMQ settings.</summary>
      public RabbitMQSetting RabbitMQSetting { get; set; }

      /// <summary>MongoDB settings.</summary>
      public MongoDbSetting MongoDbSetting { get; set; }

      /// <summary>Kafka settings.</summary>
      public KafkaSetting KafkaSetting { get; set; }
  }

  /// <summary>
  /// SQL Server settings.
  /// </summary>
  public class SqlServerSetting
  {
      /// <summary>Connection string.</summary>
      public string Connection { get; set; }
  }

  /// <summary>
  /// MySQL settings.
  /// </summary>
  public class MysqlSetting
  {
      /// <summary>Connection string.</summary>
      public string Connection { get; set; }
  }

  /// <summary>
  /// PostgreSQL settings.
  /// </summary>
  public class PostgreSqlSetting
  {
      /// <summary>Connection string.</summary>
      public string Connection { get; set; }
  }

  /// <summary>
  /// Redis settings.
  /// </summary>
  public class RedisSetting
  {
      /// <summary>Connection string.</summary>
      public string Connection { get; set; }

      /// <summary>Database name.</summary>
      public string Database { get; set; }
  }

  /// <summary>
  /// RabbitMQ settings.
  /// </summary>
  public class RabbitMQSetting
  {
      /// <summary>Host name; defaults to "localhost".</summary>
      public string HostName { get; set; } = "localhost";

      /// <summary>Password.</summary>
      public string Password { get; set; }

      /// <summary>User name.</summary>
      public string UserName { get; set; }

      /// <summary>Port to connect on; defaults to 5672.</summary>
      public int Port { get; set; } = 5672;
  }

  /// <summary>
  /// MongoDB settings.
  /// </summary>
  public class MongoDbSetting
  {
      /// <summary>Connection string.</summary>
      public string Connection { get; set; }

      /// <summary>Database name.</summary>
      public string Database { get; set; }
  }

  /// <summary>
  /// Kafka settings.
  /// </summary>
  public class KafkaSetting
  {
      /// <summary>Servers value (assigned to the Kafka options' Servers property).</summary>
      public string Servers { get; set; }
  }
  // ---- first source file: base controller ----
  using Microsoft.AspNetCore.Mvc;

  // Namespace aligned with the other controllers of this sample
  // (CAP.Transport.Kafka.PostgreSql.*) so that PublishController can resolve
  // AreaController without an extra using directive.
  namespace CAP.Transport.Kafka.PostgreSql.Controllers;

  /// <summary>
  /// Base controller: applies the route template
  /// "api/[area]/[controller]/[action]" and the [ApiController] attribute
  /// to every derived controller.
  /// </summary>
  [Route("api/[area]/[controller]/[action]")]
  [ApiController]
  public abstract class BaseController : ControllerBase
  {
  }

  // ---- second source file: area controller ----
  using Microsoft.AspNetCore.Mvc;

  namespace CAP.Transport.Kafka.PostgreSql.Controllers.Kafka;

  /// <summary>
  /// Area controller: places derived controllers in the "Order" area, which
  /// supplies the [area] token of the base route template.
  /// </summary>
  [Area("Order")]
  public abstract class AreaController : BaseController
  {
  }

发布消息

  using CAP.Transport.Kafka.PostgreSql.Models;
  using DotNetCore.CAP;
  using Microsoft.AspNetCore.Mvc;
  using Microsoft.Extensions.Options;
  using Npgsql;
  using System.Text.Json;

  namespace CAP.Transport.Kafka.PostgreSql.Controllers.Kafka;

  /// <summary>
  /// Publishes sample messages through CAP, with and without a database transaction.
  /// </summary>
  public class PublishController : AreaController
  {
      // Topic shared with the subscriber; the spelling is kept as-is because the
      // subscriber listens on exactly this name.
      private const string TopicName = "sample.kafka.postgrsql";

      private readonly ICapPublisher _capBus;
      private readonly string _connectionString;

      /// <summary>
      /// Creates the controller.
      /// </summary>
      /// <param name="capBus">CAP publisher used to send messages.</param>
      /// <param name="options">Application settings; the PostgreSQL connection string is read from them.</param>
      public PublishController(ICapPublisher capBus, IOptionsSnapshot<AppSetting> options)
      {
          _capBus = capBus;

          // The original condition was inverted (it dereferenced options only when it
          // was null and otherwise always produced ""). Use the configured value and
          // fall back to an empty string when any part of the chain is missing.
          _connectionString = options?.Value?.PostgreSqlSetting?.Connection ?? "";
      }

      /// <summary>
      /// Publishes the current time to the topic without any database transaction.
      /// </summary>
      /// <returns>200 OK once the publish call has completed.</returns>
      [HttpGet]
      public async Task<IActionResult> WithoutTransaction()
      {
          // Capture the timestamp once so the logged value is the published value.
          var now = DateTime.Now;
          Console.WriteLine("Publish send message: " + now);
          await _capBus.PublishAsync(TopicName, now);
          return Ok();
      }

      /// <summary>
      /// Inserts a row with ADO.NET and publishes a message inside one transaction
      /// started through CAP, then commits explicitly.
      /// </summary>
      /// <returns>200 OK after the transaction has been committed.</returns>
      [HttpPost]
      public IActionResult AdonetWithTransaction()
      {
          const string sql = "insert into test(cname) values('test')";

          using var connection = new NpgsqlConnection(_connectionString);

          // NOTE(review): the connection is never opened explicitly here; this relies on
          // CAP's BeginTransaction extension opening it — confirm for the CAP version in use.
          // autoCommit is false, so the transaction is committed by the explicit Commit() below.
          using var transaction = connection.BeginTransaction(_capBus, autoCommit: false);

          using (var command = new NpgsqlCommand(sql, connection))
          {
              Console.WriteLine("即将执行SQL语句: " + sql);
              command.ExecuteNonQuery();
          }

          _capBus.Publish(TopicName, DateTime.Now);
          transaction.Commit();

          return Ok();
      }
  }

处理消息

  using DotNetCore.CAP;
  using Microsoft.AspNetCore.Mvc;

  namespace CAP.Transport.Kafka.PostgreSql.Controllers.Kafka;

  /// <summary>
  /// Handles messages received through CAP.
  /// </summary>
  [ApiController]
  public class ConsumerController : ControllerBase
  {
      /// <summary>
      /// Subscriber for the "sample.kafka.postgrsql" topic; writes the received value to the console.
      /// </summary>
      /// <param name="value">The message payload.</param>
      [NonAction] // marks the method as not being an MVC action method
      [CapSubscribe("sample.kafka.postgrsql")]
      public void TestKafka(DateTime value)
          => Console.WriteLine($"Subscriber output message: {value}");
  }

测试

https://localhost:7285/api/Order/Publish/WithoutTransaction
1663651247201.png
1663651271349.png

https://localhost:7285/api/Order/Publish/AdonetWithTransaction
1663651341057.png
1663651322436.png