Event Listeners
接入 CrewAI 事件,以构建自定义集成与监控能力
概览
CrewAI 提供了一个强大的事件系统,允许你监听并响应 Crew 执行过程中发生的各种事件。这个功能使你能够构建自定义集成、监控方案、日志系统,或任何其他需要基于 CrewAI 内部事件触发的功能。
工作原理
CrewAI 使用事件总线架构,在整个执行生命周期中发出事件。这个事件系统由以下组件构成:
- CrewAIEventsBus:一个单例事件总线,负责管理事件注册与事件发出
- BaseEvent:系统中所有事件的基类
- BaseEventListener:用于创建自定义事件监听器的抽象基类
当 CrewAI 中发生特定操作时(例如 Crew 开始执行、Agent 完成任务、工具被使用),系统会发出相应事件。你可以为这些事件注册处理器,在事件发生时执行自定义代码。
<img src="https://mintcdn.com/crewai/Tp3HEbbp9mp-dy3H/images/enterprise/traces-overview.png?fit=max&auto=format&n=Tp3HEbbp9mp-dy3H&q=85&s=9c02d5b7306bf7adaeadd77a018f8fea" alt="Prompt Tracing Dashboard" width="2244" height="1422" data-path="images/enterprise/traces-overview.png" />
使用 Prompt Tracing,你可以:
查看发送给 LLM 的所有 prompt 完整历史
跟踪 token 使用量和成本
调试 Agent 推理失败问题
与团队共享 prompt 序列
比较不同的 prompt 策略
导出 trace 以用于合规和审计
创建自定义事件监听器
要创建一个自定义事件监听器,你需要:
- 创建一个继承自
BaseEventListener的类 - 实现
setup_listeners方法 - 为你感兴趣的事件注册处理器
- 在合适的文件中创建监听器实例
下面是一个简单的自定义事件监听器类示例:
from crewai.events import (CrewKickoffStartedEvent,CrewKickoffCompletedEvent,AgentExecutionCompletedEvent,)from crewai.events import BaseEventListenerclass MyCustomListener(BaseEventListener):def __init__(self):super().__init__()def setup_listeners(self, crewai_event_bus):@crewai_event_bus.on(CrewKickoffStartedEvent)def on_crew_started(source, event):print(f"Crew '{event.crew_name}' has started execution!")@crewai_event_bus.on(CrewKickoffCompletedEvent)def on_crew_completed(source, event):print(f"Crew '{event.crew_name}' has completed execution!")print(f"Output: {event.output}")@crewai_event_bus.on(AgentExecutionCompletedEvent)def on_agent_execution_completed(source, event):print(f"Agent '{event.agent.role}' completed task")print(f"Output: {event.output}")
正确注册你的监听器
仅仅定义监听器类是不够的。你还需要创建其实例,并确保它在你的应用中被导入。这样可以确保:
- 事件处理器被注册到事件总线上
- 监听器实例会保留在内存中(不会被垃圾回收)
- 在事件发出时,监听器处于激活状态
方案 1:在你的 Crew 或 Flow 实现中导入并实例化
最重要的是,在定义并执行 Crew 或 Flow 的文件中创建监听器实例:
对于基于 Crew 的应用
在你的 Crew 实现文件顶部创建并导入监听器:
# In your crew.py filefrom crewai import Agent, Crew, Taskfrom my_listeners import MyCustomListener# Create an instance of your listenermy_listener = MyCustomListener()class MyCustomCrew:# Your crew implementation...def crew(self):return Crew(agents=[...],tasks=[...],# ...)
对于基于 Flow 的应用
在你的 Flow 实现文件顶部创建并导入监听器:
# In your main.py or flow.py filefrom crewai.flow import Flow, listen, startfrom my_listeners import MyCustomListener# Create an instance of your listenermy_listener = MyCustomListener()class MyCustomFlow(Flow):# Your flow implementation...@start()def first_step(self):# ...
这样可以确保在执行 Crew 或 Flow 时,监听器已经被加载并处于激活状态。
方案 2:为监听器创建一个 package
如果你有多个监听器,推荐使用更结构化的方式:
- 为监听器创建一个 package:
my_project/├── listeners/│ ├── __init__.py│ ├── my_custom_listener.py│ └── another_listener.py
- 在
my_custom_listener.py中,定义监听器类并创建其实例:
# my_custom_listener.pyfrom crewai.events import BaseEventListener# ... import events ...class MyCustomListener(BaseEventListener):# ... implementation ...# Create an instance of your listenermy_custom_listener = MyCustomListener()
- 在
__init__.py中导入监听器实例,以确保它们被加载:
# __init__.pyfrom .my_custom_listener import my_custom_listenerfrom .another_listener import another_listener# Optionally export them if you need to access them elsewhere__all__ = ['my_custom_listener', 'another_listener']
- 在你的 Crew 或 Flow 文件中导入这个 listeners package:
# In your crew.py or flow.py fileimport my_project.listeners # This loads all your listenersclass MyCustomCrew:# Your crew implementation...
这也是 CrewAI 代码库中第三方事件监听器的注册方式。
可用事件类型
CrewAI 提供了丰富的事件类型供你监听:
Crew 事件
- CrewKickoffStartedEvent:当 Crew 开始执行时发出
- CrewKickoffCompletedEvent:当 Crew 执行完成时发出
- CrewKickoffFailedEvent:当 Crew 执行失败时发出
- CrewTestStartedEvent:当 Crew 开始测试时发出
- CrewTestCompletedEvent:当 Crew 测试完成时发出
- CrewTestFailedEvent:当 Crew 测试失败时发出
- CrewTrainStartedEvent:当 Crew 开始训练时发出
- CrewTrainCompletedEvent:当 Crew 训练完成时发出
- CrewTrainFailedEvent:当 Crew 训练失败时发出
- CrewTestResultEvent:当 Crew 测试结果可用时发出。包含质量分数、执行时长和所使用模型。
Agent 事件
- AgentExecutionStartedEvent:当 Agent 开始执行任务时发出
- AgentExecutionCompletedEvent:当 Agent 完成任务执行时发出
- AgentExecutionErrorEvent:当 Agent 在执行过程中发生错误时发出
- LiteAgentExecutionStartedEvent:当 LiteAgent 开始执行时发出。包含 Agent 信息、工具和消息。
- LiteAgentExecutionCompletedEvent:当 LiteAgent 执行完成时发出。包含 Agent 信息和输出。
- LiteAgentExecutionErrorEvent:当 LiteAgent 在执行过程中发生错误时发出。包含 Agent 信息和错误消息。
- AgentEvaluationStartedEvent:当 Agent 评估开始时发出。包含 Agent ID、Agent 角色、可选 Task ID 以及迭代次数。
- AgentEvaluationCompletedEvent:当 Agent 评估完成时发出。包含 Agent ID、Agent 角色、可选 Task ID、迭代次数、指标类别和分数。
- AgentEvaluationFailedEvent:当 Agent 评估失败时发出。包含 Agent ID、Agent 角色、可选 Task ID、迭代次数和错误消息。
Task 事件
- TaskStartedEvent:当 Task 开始执行时发出
- TaskCompletedEvent:当 Task 执行完成时发出
- TaskFailedEvent:当 Task 执行失败时发出
- TaskEvaluationEvent:当 Task 被评估时发出
工具使用事件
- ToolUsageStartedEvent:当工具开始执行时发出
- ToolUsageFinishedEvent:当工具执行完成时发出
- ToolUsageErrorEvent:当工具执行过程中出现错误时发出
- ToolValidateInputErrorEvent:当工具输入校验过程中出现错误时发出
- ToolExecutionErrorEvent:当工具执行过程中出现错误时发出
- ToolSelectionErrorEvent:当选择工具时出现错误时发出
MCP 事件
- MCPConnectionStartedEvent:当开始连接到 MCP server 时发出。包含 server 名称、URL、传输类型、连接超时时间以及是否为重连尝试。
- MCPConnectionCompletedEvent:当成功连接到 MCP server 时发出。包含 server 名称、连接耗时(毫秒)以及是否为重连。
- MCPConnectionFailedEvent:当连接 MCP server 失败时发出。包含 server 名称、错误消息以及错误类型(
timeout、authentication、network等)。 - MCPToolExecutionStartedEvent:当开始执行 MCP 工具时发出。包含 server 名称、工具名称和工具参数。
- MCPToolExecutionCompletedEvent:当 MCP 工具执行成功完成时发出。包含 server 名称、工具名称、结果以及执行耗时(毫秒)。
- MCPToolExecutionFailedEvent:当 MCP 工具执行失败时发出。包含 server 名称、工具名称、错误消息以及错误类型(
timeout、validation、server_error等)。 - MCPConfigFetchFailedEvent:当获取 MCP server 配置失败时发出(例如 MCP 未在你的账户中连接、API 错误,或获取配置后连接失败)。包含 slug、错误消息以及错误类型(
not_connected、api_error、connection_failed)。
Knowledge 事件
- KnowledgeRetrievalStartedEvent:当 Knowledge 检索开始时发出
- KnowledgeRetrievalCompletedEvent:当 Knowledge 检索完成时发出
- KnowledgeQueryStartedEvent:当 Knowledge 查询开始时发出
- KnowledgeQueryCompletedEvent:当 Knowledge 查询完成时发出
- KnowledgeQueryFailedEvent:当 Knowledge 查询失败时发出
- KnowledgeSearchQueryFailedEvent:当 Knowledge 搜索查询失败时发出
LLM Guardrail 事件
- LLMGuardrailStartedEvent:当 Guardrail 校验开始时发出。包含当前应用的 Guardrail 详情以及重试次数。
- LLMGuardrailCompletedEvent:当 Guardrail 校验完成时发出。包含校验成功 / 失败信息、结果,以及可能的错误消息。
- LLMGuardrailFailedEvent:当 Guardrail 校验失败时发出。包含错误消息和重试次数。
Flow 事件
- FlowCreatedEvent:当 Flow 被创建时发出
- FlowStartedEvent:当 Flow 开始执行时发出
- FlowFinishedEvent:当 Flow 执行完成时发出
- FlowPausedEvent:当 Flow 因等待人工反馈而暂停时发出。包含 Flow 名称、Flow ID、方法名、当前状态、请求反馈时显示的消息,以及可选的路由结果列表。
- FlowPlotEvent:当 Flow 被绘制时发出
- MethodExecutionStartedEvent:当 Flow 方法开始执行时发出
- MethodExecutionFinishedEvent:当 Flow 方法执行完成时发出
- MethodExecutionFailedEvent:当 Flow 方法执行失败时发出
- MethodExecutionPausedEvent:当 Flow 方法因等待人工反馈而暂停时发出。包含 Flow 名称、方法名、当前状态、Flow ID、请求反馈时显示的消息,以及可选的路由结果列表。
Human In The Loop 事件
- FlowInputRequestedEvent:当 Flow 通过
Flow.ask()请求用户输入时发出。包含 Flow 名称、方法名、展示给用户的问题或提示,以及可选元数据(例如用户 ID、频道、会话上下文)。 - FlowInputReceivedEvent:当通过
Flow.ask()收到用户输入后发出。包含 Flow 名称、方法名、原始问题、用户回复(若超时则为None)、可选请求元数据,以及来自 provider 的可选响应元数据(例如是谁回复的、线程 ID、时间戳)。 - HumanFeedbackRequestedEvent:当使用
@human_feedback装饰的方法需要人工审阅输入时发出。包含 Flow 名称、方法名、展示给人工审阅者的方法输出、请求反馈时显示的消息,以及可选的路由结果列表。 - HumanFeedbackReceivedEvent:当人工对
@human_feedback装饰的方法提供反馈时发出。包含 Flow 名称、方法名、人工提供的原始文本反馈,以及折叠后的 outcome 字符串(如果指定了 emit)。
LLM 事件
- LLMCallStartedEvent:当 LLM 调用开始时发出
- LLMCallCompletedEvent:当 LLM 调用完成时发出
- LLMCallFailedEvent:当 LLM 调用失败时发出
- LLMStreamChunkEvent:在流式 LLM 响应过程中,每接收到一个 chunk 时发出
- LLMThinkingChunkEvent:当从 thinking model 中接收到 thinking / reasoning chunk 时发出。包含 chunk 文本和可选 response ID。
Memory 事件
- MemoryQueryStartedEvent:当 Memory 查询开始时发出。包含查询内容、limit 以及可选 score threshold。
- MemoryQueryCompletedEvent:当 Memory 查询成功完成时发出。包含查询内容、结果、limit、score threshold 以及查询耗时。
- MemoryQueryFailedEvent:当 Memory 查询失败时发出。包含查询内容、limit、score threshold 以及错误消息。
- MemorySaveStartedEvent:当 Memory 保存操作开始时发出。包含待保存的值、元数据以及可选 Agent 角色。
- MemorySaveCompletedEvent:当 Memory 保存操作成功完成时发出。包含已保存的值、元数据、Agent 角色以及保存耗时。
- MemorySaveFailedEvent:当 Memory 保存操作失败时发出。包含待保存的值、元数据、Agent 角色以及错误消息。
- MemoryRetrievalStartedEvent:当针对 Task prompt 的 Memory 检索开始时发出。包含可选 Task ID。
- MemoryRetrievalCompletedEvent:当针对 Task prompt 的 Memory 检索成功完成时发出。包含 Task ID、Memory 内容以及检索耗时。
- MemoryRetrievalFailedEvent:当针对 Task prompt 的 Memory 检索失败时发出。包含可选 Task ID 和错误消息。
Reasoning 事件
- AgentReasoningStartedEvent:当 Agent 开始对任务进行推理时发出。包含 Agent 角色、Task ID 和尝试次数。
- AgentReasoningCompletedEvent:当 Agent 完成推理过程时发出。包含 Agent 角色、Task ID、生成的计划以及 Agent 是否已准备好继续执行。
- AgentReasoningFailedEvent:当推理过程失败时发出。包含 Agent 角色、Task ID 和错误消息。
Observation 事件
- StepObservationStartedEvent:当 Planner 开始观察某一步骤结果时发出。它会在每个步骤执行后、观察用 LLM 调用前触发。包含 Agent 角色、步骤编号和步骤描述。
- StepObservationCompletedEvent:当 Planner 完成对某一步骤结果的观察时发出。包含该步骤是否成功完成、学到的关键信息、剩余计划是否仍然有效、是否需要完全重新规划,以及建议的优化项。
- StepObservationFailedEvent:当观察用 LLM 调用本身失败时发出。系统会默认继续执行原计划。包含错误消息。
- PlanRefinementEvent:当 Planner 在不进行完整重规划的情况下,对后续步骤描述进行优化时发出。包含被优化的步骤数量及具体优化内容。
- PlanReplanTriggeredEvent:当 Planner 认为剩余计划从根本上有问题并触发完整重规划时发出。包含重规划原因、重规划次数以及被保留的已完成步骤数量。
- GoalAchievedEarlyEvent:当 Planner 检测到目标已提前达成,因此剩余步骤将被跳过时发出。包含剩余步骤数量和已完成步骤数量。
A2A(Agent-to-Agent)事件
Delegation 事件
- A2ADelegationStartedEvent:当 A2A 委派开始时发出。包含 endpoint URL、任务描述、Agent ID、Context ID、是否为多轮、轮次编号、Agent card 元数据、协议版本、provider 信息以及可选 skill ID。
- A2ADelegationCompletedEvent:当 A2A 委派完成时发出。包含完成状态(
completed、input_required、failed等)、结果、错误消息、Context ID 以及 Agent card 元数据。 - A2AParallelDelegationStartedEvent:当开始并行委派给多个 A2A Agent 时发出。包含 endpoint 列表和任务描述。
- A2AParallelDelegationCompletedEvent:当并行委派给多个 A2A Agent 完成时发出。包含 endpoint 列表、成功数、失败数以及结果摘要。
Conversation 事件
- A2AConversationStartedEvent:在多轮 A2A 对话开始时、第一次消息交换之前发出。包含 Agent ID、endpoint、Context ID、Agent card 元数据、协议版本和 provider 信息。
- A2AMessageSentEvent:当消息被发送给 A2A Agent 时发出。包含消息内容、轮次编号、Context ID、Message ID 以及是否为多轮。
- A2AResponseReceivedEvent:当从 A2A Agent 收到响应时发出。包含响应内容、轮次编号、Context ID、Message ID、状态以及是否为最终响应。
- A2AConversationCompletedEvent:在多轮 A2A 对话结束时发出。包含最终状态(
completed或failed)、最终结果、错误消息、Context ID 以及总轮次数。
Streaming 事件
- A2AStreamingStartedEvent:当 A2A 委派进入流式模式时发出。包含 Task ID、Context ID、endpoint、轮次编号以及是否为多轮。
- A2AStreamingChunkEvent:当接收到一个流式 chunk 时发出。包含 chunk 文本、chunk 索引、是否为最终 chunk、Task ID、Context ID 以及轮次编号。
Polling 与 Push Notification 事件
- A2APollingStartedEvent:当 A2A 委派开始进入轮询模式时发出。包含 Task ID、Context ID、轮询间隔(秒)和 endpoint。
- A2APollingStatusEvent:在每次轮询迭代时发出。包含 Task ID、Context ID、当前任务状态、已耗时秒数以及轮询次数。
- A2APushNotificationRegisteredEvent:当注册 push notification 回调时发出。包含 Task ID、Context ID、回调 URL 和 endpoint。
- A2APushNotificationReceivedEvent:当从远程 A2A Agent 收到 push notification 时发出。包含 Task ID、Context ID 和当前状态。
- A2APushNotificationSentEvent:当 push notification 被发送到回调 URL 时发出。包含 Task ID、Context ID、回调 URL、状态、是否发送成功以及可选错误消息。
- A2APushNotificationTimeoutEvent:当等待 push notification 超时时发出。包含 Task ID、Context ID 和超时时长(秒)。
Connection 与 Authentication 事件
- A2AAgentCardFetchedEvent:当成功获取 agent card 时发出。包含 endpoint、Agent 名称、Agent card 元数据、协议版本、provider 信息、是否来自缓存以及获取耗时(毫秒)。
- A2AAuthenticationFailedEvent:当对 A2A Agent 的身份验证失败时发出。包含 endpoint、尝试的认证类型(例如
bearer、oauth2、api_key)、错误消息以及 HTTP 状态码。 - A2AConnectionErrorEvent:当 A2A 通信期间发生连接错误时发出。包含 endpoint、错误消息、错误类型(例如
timeout、connection_refused、dns_error)、HTTP 状态码以及正在尝试的操作。 - A2ATransportNegotiatedEvent:当与 A2A Agent 协商传输协议时发出。包含协商后的 transport、协商后的 URL、选择来源(
client_preferred、server_preferred、fallback)以及客户端 / 服务端支持的 transport。 - A2AContentTypeNegotiatedEvent:当与 A2A Agent 协商内容类型时发出。包含客户端 / 服务端输入输出模式、协商后的输入输出模式,以及协商是否成功。
Artifact 事件
- A2AArtifactReceivedEvent:当从远程 A2A Agent 接收到 artifact 时发出。包含 Task ID、Artifact ID、Artifact 名称、描述、MIME 类型、字节大小以及内容是否应追加。
Server Task 事件
- A2AServerTaskStartedEvent:当 A2A server task 执行开始时发出。包含 Task ID 和 Context ID。
- A2AServerTaskCompletedEvent:当 A2A server task 执行完成时发出。包含 Task ID、Context ID 和结果。
- A2AServerTaskCanceledEvent:当 A2A server task 执行被取消时发出。包含 Task ID 和 Context ID。
- A2AServerTaskFailedEvent:当 A2A server task 执行失败时发出。包含 Task ID、Context ID 和错误消息。
Context 生命周期事件
- A2AContextCreatedEvent:当 A2A Context 被创建时发出。Context 用于将对话或工作流中的相关任务分组。包含 Context ID 和创建时间戳。
- A2AContextExpiredEvent:当 A2A Context 因 TTL 到期而过期时发出。包含 Context ID、创建时间戳、已存在时长(秒)以及任务数量。
- A2AContextIdleEvent:当 A2A Context 进入空闲状态(在配置阈值时间内无活动)时发出。包含 Context ID、空闲时长(秒)以及任务数量。
- A2AContextCompletedEvent:当 A2A Context 中所有任务都完成时发出。包含 Context ID、任务总数以及总耗时(秒)。
- A2AContextPrunedEvent:当 A2A Context 被裁剪(删除)时发出。包含 Context ID、任务数量以及存在时长(秒)。
事件处理器结构
每个事件处理器会接收两个参数:
- source:发出该事件的对象
- event:事件实例,包含与该事件相关的特定数据
event 对象的结构取决于事件类型,但所有事件都继承自 BaseEvent,并包含:
- timestamp:事件发出的时间
- type:事件类型的字符串标识符
其他字段会根据具体事件类型而变化。例如,CrewKickoffCompletedEvent 包含 crew_name 和 output 字段。
高级用法:Scoped Handlers
对于临时事件处理(适合测试或特定操作),你可以使用 scoped_handlers 上下文管理器:
from crewai.events import crewai_event_bus, CrewKickoffStartedEventwith crewai_event_bus.scoped_handlers():@crewai_event_bus.on(CrewKickoffStartedEvent)def temp_handler(source, event):print("This handler only exists within this context")# Do something that emits events# Outside the context, the temporary handler is removed
使用场景
事件监听器可以用于多种用途:
- 日志与监控:跟踪 Crew 的执行过程,并记录关键事件
- 分析:收集关于 Crew 性能和行为的数据
- 调试:设置临时监听器来排查特定问题
- 集成:将 CrewAI 与外部系统连接,例如监控平台、数据库或通知服务
- 自定义行为:基于特定事件触发自定义动作
最佳实践
- 保持处理器轻量:事件处理器应尽量轻量,避免阻塞性操作
- 错误处理:在事件处理器中加入适当的错误处理,防止异常影响主执行流程
- 资源清理:如果监听器分配了资源,请确保这些资源被正确清理
- 选择性监听:只监听你真正需要处理的事件
- 测试:独立测试你的事件监听器,确保其行为符合预期
通过利用 CrewAI 的事件系统,你可以扩展其功能,并将其无缝集成到现有基础设施中。
