CAP
- CAP docs看到更多详细资料。
- CAP 视频教程,学习如何在项目中集成CAP。
- GitHub源码:https://github.com/dotnetcore/cap
- 示例代码:https://github.com/dotnetcore/CAP/tree/master/samples
消息
使用 ICapPublisher 接口发送出去的数据称之为 Message (消息)。补偿事务
Compensating transaction
某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围。
你可以在消费者执行的代码中通过重新发布一个新消息来通知上游,CAP 提供了一种简单的方式来做到这一点。 你可以在发送的时候指定 callbackName 来得到消费者的执行结果,通常这仅适用于点对点的消费。以下是一个示例。补偿事务示例
例如,在一个电商程序(CAP.Messaging.Demo)中,订单初始状态为 pending,当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed。NuGet
你可以运行以下下命令在你的项目中安装 CAP。Install-Package DotNetCore.CAPInstall-Package DotNetCore.CAP.InMemoryStorageInstall-Package Savorboard.CAP.InMemoryMessageQueue
注入CAP
```csharp using Savorboard.CAP.InMemoryMessageQueue;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddCap(config => { config.UseInMemoryMessageQueue(); //配置一个消息队列 config.UseInMemoryStorage(); //配置一个事件存储 });
<a name="JCxZD"></a>### 公共部分```csharpusing Microsoft.AspNetCore.Mvc;namespace CAP.Messaging.Demo.Controllers;/// <summary>/// 基础控制器/// </summary>[Route("api/[area]/[controller]/[action]")][ApiController]public abstract class BaseController : ControllerBase{}
using Microsoft.AspNetCore.Mvc;namespace CAP.Messaging.Demo.Controllers.Order;/// <summary>/// 域控制器/// </summary>[Area("Order")]public abstract class AreaController : BaseController{}
发布消息
新建立一个PublishController
using DotNetCore.CAP;using Microsoft.AspNetCore.Mvc;using System.Text.Json;namespace CAP.Messaging.Demo.Controllers.Order;/// <summary>/// 发送消息/// </summary>public class PublishController : AreaController{ICapPublisher _capBus;public PublishController(ICapPublisher capBus){_capBus = capBus;}/// <summary>/// 订单处理示例/// </summary>/// <returns></returns>[HttpGet]public IActionResult PublisherMessage(){Console.WriteLine("订单处理示例");//商品数量扣除调用模拟的ConsumerController的place.order.qty.deducted_capBus.Publish("place.order.qty.deducted",contentObj: new { OrderId = 1234, ProductId = 23255, Qty = 1 },//当商品数量成功扣除时将状态标记为 succeeded ,否则为 failedcallbackName: "place.order.mark.status");return Ok();}/// <summary>/// 当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed/// </summary>/// <param name="param"></param>[NonAction]//表示控制器方法不是动作方法[CapSubscribe("place.order.mark.status")]public void MarkOrderStatus(JsonElement param){var orderId = param.GetProperty("OrderId").GetInt32();var isSuccess = param.GetProperty("IsSuccess").GetBoolean();if (isSuccess){// mark order status to succeeded}else{// mark order status to failed}}}
处理消息
新建立一个ConsumerController
using DotNetCore.CAP;using Microsoft.AspNetCore.Mvc;using System.Text.Json;namespace CAP.Messaging.Demo.Controllers.Order;/// <summary>/// 处理消息/// </summary>[ApiController]public class ConsumerController : ControllerBase{/// <summary>/// 商品数量扣除处理/// </summary>/// <param name="param"></param>/// <returns></returns>[NonAction]//表示控制器方法不是动作方法[CapSubscribe("place.order.qty.deducted")]public object DeductProductQty(JsonElement param){Console.WriteLine("商品数量扣除处理被调用");var orderId = param.GetProperty("OrderId").GetInt32();var productId = param.GetProperty("ProductId").GetInt32();var qty = param.GetProperty("Qty").GetInt32();//business logicreturn new { OrderId = orderId, IsSuccess = true };}}
目录结构
测试
http://localhost:5289/api/Order/Publish/PublisherMessage
异构系统集成
参考官方文档说明
消息调度
CAP 接收到消息之后会将消息发送到 Transport, 由 Transport 进行运输。
当你使用 ICapPublisher 接口发送时,CAP将会将消息调度到相应的 Transport中去,目前还不支持批量发送消息。
有关 Transports 的更多信息,可以查看 Transports 章节。
消息存储
CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 Persistent 的更多信息,可以查看 Persistent 章节。
消息重试
参考官方文档说明
消息数据清理
数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1天 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期(可通过 FailedMessageExpiredAfter 配置)。
CAP 默认情况下会每隔5分钟将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为Failed的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。你可以通过 CollectorCleaningInterval 配置项来自定义间隔时间。
