Event Listeners

接入 CrewAI 事件,以构建自定义集成与监控能力

概览

CrewAI 提供了一个强大的事件系统,允许你监听并响应 Crew 执行过程中发生的各种事件。这个功能使你能够构建自定义集成、监控方案、日志系统,或任何其他需要基于 CrewAI 内部事件触发的功能。

工作原理

CrewAI 使用事件总线架构,在整个执行生命周期中发出事件。这个事件系统由以下组件构成:

  1. CrewAIEventsBus:一个单例事件总线,负责管理事件注册与事件发出
  2. BaseEvent:系统中所有事件的基类
  3. BaseEventListener:用于创建自定义事件监听器的抽象基类

当 CrewAI 中发生特定操作时(例如 Crew 开始执行、Agent 完成任务、工具被使用),系统会发出相应事件。你可以为这些事件注册处理器,在事件发生时执行自定义代码。

CrewAI AMP 提供了内置的 Prompt Tracing 功能,它利用事件系统来追踪、存储并可视化所有 prompt、completion 以及相关元数据。这为调试和 Agent 运行透明性提供了强大支持。 <img src="https://mintcdn.com/crewai/Tp3HEbbp9mp-dy3H/images/enterprise/traces-overview.png?fit=max&auto=format&n=Tp3HEbbp9mp-dy3H&q=85&s=9c02d5b7306bf7adaeadd77a018f8fea" alt="Prompt Tracing Dashboard" width="2244" height="1422" data-path="images/enterprise/traces-overview.png" /> 使用 Prompt Tracing,你可以: 查看发送给 LLM 的所有 prompt 完整历史 跟踪 token 使用量和成本 调试 Agent 推理失败问题 与团队共享 prompt 序列 比较不同的 prompt 策略 导出 trace 以用于合规和审计

创建自定义事件监听器

要创建一个自定义事件监听器,你需要:

  1. 创建一个继承自 BaseEventListener 的类
  2. 实现 setup_listeners 方法
  3. 为你感兴趣的事件注册处理器
  4. 在合适的文件中创建监听器实例

下面是一个简单的自定义事件监听器类示例:

  1. from crewai.events import (
  2. CrewKickoffStartedEvent,
  3. CrewKickoffCompletedEvent,
  4. AgentExecutionCompletedEvent,
  5. )
  6. from crewai.events import BaseEventListener
  7. class MyCustomListener(BaseEventListener):
  8. def __init__(self):
  9. super().__init__()
  10. def setup_listeners(self, crewai_event_bus):
  11. @crewai_event_bus.on(CrewKickoffStartedEvent)
  12. def on_crew_started(source, event):
  13. print(f"Crew '{event.crew_name}' has started execution!")
  14. @crewai_event_bus.on(CrewKickoffCompletedEvent)
  15. def on_crew_completed(source, event):
  16. print(f"Crew '{event.crew_name}' has completed execution!")
  17. print(f"Output: {event.output}")
  18. @crewai_event_bus.on(AgentExecutionCompletedEvent)
  19. def on_agent_execution_completed(source, event):
  20. print(f"Agent '{event.agent.role}' completed task")
  21. print(f"Output: {event.output}")

正确注册你的监听器

仅仅定义监听器类是不够的。你还需要创建其实例,并确保它在你的应用中被导入。这样可以确保:

  1. 事件处理器被注册到事件总线上
  2. 监听器实例会保留在内存中(不会被垃圾回收)
  3. 在事件发出时,监听器处于激活状态

方案 1:在你的 Crew 或 Flow 实现中导入并实例化

最重要的是,在定义并执行 Crew 或 Flow 的文件中创建监听器实例:

对于基于 Crew 的应用

在你的 Crew 实现文件顶部创建并导入监听器:

  1. # In your crew.py file
  2. from crewai import Agent, Crew, Task
  3. from my_listeners import MyCustomListener
  4. # Create an instance of your listener
  5. my_listener = MyCustomListener()
  6. class MyCustomCrew:
  7. # Your crew implementation...
  8. def crew(self):
  9. return Crew(
  10. agents=[...],
  11. tasks=[...],
  12. # ...
  13. )

