CAP

  • CAP docs看到更多详细资料。
  • CAP 视频教程,学习如何在项目中集成CAP。
  • GitHub源码:https://github.com/dotnetcore/cap
  • 示例代码:https://github.com/dotnetcore/CAP/tree/master/samples

    消息

    使用 ICapPublisher 接口发送出去的数据称之为 Message (消息)。

    补偿事务

    Compensating transaction
    某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围。
    你可以在消费者执行的代码中通过重新发布一个新消息来通知上游,CAP 提供了一种简单的方式来做到这一点。 你可以在发送的时候指定 callbackName 来得到消费者的执行结果,通常这仅适用于点对点的消费。以下是一个示例。

    补偿事务示例

    例如,在一个电商程序(CAP.Messaging.Demo)中,订单初始状态为 pending,当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed。

    NuGet

    你可以运行以下下命令在你的项目中安装 CAP。
    1. Install-Package DotNetCore.CAP
    2. Install-Package DotNetCore.CAP.InMemoryStorage
    3. Install-Package Savorboard.CAP.InMemoryMessageQueue

    注入CAP

    ```csharp using Savorboard.CAP.InMemoryMessageQueue;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddCap(config => { config.UseInMemoryMessageQueue(); //配置一个消息队列 config.UseInMemoryStorage(); //配置一个事件存储 });

  1. <a name="JCxZD"></a>
  2. ### 公共部分
  3. ```csharp
  4. using Microsoft.AspNetCore.Mvc;
  5. namespace CAP.Messaging.Demo.Controllers;
  6. /// <summary>
  7. /// 基础控制器
  8. /// </summary>
  9. [Route("api/[area]/[controller]/[action]")]
  10. [ApiController]
  11. public abstract class BaseController : ControllerBase
  12. {
  13. }
  1. using Microsoft.AspNetCore.Mvc;
  2. namespace CAP.Messaging.Demo.Controllers.Order;
  3. /// <summary>
  4. /// 域控制器
  5. /// </summary>
  6. [Area("Order")]
  7. public abstract class AreaController : BaseController
  8. {
  9. }

发布消息

新建立一个PublishController

  1. using DotNetCore.CAP;
  2. using Microsoft.AspNetCore.Mvc;
  3. using System.Text.Json;
  4. namespace CAP.Messaging.Demo.Controllers.Order;
  5. /// <summary>
  6. /// 发送消息
  7. /// </summary>
  8. public class PublishController : AreaController
  9. {
  10. ICapPublisher _capBus;
  11. public PublishController(ICapPublisher capBus)
  12. {
  13. _capBus = capBus;
  14. }
  15. /// <summary>
  16. /// 订单处理示例
  17. /// </summary>
  18. /// <returns></returns>
  19. [HttpGet]
  20. public IActionResult PublisherMessage()
  21. {
  22. Console.WriteLine("订单处理示例");
  23. //商品数量扣除调用模拟的ConsumerController的place.order.qty.deducted
  24. _capBus.Publish("place.order.qty.deducted",
  25. contentObj: new { OrderId = 1234, ProductId = 23255, Qty = 1 },
  26. //当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed
  27. callbackName: "place.order.mark.status");
  28. return Ok();
  29. }
  30. /// <summary>
  31. /// 当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed
  32. /// </summary>
  33. /// <param name="param"></param>
  34. [NonAction]//表示控制器方法不是动作方法
  35. [CapSubscribe("place.order.mark.status")]
  36. public void MarkOrderStatus(JsonElement param)
  37. {
  38. var orderId = param.GetProperty("OrderId").GetInt32();
  39. var isSuccess = param.GetProperty("IsSuccess").GetBoolean();
  40. if (isSuccess)
  41. {
  42. // mark order status to succeeded
  43. }
  44. else
  45. {
  46. // mark order status to failed
  47. }
  48. }
  49. }

处理消息

新建立一个ConsumerController

  1. using DotNetCore.CAP;
  2. using Microsoft.AspNetCore.Mvc;
  3. using System.Text.Json;
  4. namespace CAP.Messaging.Demo.Controllers.Order;
  5. /// <summary>
  6. /// 处理消息
  7. /// </summary>
  8. [ApiController]
  9. public class ConsumerController : ControllerBase
  10. {
  11. /// <summary>
  12. /// 商品数量扣除处理
  13. /// </summary>
  14. /// <param name="param"></param>
  15. /// <returns></returns>
  16. [NonAction]//表示控制器方法不是动作方法
  17. [CapSubscribe("place.order.qty.deducted")]
  18. public object DeductProductQty(JsonElement param)
  19. {
  20. Console.WriteLine("商品数量扣除处理被调用");
  21. var orderId = param.GetProperty("OrderId").GetInt32();
  22. var productId = param.GetProperty("ProductId").GetInt32();
  23. var qty = param.GetProperty("Qty").GetInt32();
  24. //business logic
  25. return new { OrderId = orderId, IsSuccess = true };
  26. }
  27. }

目录结构

1663585068953.png

测试

http://localhost:5289/api/Order/Publish/PublisherMessage
1663584500053.png
1663584527416.png

异构系统集成

参考官方文档说明

消息调度

CAP 接收到消息之后会将消息发送到 Transport, 由 Transport 进行运输。
当你使用 ICapPublisher 接口发送时,CAP将会将消息调度到相应的 Transport中去,目前还不支持批量发送消息。
有关 Transports 的更多信息,可以查看 Transports 章节。

消息存储

CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 Persistent 的更多信息,可以查看 Persistent 章节。

消息重试

参考官方文档说明

消息数据清理

数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1天 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期(可通过 FailedMessageExpiredAfter 配置)。
CAP 默认情况下会每隔5分钟将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为Failed的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。你可以通过 CollectorCleaningInterval 配置项来自定义间隔时间。