可用的Kafka .NET客户端

confluent-kafka-dotnet

基于Confluent.Kafka的Sample

要完成本文示例,首先得有一个启动好的Kafka Broker服务。

部署Kafka环境

参考Kafka学习二-基于Docker搭Kafka

创建项目

创建一个My.Kafka.Core.Demo控制台项目,安装Confluent.Kafka组件

  1. Install-Package Confluent.Kafka
  2. Install-Package System.Text.Json

编写KafkaService

  namespace My.Kafka.Core.Demo;

  /// <summary>
  /// Publish/subscribe contract used by the Kafka samples.
  /// </summary>
  public interface IKafkaService
  {
      /// <summary>
      /// Publishes a message to a topic.
      /// </summary>
      /// <typeparam name="T">Reference type of the message payload.</typeparam>
      /// <param name="topicName">Name of the topic to publish to.</param>
      /// <param name="message">Payload to publish.</param>
      /// <returns>A task representing the publish operation.</returns>
      Task PublishAsync<T>(string topicName, T message) where T : class;

      /// <summary>
      /// Subscribes to one or more topics and hands each received message to a callback.
      /// </summary>
      /// <typeparam name="T">Reference type the received messages are converted to.</typeparam>
      /// <param name="topics">Names of the topics to subscribe to.</param>
      /// <param name="messageFunc">Callback invoked with each received message.</param>
      /// <param name="cancellationToken">Token used to stop the subscription.</param>
      /// <returns>A task representing the subscription.</returns>
      Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;
  }
  using Confluent.Kafka;
  using System.Text.Json;
  namespace My.Kafka.Core.Demo;

  /// <summary>
  /// <see cref="IKafkaService"/> implementation on top of Confluent.Kafka.
  /// Payloads travel as JSON strings produced and parsed with System.Text.Json.
  /// </summary>
  public class KafkaService : IKafkaService
  {
      /// <summary>
      /// Bootstrap server list handed to every producer and consumer this class creates.
      /// It is a mutable static field because the sample programs assign it at startup.
      /// </summary>
      public static string KAFKA_SERVERS = "127.0.0.1:9091";

      /// <summary>
      /// Serializes <paramref name="message"/> to JSON and produces it to
      /// <paramref name="topicName"/> with a freshly generated GUID as the key.
      /// </summary>
      /// <typeparam name="T">Reference type of the payload.</typeparam>
      /// <param name="topicName">Target topic.</param>
      /// <param name="message">Payload to serialize and send.</param>
      /// <returns>A task that completes when <c>ProduceAsync</c> completes.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="topicName"/> or <paramref name="message"/> is null.
      /// </exception>
      public async Task PublishAsync<T>(string topicName, T message) where T : class
      {
          ArgumentNullException.ThrowIfNull(topicName);
          ArgumentNullException.ThrowIfNull(message);

          var config = new ProducerConfig
          {
              BootstrapServers = KAFKA_SERVERS,
              BatchSize = 16384, // batch size: 16K
              LingerMs = 20 // linger time: 20 ms
          };

          // NOTE(review): a producer is built and disposed on every call, which keeps the
          // sample simple; a long-lived shared producer is the usual choice for real workloads.
          using var producer = new ProducerBuilder<string, string>(config).Build();
          await producer.ProduceAsync(topicName, new Message<string, string>
          {
              Key = Guid.NewGuid().ToString(),
              Value = JsonSerializer.Serialize(message)
          });
      }

      /// <summary>
      /// Subscribes to <paramref name="topics"/> and, for every message whose value
      /// deserializes to a non-null <typeparamref name="T"/>, invokes
      /// <paramref name="messageFunc"/> and then commits that message's offset.
      /// </summary>
      /// <remarks>
      /// The consume loop is synchronous, so it runs on a dedicated long-running task
      /// instead of the caller's thread. The returned task completes once
      /// <paramref name="cancellationToken"/> is cancelled and the consumer has been closed.
      /// </remarks>
      /// <typeparam name="T">Reference type the message values are deserialized to.</typeparam>
      /// <param name="topics">Topics to subscribe to.</param>
      /// <param name="messageFunc">Handler invoked for each deserialized message.</param>
      /// <param name="cancellationToken">Token that stops the consume loop.</param>
      /// <returns>A task representing the running subscription.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="topics"/> or <paramref name="messageFunc"/> is null.
      /// </exception>
      public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
      {
          ArgumentNullException.ThrowIfNull(topics);
          ArgumentNullException.ThrowIfNull(messageFunc);

          // CancellationToken.None on purpose: cancellation is observed inside the loop so
          // that the consumer is always closed and the task ends normally, not as cancelled.
          await Task.Factory.StartNew(
              () => ConsumeLoop(topics, messageFunc, cancellationToken),
              CancellationToken.None,
              TaskCreationOptions.LongRunning,
              TaskScheduler.Default);
      }

      // Builds a consumer, subscribes it and processes messages until cancellation.
      private static void ConsumeLoop<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken) where T : class
      {
          // No Acks here: it is a producer setting and has no effect on a consumer.
          var config = new ConsumerConfig
          {
              BootstrapServers = KAFKA_SERVERS,
              GroupId = "Consumer",
              EnableAutoCommit = false, // offsets are committed manually below
              AutoOffsetReset = AutoOffsetReset.Earliest // start from the earliest available offset
          };

          using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
          consumer.Subscribe(topics);
          try
          {
              while (true)
              {
                  try
                  {
                      var consumeResult = consumer.Consume(cancellationToken);
                      Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
                      if (consumeResult.IsPartitionEOF)
                      {
                          Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                          continue;
                      }

                      if (consumeResult.Message is null)
                      {
                          continue;
                      }

                      var messageResult = TryDeserialize<T>(consumeResult.Message.Value);
                      if (messageResult is null)
                      {
                          // Undecodable or empty payload: skip the handler and do not commit.
                          continue;
                      }

                      messageFunc(messageResult);
                      try
                      {
                          consumer.Commit(consumeResult);
                      }
                      catch (KafkaException e)
                      {
                          Console.WriteLine(e.Message);
                      }
                  }
                  catch (ConsumeException e)
                  {
                      // A failed consume is logged and the loop keeps polling.
                      Console.WriteLine($"Consume error: {e.Error.Reason}");
                  }
              }
          }
          catch (OperationCanceledException)
          {
              Console.WriteLine("Closing consumer.");
              consumer.Close();
          }
      }

      // Deserializes a JSON payload; logs and returns null when deserialization throws.
      private static T? TryDeserialize<T>(string value) where T : class
      {
          try
          {
              return JsonSerializer.Deserialize<T>(value);
          }
          catch (Exception ex)
          {
              var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{value}】 :{ex.StackTrace?.ToString()}";
              Console.WriteLine(errorMessage);
              return null;
          }
      }
  }
  namespace My.Kafka.Core.Demo;

  /// <summary>
  /// Message payload exchanged between the sample Producer and Consumer.
  /// </summary>
  public class EventData
  {
      /// <summary>Name of the topic the event is published to.</summary>
      public string TopicName { get; set; }

      /// <summary>Free-text content of the event.</summary>
      public string Message { get; set; }

      /// <summary>Timestamp assigned by the code that creates the event.</summary>
      public DateTime EventTime { get; set; }
  }