对于基于 Flow 的应用

在你的 Flow 实现文件顶部创建并导入监听器:

  1. # In your main.py or flow.py file
  2. from crewai.flow import Flow, listen, start
  3. from my_listeners import MyCustomListener
  4. # Create an instance of your listener
  5. my_listener = MyCustomListener()
  6. class MyCustomFlow(Flow):
  7. # Your flow implementation...
  8. @start()
  9. def first_step(self):
  10. # ...

这样可以确保在执行 Crew 或 Flow 时,监听器已经被加载并处于激活状态。

方案 2:为监听器创建一个 package

如果你有多个监听器,推荐使用更结构化的方式:

  1. 为监听器创建一个 package:
  1. my_project/
  2. ├── listeners/
  3. ├── __init__.py
  4. ├── my_custom_listener.py
  5. └── another_listener.py
  1. my_custom_listener.py 中,定义监听器类并创建其实例:
  1. # my_custom_listener.py
  2. from crewai.events import BaseEventListener
  3. # ... import events ...
  4. class MyCustomListener(BaseEventListener):
  5. # ... implementation ...
  6. # Create an instance of your listener
  7. my_custom_listener = MyCustomListener()
  1. __init__.py 中导入监听器实例,以确保它们被加载:
  1. # __init__.py
  2. from .my_custom_listener import my_custom_listener
  3. from .another_listener import another_listener
  4. # Optionally export them if you need to access them elsewhere
  5. __all__ = ['my_custom_listener', 'another_listener']
  1. 在你的 Crew 或 Flow 文件中导入这个 listeners package:
  1. # In your crew.py or flow.py file
  2. import my_project.listeners # This loads all your listeners
  3. class MyCustomCrew:
  4. # Your crew implementation...

这也是 CrewAI 代码库中第三方事件监听器的注册方式。

可用事件类型

CrewAI 提供了丰富的事件类型供你监听:

Crew 事件

  • CrewKickoffStartedEvent:当 Crew 开始执行时发出
  • CrewKickoffCompletedEvent:当 Crew 执行完成时发出
  • CrewKickoffFailedEvent:当 Crew 执行失败时发出
  • CrewTestStartedEvent:当 Crew 开始测试时发出
  • CrewTestCompletedEvent:当 Crew 测试完成时发出
  • CrewTestFailedEvent:当 Crew 测试失败时发出
  • CrewTrainStartedEvent:当 Crew 开始训练时发出
  • CrewTrainCompletedEvent:当 Crew 训练完成时发出
  • CrewTrainFailedEvent:当 Crew 训练失败时发出
  • CrewTestResultEvent:当 Crew 测试结果可用时发出。包含质量分数、执行时长和所使用模型。

Agent 事件

  • AgentExecutionStartedEvent:当 Agent 开始执行任务时发出
  • AgentExecutionCompletedEvent:当 Agent 完成任务执行时发出
  • AgentExecutionErrorEvent:当 Agent 在执行过程中发生错误时发出
  • LiteAgentExecutionStartedEvent:当 LiteAgent 开始执行时发出。包含 Agent 信息、工具和消息。
  • LiteAgentExecutionCompletedEvent:当 LiteAgent 执行完成时发出。包含 Agent 信息和输出。
  • LiteAgentExecutionErrorEvent:当 LiteAgent 在执行过程中发生错误时发出。包含 Agent 信息和错误消息。
  • AgentEvaluationStartedEvent:当 Agent 评估开始时发出。包含 Agent ID、Agent 角色、可选 Task ID 以及迭代次数。
  • AgentEvaluationCompletedEvent:当 Agent 评估完成时发出。包含 Agent ID、Agent 角色、可选 Task ID、迭代次数、指标类别和分数。
  • AgentEvaluationFailedEvent:当 Agent 评估失败时发出。包含 Agent ID、Agent 角色、可选 Task ID、迭代次数和错误消息。

