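Available Kafka .NET Clients
A Sample Based on Confluent.Kafka
To run the examples in this article, you first need a running Kafka broker.
Deploying the Kafka Environment
Creating the Project
Create a console project named My.Kafka.Core.Demo and install the Confluent.Kafka and System.Text.Json packages: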
Install-Package Confluent.Kafka
Install-Package System.Text.Json
Writing KafkaService
namespace My.Kafka.Core.Demo;

public interface IKafkaService
{
    /// <summary>
    /// Publishes a message to a topic.
    /// </summary>
    /// <typeparam name="T">Type of the message payload.</typeparam>
    /// <param name="topicName">Name of the target topic.</param>
    /// <param name="message">Payload to serialize and send.</param>
    Task PublishAsync<T>(string topicName, T message) where T : class;

    /// <summary>
    /// Subscribes to one or more topics.
    /// </summary>
    /// <typeparam name="T">Type each message value is deserialized to.</typeparam>
    /// <param name="topics">Topics to subscribe to.</param>
    /// <param name="messageFunc">Callback invoked for each deserialized message.</param>
    /// <param name="cancellationToken">Token that stops the consume loop.</param>
    Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;
}
using Confluent.Kafka;
using System.Text.Json;

namespace My.Kafka.Core.Demo;

public class KafkaService : IKafkaService
{
    public static string KAFKA_SERVERS = "127.0.0.1:9091";

    public async Task PublishAsync<T>(string topicName, T message) where T : class
    {
        var config = new ProducerConfig
        {
            BootstrapServers = KAFKA_SERVERS,
            BatchSize = 16384, // batch size: 16 KB
            LingerMs = 20      // wait up to 20 ms for a batch to fill
        };
        // A producer is built per call to keep the demo short; real code would reuse one instance.
        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(topicName, new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            Value = JsonSerializer.Serialize(message)
        });
    }

    public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = KAFKA_SERVERS,
            GroupId = "Consumer",
            EnableAutoCommit = false, // offsets are committed manually below
            // Acks is a producer-side setting that consumers ignore, so it is not set here.
            AutoOffsetReset = AutoOffsetReset.Earliest // start from the earliest message when the group has no committed offset
        };
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe(topics);
            try
            {
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);
                        if (consumeResult.IsPartitionEOF)
                        {
                            // Only reported when EnablePartitionEof is set to true in the config.
                            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} Reached end of {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                            continue;
                        }
                        Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");

                        T? messageResult = null;
                        try
                        {
                            messageResult = JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} [Exception: failed to deserialize message, Value: {consumeResult.Message.Value}]: {ex.Message}");
                        }

                        if (messageResult != null)
                        {
                            messageFunc(messageResult);
                            try
                            {
                                // Commit only after the handler has run.
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException e)
                            {
                                Console.WriteLine(e.Message);
                            }
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Closing consumer.");
                consumer.Close();
            }
        }
        // Consume blocks the calling thread, so this method effectively runs synchronously.
        await Task.CompletedTask;
    }
}
namespace My.Kafka.Core.Demo;

public class EventData
{
    public string TopicName { get; set; }
    public string Message { get; set; }
    public DateTime EventTime { get; set; }
}
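KafkaService sends each message as a JSON string produced by System.Text.Json and parses it back on the consumer side. The sketch below shows that round trip in isolation, without a broker. The EventDataJson helper and its method names are hypothetical and exist only for illustration; EventData is repeated so the block compiles on its own.

```csharp
using System;
using System.Text.Json;

// Same shape as the EventData class above, repeated so the sketch is self-contained.
public class EventData
{
    public string TopicName { get; set; } = "";
    public string Message { get; set; } = "";
    public DateTime EventTime { get; set; }
}

// Hypothetical helper, not part of the original project.
public static class EventDataJson
{
    // Builds the string that PublishAsync would place in the Kafka message Value.
    public static string ToJson(EventData e) => JsonSerializer.Serialize(e);

    // Parses a message Value; returns null when the text is not valid JSON,
    // mirroring how SubscribeAsync skips messages it cannot deserialize.
    public static EventData? FromJson(string json)
    {
        try
        {
            return JsonSerializer.Deserialize<EventData>(json);
        }
        catch (JsonException)
        {
            return null;
        }
    }
}
```

Calling EventDataJson.FromJson(EventDataJson.ToJson(e)) should return an object with the same TopicName, Message, and EventTime as e, while a malformed payload yields null instead of an exception.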
Writing the Producer
Create a new console project, tentatively named My.Kafka.Demo.Producer, with the following main content:
using My.Kafka.Core.Demo;

KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";
var kafkaService = new KafkaService();

// Publish 50 events to the test topic, one at a time.
for (int i = 0; i < 50; i++)
{
    var eventData = new EventData
    {
        TopicName = "testtopic",
        Message = $"This is a message from Producer, Index : {i + 1}",
        EventTime = DateTime.Now
    };
    await kafkaService.PublishAsync<EventData>(eventData.TopicName, eventData);
}

Console.WriteLine("Publishing finished!");
Console.ReadKey();
Writing the Consumer
Create a new console project, tentatively named My.Kafka.Demo.Consumer, with the following main content:
using My.Kafka.Core.Demo;

KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";
var kafkaService = new KafkaService();
var topics = new List<string> { "testtopic" };

// Blocks here and invokes the callback for every message received.
await kafkaService.SubscribeAsync<EventData>(topics, (eventData) =>
{
    Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} [{eventData.TopicName}] -> processed");
});
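SubscribeAsync accepts a CancellationToken and closes the consumer when it is cancelled, but the Consumer program above never passes one, so that branch is not reached. One possible way to wire it up is to cancel on Ctrl+C. This is only a sketch: ConsumerShutdown and CancelOnCtrlC are hypothetical names introduced here, not part of the original project.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the original project.
public static class ConsumerShutdown
{
    // Returns a token source that is cancelled when the user presses Ctrl+C.
    public static CancellationTokenSource CancelOnCtrlC()
    {
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            // Keep the process alive so the consume loop can close the consumer cleanly.
            e.Cancel = true;
            cts.Cancel();
        };
        return cts;
    }
}
```

In the Consumer program this could be used as: using var cts = ConsumerShutdown.CancelOnCtrlC(); followed by passing cts.Token as the third argument to kafkaService.SubscribeAsync. Pressing Ctrl+C would then make Consume throw OperationCanceledException, which KafkaService already handles by calling consumer.Close().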
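Testing Pub/Sub
Start both the Producer and Consumer projects (Ctrl+F5). The Producer reports that publishing has finished once all 50 messages have been sent, and the Consumer consumes the 50 messages and commits each one in turn.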
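A Sample Based on the CAP Project
Scenario
Suppose we have two microservices: a Catalog service and a Basket service. When the Catalog service raises a product price update event, it publishes the event to Kafka. The Basket service, acting as the consumer, subscribes to that message and updates the price of the matching product in each shopping basket.
Catalog.WebAPI
Create an ASP.NET Core WebAPI project named Catalog.WebAPI, then install the following packages: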
Install-Package DotNetCore.CAP
Install-Package DotNetCore.CAP.MongoDB
Install-Package DotNetCore.CAP.Kafka
Install-Package AutoMapper
Registering CAP
// Register CAP
builder.Services.AddCap(x =>
{
    x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");
    x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");
});
namespace Catalog.WebAPI;

public class Product
{
    public string Id { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
    public int Stock { get; set; }
    public string Introduction { get; set; }
}

namespace Catalog.WebAPI;

public class ProductDTO
{
    public string Id { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
    public string Introduction { get; set; }
}
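Implement an endpoint that updates a product's price and, inside it, publishes a message to Kafka through ICapPublisher. The listing below does this with minimal API endpoints in Program.cs rather than a separate ProductController.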
using Catalog.WebAPI;
using DotNetCore.CAP;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();

// Register CAP
builder.Services.AddCap(x =>
{
    x.UseMongoDB("mongodb://localhost:2717/products?authSource=admin");
    x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");
});
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

// In-memory sample data; a new list is built on every call.
IList<Product> GetProductList()
{
    IList<Product> products = new List<Product>
    {
        new Product { Id = "0001", Name = "Electric Toothbrush A", Price = 99.90M, Introduction = "No description yet" },
        new Product { Id = "0002", Name = "Electric Toothbrush B", Price = 199.90M, Introduction = "No description yet" },
        new Product { Id = "0003", Name = "Washing Machine A", Price = 2999.90M, Introduction = "No description yet" },
        new Product { Id = "0004", Name = "Washing Machine B", Price = 3999.90M, Introduction = "No description yet" },
        new Product { Id = "0005", Name = "TV A", Price = 1899.90M, Introduction = "No description yet" },
    };
    return products;
}

app.MapGet("/Get", () =>
{
    return GetProductList();
});

app.MapPut("/UpdatePrice", async (ICapPublisher publisher, string id, decimal newPrice) =>
{
    // Business logic
    var product = GetProductList().FirstOrDefault(p => p.Id == id);
    if (product == null)
        return Results.NotFound(); // unknown product id

    product.Price = newPrice;

    // Publish the price change
    await publisher.PublishAsync("ProductPriceChanged",
        new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price });
    return Results.Ok();
});

app.Run();
Basket.WebAPI
Following the Catalog API project, create an ASP.NET Core WebAPI project named Basket.WebAPI and install the same packages:
Install-Package DotNetCore.CAP
Install-Package DotNetCore.CAP.MongoDB
Install-Package DotNetCore.CAP.Kafka
Install-Package AutoMapper
Registering CAP
// Register CAP
builder.Services.AddCap(x =>
{
    x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");
    x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");
});
Create a BasketController that subscribes to messages on the Kafka topic ProductPriceChanged.
using Basket.WebAPI.Models;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

namespace Basket.WebAPI;

[Route("[controller]")]
[ApiController]
public class BasketController : ControllerBase
{
    // In-memory sample baskets shared by all requests.
    private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>
    {
        new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>
            {
                new Catalog { Product = new ProductDTO { Id = "0001", Name = "Electric Toothbrush A", Price = 99.90M }, Count = 2 },
                new Catalog { Product = new ProductDTO { Id = "0005", Name = "TV A", Price = 1899.90M }, Count = 1 },
            }
        },
        new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>
            {
                new Catalog { Product = new ProductDTO { Id = "0002", Name = "Electric Toothbrush B", Price = 199.90M }, Count = 2 },
                new Catalog { Product = new ProductDTO { Id = "0004", Name = "Washing Machine B", Price = 3999.90M }, Count = 1 },
            }
        }
    };

    [HttpGet]
    public IList<MyBasketDTO> Get()
    {
        return Baskets;
    }

    // Invoked by CAP for each ProductPriceChanged message; not exposed as an HTTP action.
    [NonAction]
    [CapSubscribe("ProductPriceChanged")]
    public async Task RefreshBasketProductPrice(ProductDTO productDTO)
    {
        if (productDTO == null)
            return;

        foreach (var basket in Baskets)
        {
            foreach (var catalog in basket.Catalogs)
            {
                if (catalog.Product.Id == productDTO.Id)
                {
                    catalog.Product.Price = productDTO.Price;
                    break; // move on to the next basket after the first match
                }
            }
        }

        await Task.CompletedTask;
    }
}
namespace Basket.WebAPI.Models;

public class Catalog
{
    public ProductDTO Product { get; set; }
    public int Count { get; set; }
}

namespace Basket.WebAPI.Models;

public class MyBasketDTO
{
    public string UserId { get; set; }
    public IList<Catalog> Catalogs { get; set; }
}

namespace Basket.WebAPI.Models;

public class Product
{
    public string Id { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
    public int Stock { get; set; }
    public string Introduction { get; set; }
}

namespace Basket.WebAPI.Models;

public class ProductDTO
{
    public string Id { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
    public string Introduction { get; set; }
}
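The price refresh inside BasketController can be checked without Kafka or CAP by pulling the loop into a plain function. The sketch below is one way to do that; BasketPricing and ApplyPriceChange are hypothetical names, and the model classes are trimmed copies repeated so the block stands alone. Unlike the controller, which stops at the first match in each basket, this version updates every matching line and returns how many it touched.

```csharp
using System.Collections.Generic;

// Trimmed copies of the Basket.WebAPI models, repeated so the sketch is self-contained.
public class ProductDTO
{
    public string Id { get; set; } = "";
    public string Name { get; set; } = "";
    public decimal Price { get; set; }
}

public class Catalog
{
    public ProductDTO Product { get; set; } = new ProductDTO();
    public int Count { get; set; }
}

public class MyBasketDTO
{
    public string UserId { get; set; } = "";
    public IList<Catalog> Catalogs { get; set; } = new List<Catalog>();
}

// Hypothetical helper, not part of the original project.
public static class BasketPricing
{
    // Copies the new price onto every basket line that holds the changed product.
    // Returns the number of lines updated.
    public static int ApplyPriceChange(IEnumerable<MyBasketDTO> baskets, ProductDTO changed)
    {
        var updated = 0;
        foreach (var basket in baskets)
        {
            foreach (var line in basket.Catalogs)
            {
                if (line.Product.Id == changed.Id)
                {
                    line.Product.Price = changed.Price;
                    updated++;
                }
            }
        }
        return updated;
    }
}
```

A subscriber method marked with [CapSubscribe("ProductPriceChanged")] could then delegate to BasketPricing.ApplyPriceChange(Baskets, productDTO), which keeps the message handling thin and the pricing rule easy to unit test.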
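Testing the Result
Start the Catalog API and Basket API projects at the same time.
First, use Swagger on the Basket API to view the prices of the products in every user's basket. Product 0002 is priced at 199.9 yuan.
Next, use Swagger on the Catalog API to update the price of the product with Id 0002 to 499.9 yuan.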

Finally, use Swagger on the Basket API to view the basket prices again. Product 0002 now shows the updated price of 499.9 yuan.