编写Producer

新建一个Console项目,暂且命名为:My.Kafka.Demo.Producer,其主体内容如下:

  using My.Kafka.Core.Demo;

  // Point the service at the three-broker cluster before creating it.
  KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";
  var publisher = new KafkaService();

  // Publish 50 events one after another; each publish is awaited before the next starts.
  for (var index = 1; index <= 50; index++)
  {
      var payload = new EventData
      {
          TopicName = "testtopic",
          Message = $"This is a message from Producer, Index : {index}",
          EventTime = DateTime.Now
      };

      await publisher.PublishAsync<EventData>(payload.TopicName, payload);
  }

  Console.WriteLine("发布结束!");
  Console.ReadKey();

编写Consumer

新建一个Console项目,暂且命名为:My.Kafka.Demo.Consumer,其主体内容如下:

  using My.Kafka.Core.Demo;

  // Point the service at the three-broker cluster before creating it.
  KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";

  // Handler run for every received event: prints its timestamp and topic name.
  void ReportHandled(EventData eventData) =>
      Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");

  var subscriber = new KafkaService();
  var subscribedTopics = new List<string> { "testtopic" };
  await subscriber.SubscribeAsync<EventData>(subscribedTopics, ReportHandled);

测试Pub/Sub效果

将Producer和Consumer两个项目都启动起来(ctrl+F5):Producer依次发布50条消息,每条都等ProduceAsync完成后才继续,全部发完后输出"发布结束!";Consumer则逐条消费这50条消息,并在处理完每条消息后手动提交offset。
1663214475378.png

基于CAP项目的Sample

模拟场景说明

假设我们有两个微服务,一个是Catalog微服务,一个是Basket微服务,当Catalog微服务产生了Product价格更新的事件,就会将其发布到Kafka,Basket微服务作为消费者就会订阅这个消息然后更新购物车中对应商品的最新价格。
Kafka学习三-.NET Core示例 - 图2