Task 事件

  • TaskStartedEvent:当 Task 开始执行时发出
  • TaskCompletedEvent:当 Task 执行完成时发出
  • TaskFailedEvent:当 Task 执行失败时发出
  • TaskEvaluationEvent:当 Task 被评估时发出

工具使用事件

  • ToolUsageStartedEvent:当工具开始执行时发出
  • ToolUsageFinishedEvent:当工具执行完成时发出
  • ToolUsageErrorEvent:当工具执行过程中出现错误时发出
  • ToolValidateInputErrorEvent:当工具输入校验过程中出现错误时发出
  • ToolExecutionErrorEvent:当工具执行过程中出现错误时发出
  • ToolSelectionErrorEvent:当选择工具时出现错误时发出

MCP 事件

  • MCPConnectionStartedEvent:当开始连接到 MCP server 时发出。包含 server 名称、URL、传输类型、连接超时时间以及是否为重连尝试。
  • MCPConnectionCompletedEvent:当成功连接到 MCP server 时发出。包含 server 名称、连接耗时(毫秒)以及是否为重连。
  • MCPConnectionFailedEvent:当连接 MCP server 失败时发出。包含 server 名称、错误消息以及错误类型(timeoutauthenticationnetwork 等)。
  • MCPToolExecutionStartedEvent:当开始执行 MCP 工具时发出。包含 server 名称、工具名称和工具参数。
  • MCPToolExecutionCompletedEvent:当 MCP 工具执行成功完成时发出。包含 server 名称、工具名称、结果以及执行耗时(毫秒)。
  • MCPToolExecutionFailedEvent:当 MCP 工具执行失败时发出。包含 server 名称、工具名称、错误消息以及错误类型(timeoutvalidationserver_error 等)。
  • MCPConfigFetchFailedEvent:当获取 MCP server 配置失败时发出(例如 MCP 未在你的账户中连接、API 错误,或获取配置后连接失败)。包含 slug、错误消息以及错误类型(not_connectedapi_errorconnection_failed)。

Knowledge 事件

  • KnowledgeRetrievalStartedEvent:当 Knowledge 检索开始时发出
  • KnowledgeRetrievalCompletedEvent:当 Knowledge 检索完成时发出
  • KnowledgeQueryStartedEvent:当 Knowledge 查询开始时发出
  • KnowledgeQueryCompletedEvent:当 Knowledge 查询完成时发出
  • KnowledgeQueryFailedEvent:当 Knowledge 查询失败时发出
  • KnowledgeSearchQueryFailedEvent:当 Knowledge 搜索查询失败时发出

LLM Guardrail 事件

  • LLMGuardrailStartedEvent:当 Guardrail 校验开始时发出。包含当前应用的 Guardrail 详情以及重试次数。
  • LLMGuardrailCompletedEvent:当 Guardrail 校验完成时发出。包含校验成功 / 失败信息、结果,以及可能的错误消息。
  • LLMGuardrailFailedEvent:当 Guardrail 校验失败时发出。包含错误消息和重试次数。

Flow 事件

  • FlowCreatedEvent:当 Flow 被创建时发出
  • FlowStartedEvent:当 Flow 开始执行时发出
  • FlowFinishedEvent:当 Flow 执行完成时发出
  • FlowPausedEvent:当 Flow 因等待人工反馈而暂停时发出。包含 Flow 名称、Flow ID、方法名、当前状态、请求反馈时显示的消息,以及可选的路由结果列表。
  • FlowPlotEvent:当 Flow 被绘制时发出
  • MethodExecutionStartedEvent:当 Flow 方法开始执行时发出
  • MethodExecutionFinishedEvent:当 Flow 方法执行完成时发出
  • MethodExecutionFailedEvent:当 Flow 方法执行失败时发出
  • MethodExecutionPausedEvent:当 Flow 方法因等待人工反馈而暂停时发出。包含 Flow 名称、方法名、当前状态、Flow ID、请求反馈时显示的消息,以及可选的路由结果列表。

