CAP
●CAP docs看到更多详细资料。
●CAP 视频教程,学习如何在项目中集成CAP。
●GitHub源码:https://github.com/dotnetcore/cap
●示例代码:https://github.com/dotnetcore/CAP/tree/master/samples
CAP 运输器
通过运输将数据从一个地方移动到另一个地方-在采集程序和管道之间,管道与实体数据库之间,甚至在管道与外部系统之间。
怎么选择运输器
| 🏳🌈 | RabbitMQ | Kafka | Azure Service Bus | In-Memory |
|---|---|---|---|---|
| 定位 | 可靠消息传输 | 实时数据处理 | 云 | 内存型,测试 |
| 分布式 | ✔ | ✔ | ✔ | ❌ |
| 持久化 | ✔ | ✔ | ✔ | ❌ |
| 性能 | Medium | High | Medium | High |
Apache Kafka®
Apache Kafka® 是一个开源流处理软件平台,由 LinkedIn 开发并捐赠给 Apache Software Foundation,用 Scala 和 Java 编写。
CAP 支持使用 Apache Kafka® 作为消息传输器。
Kafka Option
CAP 直接对外提供的 Kafka 配置参数如下:
| NAME | DESCRIPTION | TYPE | DEFAULT |
|---|---|---|---|
| Servers | Broker 地址 | string | |
| ConnectionPoolSize | 用户名 | int | 10 |
| CustomHeaders | 设置自定义头 | Function |
Kafka示例
NuGet
Install-Package DotNetCore.CAP.KafkaInstall-Package DotNetCore.CAP.DashboardInstall-Package DotNetCore.CAP.PostgreSql
注入CAP
using CAP.Transport.Kafka.PostgreSql.Models;var builder = WebApplication.CreateBuilder(args);// Add services to the container.builder.Services.AddControllers();// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbucklebuilder.Services.AddEndpointsApiExplorer();builder.Services.AddSwaggerGen();var appSetting = new AppSetting();builder.Configuration.GetSection("AppSetting").Bind(appSetting);//把AppSetting实体注入到容器,方便在构造函数里使用IOptionsSnapshot<AppSetting> optionsbuilder.Services.Configure<AppSetting>(builder.Configuration.GetSection("AppSetting"));builder.Services.AddCap(config =>{config.UsePostgreSql(opt => {opt.ConnectionString = appSetting.PostgreSqlSetting.Connection;}); //配置一个PostgreSqlconfig.UseKafka(opt => {opt.Servers = appSetting.KafkaSetting.Servers;//KafkaOptions});//添加基于 Kafka 的配置项config.UseDashboard();//配置一个Dashboard});var app = builder.Build();// Configure the HTTP request pipeline.if (app.Environment.IsDevelopment()){app.UseSwagger();app.UseSwaggerUI();}app.UseHttpsRedirection();app.UseAuthorization();app.MapControllers();app.Run();
公共部分
{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","AppSetting": {"SqlServerSetting": {"Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"},"MysqlSetting": {"Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"},"PostgreSqlSetting": {"Connection": "HOST=192.168.3.40;PORT=5432;DATABASE=test_db;USER ID=postgres;PASSWORD=123456;Pooling = false"},"RedisSetting": {"Connection": "192.168.3.40:6379,password=","Database": "test"},"RabbitMQSetting": {"HostName": "localhost","UserName": "admin","Password": "admin","Port": 5672}},"MongoDbSetting": {"Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0"Database": "test"},"KafkaSetting": {"Servers": "kafka1:9091,kafka2:9092,kafka3:9093"}}
namespace CAP.Transport.Kafka.PostgreSql.Models;/// <summary>/// mongodb setting/// </summary>public class AppSetting{/// <summary>/// SqlServerSetting/// </summary>public SqlServerSetting SqlServerSetting { get; set; }/// <summary>/// MysqlSetting/// </summary>public MysqlSetting MysqlSetting { get; set; }/// <summary>/// PostgreSqlSetting/// </summary>public PostgreSqlSetting PostgreSqlSetting { get; set; }/// <summary>/// RedisSetting/// </summary>public RedisSetting RedisSetting { get; set; }/// <summary>/// RabbitMQSetting/// </summary>public RabbitMQSetting RabbitMQSetting { get; set; }/// <summary>/// MongoDbSetting/// </summary>public MongoDbSetting MongoDbSetting { get; set; }/// <summary>/// KafkaSetting/// </summary>public KafkaSetting KafkaSetting { get; set; }}public class RabbitMQSetting{/// <summary>/// HostName/// </summary>public string HostName { get; set; } = "localhost";/// <summary>/// Password/// </summary>public string Password { get; set; }/// <summary>/// Username/// </summary>public string UserName { get; set; }/// <summary>/// The port to connect on./// </summary>public int Port { get; set; } = 5672;}public class RedisSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }/// <summary>/// database name/// </summary>public string Database { get; set; }}public class MongoDbSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }/// <summary>/// database name/// </summary>public string Database { get; set; }}public class SqlServerSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }}public class MysqlSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }}public class PostgreSqlSetting{/// <summary>/// connection string/// </summary>public string Connection { get; set; }}public class KafkaSetting{/// <summary>/// Servers/// </summary>public string Servers { get; set; }}
using Microsoft.AspNetCore.Mvc;namespace CAP.Transport.KafkaDemo.Controllers;/// <summary>/// 基础控制器/// </summary>[Route("api/[area]/[controller]/[action]")][ApiController]public abstract class BaseController : ControllerBase{}
using Microsoft.AspNetCore.Mvc;namespace CAP.Transport.KafkaDemo.Controllers.Kafka;/// <summary>/// 域控制器/// </summary>[Area("Order")]public abstract class AreaController : BaseController{}
发布消息
using CAP.Transport.Kafka.PostgreSql.Models;using DotNetCore.CAP;using Microsoft.AspNetCore.Mvc;using Microsoft.Extensions.Options;using Npgsql;using System.Text.Json;namespace CAP.Transport.Kafka.PostgreSql.Controllers.Kafka;/// <summary>/// 发送消息/// </summary>public class PublishController : AreaController{private readonly ICapPublisher _capBus;private readonly string _connectionString;public PublishController(ICapPublisher capBus, IOptionsSnapshot<AppSetting> options){_capBus = capBus;_connectionString = options == null ? options.Value.PostgreSqlSetting.Connection : "";}/// <summary>/// 发布者/// </summary>/// <returns></returns>[HttpGet]//表示控制器方法不是动作方法//[Route("~/without/transaction")]public async Task<IActionResult> WithoutTransaction(){Console.WriteLine("Publish send message: " + DateTime.Now);await _capBus.PublishAsync("sample.kafka.postgrsql", DateTime.Now);return Ok();}/// <summary>/// 发布者/// </summary>/// <returns></returns>[HttpPost]//[Route("~/adonet/transaction")]public IActionResult AdonetWithTransaction(){using (var connection = new NpgsqlConnection(_connectionString)){using var transaction = connection.BeginTransaction(_capBus, autoCommit: false);string sqlstr = "insert into test(cname) values('test')";using (NpgsqlCommand cmd = new(sqlstr, connection)){Console.WriteLine("即将执行SQL语句: " + sqlstr);int resut = cmd.ExecuteNonQuery();}_capBus.Publish("sample.kafka.postgrsql", DateTime.Now);transaction.Commit();}return Ok();}}
处理消息
using DotNetCore.CAP;using Microsoft.AspNetCore.Mvc;namespace CAP.Transport.Kafka.PostgreSql.Controllers.Kafka;/// <summary>/// 处理消息/// </summary>[ApiController]public class ConsumerController : ControllerBase{/// <summary>/// 订阅消费者/// </summary>/// <param name="value"></param>[NonAction]//表示控制器方法不是动作方法[CapSubscribe("sample.kafka.postgrsql")]public void TestKafka(DateTime value){Console.WriteLine("Subscriber output message: " + value);}}
测试
https://localhost:7285/api/Order/Publish/WithoutTransaction

https://localhost:7285/api/Order/Publish/AdonetWithTransaction