Catalog.WebAPI

新建一个名称为Catalog.WebAPI的ASP.NET Core WebAPI项目,然后分别安装以下组件:

  1. Install-Package DotNetCore.CAP
  2. Install-Package DotNetCore.CAP.MongoDB
  3. Install-Package DotNetCore.CAP.Kafka
  4. Install-Package AutoMapper

注入CAP

  // Register CAP with its MongoDB and Kafka integrations.
  builder.Services.AddCap(capOptions =>
  {
      capOptions.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");
      capOptions.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");
  });
  namespace Catalog.WebAPI;

  /// <summary>
  /// Product entity of the Catalog service.
  /// </summary>
  public class Product
  {
      /// <summary>Product identifier.</summary>
      public string Id { get; set; }

      /// <summary>Display name.</summary>
      public string Name { get; set; }

      /// <summary>Current price.</summary>
      public decimal Price { get; set; }

      /// <summary>Stock quantity.</summary>
      public int Stock { get; set; }

      /// <summary>Introduction text.</summary>
      public string Introduction { get; set; }
  }
  namespace Catalog.WebAPI;

  /// <summary>
  /// Product data published by the Catalog service; unlike <c>Product</c> it has no stock field.
  /// </summary>
  public class ProductDTO
  {
      /// <summary>Product identifier.</summary>
      public string Id { get; set; }

      /// <summary>Display name.</summary>
      public string Name { get; set; }

      /// <summary>Current price.</summary>
      public decimal Price { get; set; }

      /// <summary>Introduction text.</summary>
      public string Introduction { get; set; }
  }

本示例没有单独创建ProductController,而是直接在Program.cs中用Minimal API实现一个Update产品价格的接口(PUT /UpdatePrice),在其中通过ICapPublisher完成发布消息到Kafka。

  using AutoMapper;
  using Catalog.WebAPI;
  using DotNetCore.CAP;
  using Microsoft.AspNetCore.Mvc;
  using static Confluent.Kafka.ConfigPropertyNames;

  var builder = WebApplication.CreateBuilder(args);

  // Add services to the container.
  // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
  builder.Services.AddEndpointsApiExplorer();

  // Register CAP with its MongoDB and Kafka integrations.
  builder.Services.AddCap(x =>
  {
      // NOTE(review): port 2717 is not MongoDB's default 27017 - confirm it matches your deployment.
      x.UseMongoDB("mongodb://localhost:2717/products?authSource=admin");
      x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");
  });
  builder.Services.AddSwaggerGen();

  var app = builder.Build();

  // Configure the HTTP request pipeline.
  if (app.Environment.IsDevelopment())
  {
      app.UseSwagger();
      app.UseSwaggerUI();
  }

  app.UseHttpsRedirection();

  // Returns the demo catalog. A new list is built on every call, so price changes
  // made by /UpdatePrice are not kept between requests.
  IList<Product> GetProductList()
  {
      return new List<Product>
      {
          new Product { Id = "0001", Name = "电动牙刷A", Price = 99.90M, Introduction = "暂无介绍" },
          new Product { Id = "0002", Name = "电动牙刷B", Price = 199.90M, Introduction = "暂无介绍" },
          new Product { Id = "0003", Name = "洗衣机A", Price = 2999.90M, Introduction = "暂无介绍" },
          new Product { Id = "0004", Name = "洗衣机B", Price = 3999.90M, Introduction = "暂无介绍" },
          new Product { Id = "0005", Name = "电视机A", Price = 1899.90M, Introduction = "暂无介绍" },
      };
  }

  // GET /Get: returns the demo catalog.
  app.MapGet("/Get", () => GetProductList());

  // PUT /UpdatePrice: applies the new price and publishes a "ProductPriceChanged" message.
  app.MapPut("/UpdatePrice", async (ICapPublisher _publisher, string id, decimal newPrice) =>
  {
      var product = GetProductList().FirstOrDefault(p => p.Id == id);
      if (product is null)
      {
          // Unknown id: answer 404 instead of dereferencing null.
          return Results.NotFound();
      }

      product.Price = newPrice;

      await _publisher.PublishAsync("ProductPriceChanged",
          new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price });
      return Results.Ok();
  });

  app.Run();

  // Project-template leftover; nothing above uses it.
  internal record WeatherForecast(DateTime Date, int TemperatureC, string? Summary)
  {
      public int TemperatureF => 32 + (int)(TemperatureC / 0.5556);
  }