Human In The Loop 事件

  • FlowInputRequestedEvent:当 Flow 通过 Flow.ask() 请求用户输入时发出。包含 Flow 名称、方法名、展示给用户的问题或提示,以及可选元数据(例如用户 ID、频道、会话上下文)。
  • FlowInputReceivedEvent:当通过 Flow.ask() 收到用户输入后发出。包含 Flow 名称、方法名、原始问题、用户回复(若超时则为 None)、可选请求元数据,以及来自 provider 的可选响应元数据(例如是谁回复的、线程 ID、时间戳)。
  • HumanFeedbackRequestedEvent:当使用 @human_feedback 装饰的方法需要人工审阅输入时发出。包含 Flow 名称、方法名、展示给人工审阅者的方法输出、请求反馈时显示的消息,以及可选的路由结果列表。
  • HumanFeedbackReceivedEvent:当人工对 @human_feedback 装饰的方法提供反馈时发出。包含 Flow 名称、方法名、人工提供的原始文本反馈,以及折叠后的 outcome 字符串(如果指定了 emit)。

LLM 事件

  • LLMCallStartedEvent:当 LLM 调用开始时发出
  • LLMCallCompletedEvent:当 LLM 调用完成时发出
  • LLMCallFailedEvent:当 LLM 调用失败时发出
  • LLMStreamChunkEvent:在流式 LLM 响应过程中,每接收到一个 chunk 时发出
  • LLMThinkingChunkEvent:当从 thinking model 中接收到 thinking / reasoning chunk 时发出。包含 chunk 文本和可选 response ID。

Memory 事件

  • MemoryQueryStartedEvent:当 Memory 查询开始时发出。包含查询内容、limit 以及可选 score threshold。
  • MemoryQueryCompletedEvent:当 Memory 查询成功完成时发出。包含查询内容、结果、limit、score threshold 以及查询耗时。
  • MemoryQueryFailedEvent:当 Memory 查询失败时发出。包含查询内容、limit、score threshold 以及错误消息。
  • MemorySaveStartedEvent:当 Memory 保存操作开始时发出。包含待保存的值、元数据以及可选 Agent 角色。
  • MemorySaveCompletedEvent:当 Memory 保存操作成功完成时发出。包含已保存的值、元数据、Agent 角色以及保存耗时。
  • MemorySaveFailedEvent:当 Memory 保存操作失败时发出。包含待保存的值、元数据、Agent 角色以及错误消息。
  • MemoryRetrievalStartedEvent:当针对 Task prompt 的 Memory 检索开始时发出。包含可选 Task ID。
  • MemoryRetrievalCompletedEvent:当针对 Task prompt 的 Memory 检索成功完成时发出。包含 Task ID、Memory 内容以及检索耗时。
  • MemoryRetrievalFailedEvent:当针对 Task prompt 的 Memory 检索失败时发出。包含可选 Task ID 和错误消息。

Reasoning 事件

  • AgentReasoningStartedEvent:当 Agent 开始对任务进行推理时发出。包含 Agent 角色、Task ID 和尝试次数。
  • AgentReasoningCompletedEvent:当 Agent 完成推理过程时发出。包含 Agent 角色、Task ID、生成的计划以及 Agent 是否已准备好继续执行。
  • AgentReasoningFailedEvent:当推理过程失败时发出。包含 Agent 角色、Task ID 和错误消息。

Observation 事件

  • StepObservationStartedEvent:当 Planner 开始观察某一步骤结果时发出。它会在每个步骤执行后、观察用 LLM 调用前触发。包含 Agent 角色、步骤编号和步骤描述。
  • StepObservationCompletedEvent:当 Planner 完成对某一步骤结果的观察时发出。包含该步骤是否成功完成、学到的关键信息、剩余计划是否仍然有效、是否需要完全重新规划,以及建议的优化项。
  • StepObservationFailedEvent:当观察用 LLM 调用本身失败时发出。系统会默认继续执行原计划。包含错误消息。
  • PlanRefinementEvent:当 Planner 在不进行完整重规划的情况下,对后续步骤描述进行优化时发出。包含被优化的步骤数量及具体优化内容。
  • PlanReplanTriggeredEvent:当 Planner 认为剩余计划从根本上有问题并触发完整重规划时发出。包含重规划原因、重规划次数以及被保留的已完成步骤数量。
  • GoalAchievedEarlyEvent:当 Planner 检测到目标已提前达成,因此剩余步骤将被跳过时发出。包含剩余步骤数量和已完成步骤数量。

