Crews
在 crewAI 框架中理解和使用 crew,包括其完整属性与功能。
概览
在 crewAI 中,crew 表示一组协作工作的 agents,它们共同完成一组任务。每个 crew 都定义了任务执行策略、agent 协作方式以及整体工作流。
Crew 属性
| 属性 | 参数 | 描述 | |
|---|---|---|---|
| Tasks | tasks |
分配给 crew 的任务列表。 | |
| Agents | agents |
属于该 crew 的 agent 列表。 | |
| Process (可选) | process |
crew 遵循的流程方式(例如 sequential、hierarchical)。默认值为 sequential。 |
|
| Verbose (可选) | verbose |
执行期间日志记录的详细程度。默认值为 False。 |
|
| Manager LLM (可选) | manager_llm |
分层流程中管理 agent 使用的语言模型。在使用 hierarchical 流程时必须提供。 | |
| Function Calling LLM (可选) | function_calling_llm |
如果传入,crew 将为所有 agent 使用这个 LLM 进行工具的 function calling。每个 agent 也可以拥有自己的 LLM,并覆盖 crew 级别的 function calling LLM。 | |
| Config (可选) | config |
crew 的可选配置,格式可以是 Json 或 Dict[str, Any]。 |
|
| Max RPM (可选) | max_rpm |
crew 在执行期间遵循的每分钟最大请求数。默认值为 None。 |
|
| Memory (可选) | memory |
用于存储执行记忆(短期记忆、长期记忆、实体记忆)。 | |
| Cache (可选) | cache |
是否使用缓存来存储工具执行结果。默认值为 True。 |
|
| Embedder (可选) | embedder |
crew 使用的 embedder 配置。目前主要用于 memory。默认值为 {"provider": "openai"}。 |
|
| Step Callback (可选) | step_callback |
每个 agent 每一步执行后调用的函数。可用于记录 agent 行为或执行其他操作;它不会覆盖 agent 自身的 step_callback。 |
|
| Task Callback (可选) | task_callback |
每个任务完成后调用的函数。适用于监控或任务完成后的额外操作。 | |
| Share Crew (可选) | share_crew |
是否愿意将完整的 crew 信息和执行过程共享给 crewAI 团队,以帮助改进库并用于训练模型。 | |
| Output Log File (可选) | output_log_file |
设为 True 时会将日志保存为当前目录下的 logs.txt,或你也可以提供文件路径。如果文件名以 .json 结尾,则日志为 JSON 格式,否则为 .txt。默认值为 None。 |
|
| Manager Agent (可选) | manager_agent |
manager 用于设置一个自定义 agent,作为管理者使用。 |
|
| Prompt File (可选) | prompt_file |
crew 使用的 prompt JSON 文件路径。 | |
| Planning (可选) | planning |
为 Crew 增加规划能力。启用后,在每次 Crew 迭代前,所有 Crew 数据都会发送给 AgentPlanner 进行任务规划,并将该计划附加到每个任务描述中。 | |
| Planning LLM (可选) | planning_llm |
在规划流程中,AgentPlanner 使用的语言模型。 | |
| Knowledge Sources (可选) | knowledge_sources |
crew 级别可用的知识源,所有 agents 都可以访问。 | |
| Stream (可选) | stream |
启用流式输出,以在 crew 执行期间接收实时更新。返回一个可迭代 chunk 的 CrewStreamingOutput 对象。默认值为 False。 |
max_rpm 属性设置 crew 每分钟可执行的最大请求数,用于避免触发速率限制;如果设置了它,将覆盖各个 agent 自身的 max_rpm 设置。
创建 Crews
在 CrewAI 中,创建 crew 有两种方式:使用 YAML 配置(推荐),或者 直接通过代码定义。
YAML 配置(推荐)
使用 YAML 配置可以让 crew 的定义更清晰、更易维护,并且与 CrewAI 项目中 agent 和 task 的定义方式保持一致。
在按照 Installation 章节创建 CrewAI 项目后,你可以在一个继承自 CrewBase 的类中定义 crew,并通过装饰器定义 agents、tasks 和 crew 本身。
使用装饰器的 Crew 类示例
from crewai import Agent, Crew, Task, Processfrom crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickofffrom crewai.agents.agent_builder.base_agent import BaseAgentfrom typing import List@CrewBaseclass YourCrewName:"""你的 crew 描述"""agents: List[BaseAgent]tasks: List[Task]# YAML 配置文件路径# 如需查看 YAML 中定义的 agent 和 task 示例,请参考:# - Task: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended# - Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommendedagents_config = 'config/agents.yaml'tasks_config = 'config/tasks.yaml'@before_kickoffdef prepare_inputs(self, inputs):# 在 crew 启动前修改输入inputs['additional_data'] = "Some extra information"return inputs@after_kickoffdef process_output(self, output):# 在 crew 完成后修改输出output.raw += "\nProcessed after kickoff."return output@agentdef agent_one(self) -> Agent:return Agent(config=self.agents_config['agent_one'], # type: ignore[index]verbose=True)@agentdef agent_two(self) -> Agent:return Agent(config=self.agents_config['agent_two'], # type: ignore[index]verbose=True)@taskdef task_one(self) -> Task:return Task(config=self.tasks_config['task_one'] # type: ignore[index])@taskdef task_two(self) -> Task:return Task(config=self.tasks_config['task_two'] # type: ignore[index])@crewdef crew(self) -> Crew:return Crew(agents=self.agents, # 由 @agent 装饰器自动收集tasks=self.tasks, # 由 @task 装饰器自动收集process=Process.sequential,verbose=True,)
运行上述代码的方法:
YourCrewName().crew().kickoff(inputs={"any": "input here"})
CrewBase 类以及这些装饰器会自动收集 agents 和 tasks,从而减少手动管理的工作量。
来自 annotations.py 的装饰器概览
CrewAI 在 annotations.py 文件中提供了多个装饰器,用于标记 crew 类中的方法,以便进行特殊处理:
@CrewBase:将类标记为 crew 基类。@agent:标记一个返回Agent对象的方法。@task:标记一个返回Task对象的方法。@crew:标记一个返回Crew对象的方法。@before_kickoff: (可选)标记一个在 crew 启动前执行的方法。@after_kickoff: (可选)标记一个在 crew 完成后执行的方法。
这些装饰器有助于组织 crew 的结构,并自动收集 agents 和 tasks,而无需手动逐一列出。
直接代码定义(可选方式)
另一种方式是不使用 YAML 配置文件,而是直接在代码中定义 crew。
from crewai import Agent, Crew, Task, Processfrom crewai_tools import YourCustomToolclass YourCrewName:def agent_one(self) -> Agent:return Agent(role="Data Analyst",goal="Analyze data trends in the market",backstory="An experienced data analyst with a background in economics",verbose=True,tools=[YourCustomTool()])def agent_two(self) -> Agent:return Agent(role="Market Researcher",goal="Gather information on market dynamics",backstory="A diligent researcher with a keen eye for detail",verbose=True)def task_one(self) -> Task:return Task(description="Collect recent market data and identify trends.",expected_output="A report summarizing key trends in the market.",agent=self.agent_one())def task_two(self) -> Task:return Task(description="Research factors affecting market dynamics.",expected_output="An analysis of factors influencing the market.",agent=self.agent_two())def crew(self) -> Crew:return Crew(agents=[self.agent_one(), self.agent_two()],tasks=[self.task_one(), self.task_two()],process=Process.sequential,verbose=True)
运行上述代码的方法:
YourCrewName().crew().kickoff(inputs={})
在这个例子中:
- agents 和 tasks 直接在类中定义,不使用装饰器。
- 我们手动创建并管理 agent 和 task 的列表。
- 这种方式提供了更高的控制力,但在大型项目中可维护性通常较差。
Crew 输出
在 CrewAI 框架中,crew 的输出被封装在 CrewOutput 类中。
这个类提供了一种结构化的方式来访问 crew 的执行结果,包括原始字符串、JSON 和 Pydantic 模型等多种格式。
CrewOutput 包含最终任务输出结果、token 使用情况,以及各个任务的输出结果。
Crew 输出属性
| 属性 | 参数 | 类型 | 描述 |
|---|---|---|---|
| Raw | raw |
str |
crew 的原始输出。这是默认输出格式。 |
| Pydantic | pydantic |
Optional[BaseModel] |
表示结构化输出的 Pydantic 模型对象。 |
| JSON Dict | json_dict |
Optional[Dict[str, Any]] |
表示 JSON 输出的字典。 |
| Tasks Output | tasks_output |
List[TaskOutput] |
TaskOutput 对象列表,每个对象代表 crew 中一个任务的输出。 |
| Token Usage | token_usage |
Dict[str, Any] |
token 使用摘要,用于洞察执行期间语言模型的表现。 |
Crew 输出的方法与属性
| 方法 / 属性 | 描述 |
|---|---|
| json | 如果输出格式为 JSON,则返回 crew 输出的 JSON 字符串表示。 |
| to_dict | 将 JSON 和 Pydantic 输出转换为字典。 |
| str | 返回 crew 输出的字符串表示,优先级依次为 Pydantic、JSON、raw。 |
访问 Crew 输出
当 crew 执行完成后,可以通过 Crew 对象的 output 属性访问其输出。CrewOutput 类提供了多种与输出交互和展示的方式。
示例
# crew 执行示例crew = Crew(agents=[research_agent, writer_agent],tasks=[research_task, write_article_task],verbose=True)crew_output = crew.kickoff()# 访问 crew 输出print(f"Raw Output: {crew_output.raw}")if crew_output.json_dict:print(f"JSON Output: {json.dumps(crew_output.json_dict, indent=2)}")if crew_output.pydantic:print(f"Pydantic Output: {crew_output.pydantic}")print(f"Tasks Output: {crew_output.tasks_output}")print(f"Token Usage: {crew_output.token_usage}")
访问 Crew 日志
你可以通过将 output_log_file 设置为 True(Boolean) 或 file_name(str) 来查看 crew 执行的实时日志。支持将事件记录为 file_name.txt 和 file_name.json 两种格式。
当设置为 True(Boolean) 时,会保存为 logs.txt。
如果 output_log_file 设置为 False(Boolean) 或 None,则不会生成日志。
# 保存 crew 日志crew = Crew(output_log_file = True) # 日志将保存为 logs.txtcrew = Crew(output_log_file = file_name) # 日志将保存为 file_name.txtcrew = Crew(output_log_file = file_name.txt) # 日志将保存为 file_name.txtcrew = Crew(output_log_file = file_name.json) # 日志将保存为 file_name.json
Memory 的使用
Crews 可以使用 memory(短期记忆、长期记忆和实体记忆)来增强执行效果,并随着时间推移不断学习。这个特性允许 crews 存储和回忆执行记忆,从而帮助决策和任务执行策略的制定。
Cache 的使用
可以使用缓存来存储工具执行结果,从而减少重复执行相同任务的需要,提高整体效率。
Crew 使用指标
在 crew 执行完成后,你可以访问 usage_metrics 属性,以查看该 crew 中所有任务的语言模型(LLM)使用指标。这有助于分析运行效率并发现优化空间。
# 访问 crew 的 usage metricscrew = Crew(agents=[agent1, agent2], tasks=[task1, task2])crew.kickoff()print(crew.usage_metrics)
Crew 执行流程
- Sequential Process:任务按顺序依次执行,形成线性工作流。
- Hierarchical Process:由一个 manager agent 协调 crew,在继续之前负责委派任务并验证结果。注意:这种流程需要提供
manager_llm或manager_agent,并且它对于验证流程执行至关重要。
启动 Crew
当你的 crew 组装完成后,可通过 kickoff() 方法启动工作流。这会根据所定义的流程方式开始执行。
# 启动 crew 的任务执行result = my_crew.kickoff()print(result)
启动 Crew 的不同方式
当你的 crew 组装完成后,可以使用合适的 kickoff 方法来启动。CrewAI 提供了多种方法,以更灵活地控制 kickoff 过程。
同步方法
kickoff():按照定义的流程启动执行。kickoff_for_each():对提供的每个输入事件或集合中的每个项目,顺序执行任务。
异步方法
CrewAI 提供两种异步执行方式:
| 方法 | 类型 | 描述 |
|---|---|---|
akickoff() |
原生异步 | 整个执行链路都使用真正的 async / await |
akickoff_for_each() |
原生异步 | 对列表中的每个输入执行原生异步处理 |
kickoff_async() |
基于线程 | 使用 asyncio.to_thread 包装同步执行 |
kickoff_for_each_async() |
基于线程 | 对每个输入执行基于线程的异步处理 |
akickoff() 和 akickoff_for_each(),因为它们在任务执行、memory 操作和知识检索中都使用原生异步。
# 启动 crew 的任务执行result = my_crew.kickoff()print(result)# kickoff_for_each 的使用示例inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]results = my_crew.kickoff_for_each(inputs=inputs_array)for result in results:print(result)# 使用 akickoff 的原生异步示例inputs = {'topic': 'AI in healthcare'}async_result = await my_crew.akickoff(inputs=inputs)print(async_result)# 使用 akickoff_for_each 的原生异步示例inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]async_results = await my_crew.akickoff_for_each(inputs=inputs_array)for async_result in async_results:print(async_result)# 使用 kickoff_async 的线程异步示例inputs = {'topic': 'AI in healthcare'}async_result = await my_crew.kickoff_async(inputs=inputs)print(async_result)# 使用 kickoff_for_each_async 的线程异步示例inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]async_results = await my_crew.kickoff_for_each_async(inputs=inputs_array)for async_result in async_results:print(async_result)
这些方法让你可以更灵活地管理和执行 crew 中的任务,从而根据需求选择同步或异步工作流。更多异步示例,请参见 Kickoff Crew Asynchronously 指南。
流式 Crew 执行
如果你希望实时查看 crew 的执行过程,可以启用流式输出,在内容生成时即时接收:
# 启用流式输出crew = Crew(agents=[researcher],tasks=[task],stream=True)# 迭代流式输出streaming = crew.kickoff(inputs={"topic": "AI"})for chunk in streaming:print(chunk.content, end="", flush=True)# 获取最终结果result = streaming.result
更多内容请参见 Streaming Crew Execution 指南。
从特定任务重新回放
你现在可以使用 CLI 命令 replay 从某个特定任务开始重新回放。
CrewAI 的 replay 功能允许你通过命令行接口(CLI)从指定任务开始重新执行。运行命令 crewai replay -t <task_id>,即可指定用于回放的 task_id。
现在,kickoff 会将最近一次 kickoff 返回的任务输出保存在本地,以便你能够从该任务继续回放。
使用 CLI 从特定任务重新回放
使用 replay 功能的步骤如下:
- 打开终端或命令提示符。
- 进入你的 CrewAI 项目所在目录。
- 运行以下命令:
查看最近一次 kickoff 的任务 ID:
crewai log-tasks-outputs
然后,从指定任务开始回放:
crewai replay -t <task_id>
这些命令允许你从最近一次 kickoff 的任务开始回放,同时保留此前已执行任务的上下文。