Basket.WebAPI

参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件。

  1. Install-Package DotNetCore.CAP
  2. Install-Package DotNetCore.CAP.MongoDB
  3. Install-Package DotNetCore.CAP.Kafka
  4. Install-Package AutoMapper

注入CAP

  // Register CAP with its MongoDB and Kafka integrations.
  builder.Services.AddCap(capOptions =>
  {
      capOptions.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");
      capOptions.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");
  });

新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。

  using Basket.WebAPI.Models;
  using DotNetCore.CAP;
  using Microsoft.AspNetCore.Mvc;
  namespace Basket.WebAPI;

  /// <summary>
  /// Exposes the in-memory demo baskets and keeps their product prices current by
  /// subscribing to the "ProductPriceChanged" topic through CAP.
  /// </summary>
  [Route("[controller]")]
  [ApiController]
  public class BasketController : ControllerBase
  {
      // Demo data shared by all requests; there is no persistence behind it.
      private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>
      {
          new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>
              {
                  new Catalog { Product = new ProductDTO { Id = "0001", Name = "电动牙刷A", Price = 99.90M }, Count = 2 },
                  new Catalog { Product = new ProductDTO { Id = "0005", Name = "电视机A", Price = 1899.90M }, Count = 1 },
              }
          },
          new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>
              {
                  new Catalog { Product = new ProductDTO { Id = "0002", Name = "电动牙刷B", Price = 199.90M }, Count = 2 },
                  new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣机B", Price = 3999.90M }, Count = 1 },
              }
          }
      };

      /// <summary>
      /// Returns every basket.
      /// </summary>
      [HttpGet]
      public IList<MyBasketDTO> Get()
      {
          return Baskets;
      }

      /// <summary>
      /// Handles "ProductPriceChanged": in each basket, the first entry whose product id
      /// matches the message gets the new price.
      /// </summary>
      /// <param name="productDTO">Product data carried by the message; ignored when null.</param>
      /// <returns>A completed task.</returns>
      [NonAction]
      [CapSubscribe("ProductPriceChanged")]
      public Task RefreshBasketProductPrice(ProductDTO productDTO)
      {
          if (productDTO == null)
          {
              return Task.CompletedTask;
          }

          foreach (var basket in Baskets)
          {
              foreach (var catalog in basket.Catalogs)
              {
                  if (catalog.Product.Id == productDTO.Id)
                  {
                      catalog.Product.Price = productDTO.Price;
                      break; // only the first matching entry of a basket is updated
                  }
              }
          }

          return Task.CompletedTask;
      }
  }
  namespace Basket.WebAPI.Models;

  /// <summary>
  /// One basket entry: a product together with a count.
  /// </summary>
  public class Catalog
  {
      /// <summary>The product held by this entry.</summary>
      public ProductDTO Product { get; set; }

      /// <summary>Count stored for this entry.</summary>
      public int Count { get; set; }
  }
  namespace Basket.WebAPI.Models;

  /// <summary>
  /// A basket: a user id plus the list of entries in that basket.
  /// </summary>
  public class MyBasketDTO
  {
      /// <summary>Identifier of the user the basket belongs to.</summary>
      public string UserId { get; set; }

      /// <summary>Entries contained in the basket.</summary>
      public IList<Catalog> Catalogs { get; set; }
  }
  namespace Basket.WebAPI.Models;

  /// <summary>
  /// Product entity as declared in the Basket service.
  /// </summary>
  public class Product
  {
      /// <summary>Product identifier.</summary>
      public string Id { get; set; }

      /// <summary>Display name.</summary>
      public string Name { get; set; }

      /// <summary>Current price.</summary>
      public decimal Price { get; set; }

      /// <summary>Stock quantity.</summary>
      public int Stock { get; set; }

      /// <summary>Introduction text.</summary>
      public string Introduction { get; set; }
  }
  namespace Basket.WebAPI.Models;

  /// <summary>
  /// Product data as received by the Basket service from "ProductPriceChanged" messages.
  /// </summary>
  public class ProductDTO
  {
      /// <summary>Product identifier.</summary>
      public string Id { get; set; }

      /// <summary>Display name.</summary>
      public string Name { get; set; }

      /// <summary>Current price.</summary>
      public decimal Price { get; set; }

      /// <summary>Introduction text.</summary>
      public string Introduction { get; set; }
  }

测试效果

同时启动Catalog API 和 Basket API两个项目。
首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。
Kafka学习三-.NET Core示例 - 图3
然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。
Kafka学习三-.NET Core示例 - 图4
1663232629763.png
最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。