A2A(Agent-to-Agent)事件

Delegation 事件

  • A2ADelegationStartedEvent:当 A2A 委派开始时发出。包含 endpoint URL、任务描述、Agent ID、Context ID、是否为多轮、轮次编号、Agent card 元数据、协议版本、provider 信息以及可选 skill ID。
  • A2ADelegationCompletedEvent:当 A2A 委派完成时发出。包含完成状态(completedinput_requiredfailed 等)、结果、错误消息、Context ID 以及 Agent card 元数据。
  • A2AParallelDelegationStartedEvent:当开始并行委派给多个 A2A Agent 时发出。包含 endpoint 列表和任务描述。
  • A2AParallelDelegationCompletedEvent:当并行委派给多个 A2A Agent 完成时发出。包含 endpoint 列表、成功数、失败数以及结果摘要。

Conversation 事件

  • A2AConversationStartedEvent:在多轮 A2A 对话开始时、第一次消息交换之前发出。包含 Agent ID、endpoint、Context ID、Agent card 元数据、协议版本和 provider 信息。
  • A2AMessageSentEvent:当消息被发送给 A2A Agent 时发出。包含消息内容、轮次编号、Context ID、Message ID 以及是否为多轮。
  • A2AResponseReceivedEvent:当从 A2A Agent 收到响应时发出。包含响应内容、轮次编号、Context ID、Message ID、状态以及是否为最终响应。
  • A2AConversationCompletedEvent:在多轮 A2A 对话结束时发出。包含最终状态(completedfailed)、最终结果、错误消息、Context ID 以及总轮次数。

Streaming 事件

  • A2AStreamingStartedEvent:当 A2A 委派进入流式模式时发出。包含 Task ID、Context ID、endpoint、轮次编号以及是否为多轮。
  • A2AStreamingChunkEvent:当接收到一个流式 chunk 时发出。包含 chunk 文本、chunk 索引、是否为最终 chunk、Task ID、Context ID 以及轮次编号。

Polling 与 Push Notification 事件

  • A2APollingStartedEvent:当 A2A 委派开始进入轮询模式时发出。包含 Task ID、Context ID、轮询间隔(秒)和 endpoint。
  • A2APollingStatusEvent:在每次轮询迭代时发出。包含 Task ID、Context ID、当前任务状态、已耗时秒数以及轮询次数。
  • A2APushNotificationRegisteredEvent:当注册 push notification 回调时发出。包含 Task ID、Context ID、回调 URL 和 endpoint。
  • A2APushNotificationReceivedEvent:当从远程 A2A Agent 收到 push notification 时发出。包含 Task ID、Context ID 和当前状态。
  • A2APushNotificationSentEvent:当 push notification 被发送到回调 URL 时发出。包含 Task ID、Context ID、回调 URL、状态、是否发送成功以及可选错误消息。
  • A2APushNotificationTimeoutEvent:当等待 push notification 超时时发出。包含 Task ID、Context ID 和超时时长(秒)。

Connection 与 Authentication 事件

  • A2AAgentCardFetchedEvent:当成功获取 agent card 时发出。包含 endpoint、Agent 名称、Agent card 元数据、协议版本、provider 信息、是否来自缓存以及获取耗时(毫秒)。
  • A2AAuthenticationFailedEvent:当对 A2A Agent 的身份验证失败时发出。包含 endpoint、尝试的认证类型(例如 beareroauth2api_key)、错误消息以及 HTTP 状态码。
  • A2AConnectionErrorEvent:当 A2A 通信期间发生连接错误时发出。包含 endpoint、错误消息、错误类型(例如 timeoutconnection_refuseddns_error)、HTTP 状态码以及正在尝试的操作。
  • A2ATransportNegotiatedEvent:当与 A2A Agent 协商传输协议时发出。包含协商后的 transport、协商后的 URL、选择来源(client_preferredserver_preferredfallback)以及客户端 / 服务端支持的 transport。
  • A2AContentTypeNegotiatedEvent:当与 A2A Agent 协商内容类型时发出。包含客户端 / 服务端输入输出模式、协商后的输入输出模式,以及协商是否成功。

Artifact 事件

  • A2AArtifactReceivedEvent:当从远程 A2A Agent 接收到 artifact 时发出。包含 Task ID、Artifact ID、Artifact 名称、描述、MIME 类型、字节大小以及内容是否应追加。

Server Task 事件

  • A2AServerTaskStartedEvent:当 A2A server task 执行开始时发出。包含 Task ID 和 Context ID。
  • A2AServerTaskCompletedEvent:当 A2A server task 执行完成时发出。包含 Task ID、Context ID 和结果。
  • A2AServerTaskCanceledEvent:当 A2A server task 执行被取消时发出。包含 Task ID 和 Context ID。
  • A2AServerTaskFailedEvent:当 A2A server task 执行失败时发出。包含 Task ID、Context ID 和错误消息。

Context 生命周期事件

  • A2AContextCreatedEvent:当 A2A Context 被创建时发出。Context 用于将对话或工作流中的相关任务分组。包含 Context ID 和创建时间戳。
  • A2AContextExpiredEvent:当 A2A Context 因 TTL 到期而过期时发出。包含 Context ID、创建时间戳、已存在时长(秒)以及任务数量。
  • A2AContextIdleEvent:当 A2A Context 进入空闲状态(在配置阈值时间内无活动)时发出。包含 Context ID、空闲时长(秒)以及任务数量。
  • A2AContextCompletedEvent:当 A2A Context 中所有任务都完成时发出。包含 Context ID、任务总数以及总耗时(秒)。
  • A2AContextPrunedEvent:当 A2A Context 被裁剪(删除)时发出。包含 Context ID、任务数量以及存在时长(秒)。

事件处理器结构

每个事件处理器会接收两个参数:

  1. source:发出该事件的对象
  2. event:事件实例,包含与该事件相关的特定数据

event 对象的结构取决于事件类型,但所有事件都继承自 BaseEvent,并包含:

  • timestamp:事件发出的时间
  • type:事件类型的字符串标识符

其他字段会根据具体事件类型而变化。例如,CrewKickoffCompletedEvent 包含 crew_nameoutput 字段。

高级用法:Scoped Handlers

对于临时事件处理(适合测试或特定操作),你可以使用 scoped_handlers 上下文管理器:

  1. from crewai.events import crewai_event_bus, CrewKickoffStartedEvent
  2. with crewai_event_bus.scoped_handlers():
  3. @crewai_event_bus.on(CrewKickoffStartedEvent)
  4. def temp_handler(source, event):
  5. print("This handler only exists within this context")
  6. # Do something that emits events
  7. # Outside the context, the temporary handler is removed

使用场景

事件监听器可以用于多种用途:

  1. 日志与监控:跟踪 Crew 的执行过程,并记录关键事件
  2. 分析:收集关于 Crew 性能和行为的数据
  3. 调试:设置临时监听器来排查特定问题
  4. 集成:将 CrewAI 与外部系统连接,例如监控平台、数据库或通知服务
  5. 自定义行为:基于特定事件触发自定义动作

最佳实践

  1. 保持处理器轻量:事件处理器应尽量轻量,避免阻塞性操作
  2. 错误处理:在事件处理器中加入适当的错误处理,防止异常影响主执行流程
  3. 资源清理:如果监听器分配了资源,请确保这些资源被正确清理
  4. 选择性监听:只监听你真正需要处理的事件
  5. 测试:独立测试你的事件监听器,确保其行为符合预期

通过利用 CrewAI 的事件系统,你可以扩展其功能,并将其无缝集成到现有基础设施中。