Crews

在 crewAI 框架中理解和使用 crew,包括其完整属性与功能。

概览

在 crewAI 中,crew 表示一组协作工作的 agents,它们共同完成一组任务。每个 crew 都定义了任务执行策略、agent 协作方式以及整体工作流。

Crew 属性

属性 参数 描述
Tasks tasks 分配给 crew 的任务列表。
Agents agents 属于该 crew 的 agent 列表。
Process (可选) process crew 遵循的流程方式(例如 sequential、hierarchical)。默认值为 sequential
Verbose (可选) verbose 执行期间日志记录的详细程度。默认值为 False
Manager LLM (可选) manager_llm 分层流程中管理 agent 使用的语言模型。在使用 hierarchical 流程时必须提供。
Function Calling LLM (可选) function_calling_llm 如果传入,crew 将为所有 agent 使用这个 LLM 进行工具的 function calling。每个 agent 也可以拥有自己的 LLM,并覆盖 crew 级别的 function calling LLM。
Config (可选) config crew 的可选配置,格式可以是 JsonDict[str, Any]
Max RPM (可选) max_rpm crew 在执行期间遵循的每分钟最大请求数。默认值为 None
Memory (可选) memory 用于存储执行记忆(短期记忆、长期记忆、实体记忆)。
Cache (可选) cache 是否使用缓存来存储工具执行结果。默认值为 True
Embedder (可选) embedder crew 使用的 embedder 配置。目前主要用于 memory。默认值为 {"provider": "openai"}
Step Callback (可选) step_callback 每个 agent 每一步执行后调用的函数。可用于记录 agent 行为或执行其他操作;它不会覆盖 agent 自身的 step_callback
Task Callback (可选) task_callback 每个任务完成后调用的函数。适用于监控或任务完成后的额外操作。
Share Crew (可选) share_crew 是否愿意将完整的 crew 信息和执行过程共享给 crewAI 团队,以帮助改进库并用于训练模型。
Output Log File (可选) output_log_file 设为 True 时会将日志保存为当前目录下的 logs.txt,或你也可以提供文件路径。如果文件名以 .json 结尾,则日志为 JSON 格式,否则为 .txt。默认值为 None
Manager Agent (可选) manager_agent manager 用于设置一个自定义 agent,作为管理者使用。
Prompt File (可选) prompt_file crew 使用的 prompt JSON 文件路径。
Planning (可选) planning 为 Crew 增加规划能力。启用后,在每次 Crew 迭代前,所有 Crew 数据都会发送给 AgentPlanner 进行任务规划,并将该计划附加到每个任务描述中。
Planning LLM (可选) planning_llm 在规划流程中,AgentPlanner 使用的语言模型。
Knowledge Sources (可选) knowledge_sources crew 级别可用的知识源,所有 agents 都可以访问。
Stream (可选) stream 启用流式输出,以在 crew 执行期间接收实时更新。返回一个可迭代 chunk 的 CrewStreamingOutput 对象。默认值为 False
Crew Max RPMmax_rpm 属性设置 crew 每分钟可执行的最大请求数,用于避免触发速率限制;如果设置了它,将覆盖各个 agent 自身的 max_rpm 设置。

创建 Crews

在 CrewAI 中,创建 crew 有两种方式:使用 YAML 配置(推荐),或者 直接通过代码定义

YAML 配置(推荐)

使用 YAML 配置可以让 crew 的定义更清晰、更易维护,并且与 CrewAI 项目中 agent 和 task 的定义方式保持一致。

在按照 Installation 章节创建 CrewAI 项目后,你可以在一个继承自 CrewBase 的类中定义 crew,并通过装饰器定义 agents、tasks 和 crew 本身。

使用装饰器的 Crew 类示例

  1. from crewai import Agent, Crew, Task, Process
  2. from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff
  3. from crewai.agents.agent_builder.base_agent import BaseAgent
  4. from typing import List
  5. @CrewBase
  6. class YourCrewName:
  7. """你的 crew 描述"""
  8. agents: List[BaseAgent]
  9. tasks: List[Task]
  10. # YAML 配置文件路径
  11. # 如需查看 YAML 中定义的 agent 和 task 示例,请参考:
  12. # - Task: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
  13. # - Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
  14. agents_config = 'config/agents.yaml'
  15. tasks_config = 'config/tasks.yaml'
  16. @before_kickoff
  17. def prepare_inputs(self, inputs):
  18. # 在 crew 启动前修改输入
  19. inputs['additional_data'] = "Some extra information"
  20. return inputs
  21. @after_kickoff
  22. def process_output(self, output):
  23. # 在 crew 完成后修改输出
  24. output.raw += "\nProcessed after kickoff."
  25. return output
  26. @agent
  27. def agent_one(self) -> Agent:
  28. return Agent(
  29. config=self.agents_config['agent_one'], # type: ignore[index]
  30. verbose=True
  31. )
  32. @agent
  33. def agent_two(self) -> Agent:
  34. return Agent(
  35. config=self.agents_config['agent_two'], # type: ignore[index]
  36. verbose=True
  37. )
  38. @task
  39. def task_one(self) -> Task:
  40. return Task(
  41. config=self.tasks_config['task_one'] # type: ignore[index]
  42. )
  43. @task
  44. def task_two(self) -> Task:
  45. return Task(
  46. config=self.tasks_config['task_two'] # type: ignore[index]
  47. )
  48. @crew
  49. def crew(self) -> Crew:
  50. return Crew(
  51. agents=self.agents, # 由 @agent 装饰器自动收集
  52. tasks=self.tasks, # 由 @task 装饰器自动收集
  53. process=Process.sequential,
  54. verbose=True,
  55. )

运行上述代码的方法:

  1. YourCrewName().crew().kickoff(inputs={"any": "input here"})
任务会按照它们定义的顺序执行。

CrewBase 类以及这些装饰器会自动收集 agents 和 tasks,从而减少手动管理的工作量。

来自 annotations.py 的装饰器概览

CrewAI 在 annotations.py 文件中提供了多个装饰器,用于标记 crew 类中的方法,以便进行特殊处理:

  • @CrewBase:将类标记为 crew 基类。
  • @agent:标记一个返回 Agent 对象的方法。
  • @task:标记一个返回 Task 对象的方法。
  • @crew:标记一个返回 Crew 对象的方法。
  • @before_kickoff: (可选)标记一个在 crew 启动前执行的方法。
  • @after_kickoff: (可选)标记一个在 crew 完成后执行的方法。

这些装饰器有助于组织 crew 的结构,并自动收集 agents 和 tasks,而无需手动逐一列出。

直接代码定义(可选方式)

另一种方式是不使用 YAML 配置文件,而是直接在代码中定义 crew。

  1. from crewai import Agent, Crew, Task, Process
  2. from crewai_tools import YourCustomTool
  3. class YourCrewName:
  4. def agent_one(self) -> Agent:
  5. return Agent(
  6. role="Data Analyst",
  7. goal="Analyze data trends in the market",
  8. backstory="An experienced data analyst with a background in economics",
  9. verbose=True,
  10. tools=[YourCustomTool()]
  11. )
  12. def agent_two(self) -> Agent:
  13. return Agent(
  14. role="Market Researcher",
  15. goal="Gather information on market dynamics",
  16. backstory="A diligent researcher with a keen eye for detail",
  17. verbose=True
  18. )
  19. def task_one(self) -> Task:
  20. return Task(
  21. description="Collect recent market data and identify trends.",
  22. expected_output="A report summarizing key trends in the market.",
  23. agent=self.agent_one()
  24. )
  25. def task_two(self) -> Task:
  26. return Task(
  27. description="Research factors affecting market dynamics.",
  28. expected_output="An analysis of factors influencing the market.",
  29. agent=self.agent_two()
  30. )
  31. def crew(self) -> Crew:
  32. return Crew(
  33. agents=[self.agent_one(), self.agent_two()],
  34. tasks=[self.task_one(), self.task_two()],
  35. process=Process.sequential,
  36. verbose=True
  37. )

运行上述代码的方法:

  1. YourCrewName().crew().kickoff(inputs={})

在这个例子中:

  • agents 和 tasks 直接在类中定义,不使用装饰器。
  • 我们手动创建并管理 agent 和 task 的列表。
  • 这种方式提供了更高的控制力,但在大型项目中可维护性通常较差。

Crew 输出

在 CrewAI 框架中,crew 的输出被封装在 CrewOutput 类中。 这个类提供了一种结构化的方式来访问 crew 的执行结果,包括原始字符串、JSON 和 Pydantic 模型等多种格式。 CrewOutput 包含最终任务输出结果、token 使用情况,以及各个任务的输出结果。

Crew 输出属性

属性 参数 类型 描述
Raw raw str crew 的原始输出。这是默认输出格式。
Pydantic pydantic Optional[BaseModel] 表示结构化输出的 Pydantic 模型对象。
JSON Dict json_dict Optional[Dict[str, Any]] 表示 JSON 输出的字典。
Tasks Output tasks_output List[TaskOutput] TaskOutput 对象列表,每个对象代表 crew 中一个任务的输出。
Token Usage token_usage Dict[str, Any] token 使用摘要,用于洞察执行期间语言模型的表现。

Crew 输出的方法与属性

方法 / 属性 描述
json 如果输出格式为 JSON,则返回 crew 输出的 JSON 字符串表示。
to_dict 将 JSON 和 Pydantic 输出转换为字典。
str 返回 crew 输出的字符串表示,优先级依次为 Pydantic、JSON、raw。

访问 Crew 输出

当 crew 执行完成后,可以通过 Crew 对象的 output 属性访问其输出。CrewOutput 类提供了多种与输出交互和展示的方式。

示例

  1. # crew 执行示例
  2. crew = Crew(
  3. agents=[research_agent, writer_agent],
  4. tasks=[research_task, write_article_task],
  5. verbose=True
  6. )
  7. crew_output = crew.kickoff()
  8. # 访问 crew 输出
  9. print(f"Raw Output: {crew_output.raw}")
  10. if crew_output.json_dict:
  11. print(f"JSON Output: {json.dumps(crew_output.json_dict, indent=2)}")
  12. if crew_output.pydantic:
  13. print(f"Pydantic Output: {crew_output.pydantic}")
  14. print(f"Tasks Output: {crew_output.tasks_output}")
  15. print(f"Token Usage: {crew_output.token_usage}")

访问 Crew 日志

你可以通过将 output_log_file 设置为 True(Boolean)file_name(str) 来查看 crew 执行的实时日志。支持将事件记录为 file_name.txtfile_name.json 两种格式。 当设置为 True(Boolean) 时,会保存为 logs.txt

如果 output_log_file 设置为 False(Boolean)None,则不会生成日志。

  1. # 保存 crew 日志
  2. crew = Crew(output_log_file = True) # 日志将保存为 logs.txt
  3. crew = Crew(output_log_file = file_name) # 日志将保存为 file_name.txt
  4. crew = Crew(output_log_file = file_name.txt) # 日志将保存为 file_name.txt
  5. crew = Crew(output_log_file = file_name.json) # 日志将保存为 file_name.json

Memory 的使用

Crews 可以使用 memory(短期记忆、长期记忆和实体记忆)来增强执行效果,并随着时间推移不断学习。这个特性允许 crews 存储和回忆执行记忆,从而帮助决策和任务执行策略的制定。

Cache 的使用

可以使用缓存来存储工具执行结果,从而减少重复执行相同任务的需要,提高整体效率。

Crew 使用指标

在 crew 执行完成后,你可以访问 usage_metrics 属性,以查看该 crew 中所有任务的语言模型(LLM)使用指标。这有助于分析运行效率并发现优化空间。

  1. # 访问 crew 的 usage metrics
  2. crew = Crew(agents=[agent1, agent2], tasks=[task1, task2])
  3. crew.kickoff()
  4. print(crew.usage_metrics)

Crew 执行流程

  • Sequential Process:任务按顺序依次执行,形成线性工作流。
  • Hierarchical Process:由一个 manager agent 协调 crew,在继续之前负责委派任务并验证结果。注意:这种流程需要提供 manager_llmmanager_agent,并且它对于验证流程执行至关重要。

启动 Crew

当你的 crew 组装完成后,可通过 kickoff() 方法启动工作流。这会根据所定义的流程方式开始执行。

  1. # 启动 crew 的任务执行
  2. result = my_crew.kickoff()
  3. print(result)

启动 Crew 的不同方式

当你的 crew 组装完成后,可以使用合适的 kickoff 方法来启动。CrewAI 提供了多种方法,以更灵活地控制 kickoff 过程。

同步方法

  • kickoff():按照定义的流程启动执行。
  • kickoff_for_each():对提供的每个输入事件或集合中的每个项目,顺序执行任务。

异步方法

CrewAI 提供两种异步执行方式:

方法 类型 描述
akickoff() 原生异步 整个执行链路都使用真正的 async / await
akickoff_for_each() 原生异步 对列表中的每个输入执行原生异步处理
kickoff_async() 基于线程 使用 asyncio.to_thread 包装同步执行
kickoff_for_each_async() 基于线程 对每个输入执行基于线程的异步处理
对于高并发工作负载,推荐使用 akickoff()akickoff_for_each(),因为它们在任务执行、memory 操作和知识检索中都使用原生异步。
  1. # 启动 crew 的任务执行
  2. result = my_crew.kickoff()
  3. print(result)
  4. # kickoff_for_each 的使用示例
  5. inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]
  6. results = my_crew.kickoff_for_each(inputs=inputs_array)
  7. for result in results:
  8. print(result)
  9. # 使用 akickoff 的原生异步示例
  10. inputs = {'topic': 'AI in healthcare'}
  11. async_result = await my_crew.akickoff(inputs=inputs)
  12. print(async_result)
  13. # 使用 akickoff_for_each 的原生异步示例
  14. inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]
  15. async_results = await my_crew.akickoff_for_each(inputs=inputs_array)
  16. for async_result in async_results:
  17. print(async_result)
  18. # 使用 kickoff_async 的线程异步示例
  19. inputs = {'topic': 'AI in healthcare'}
  20. async_result = await my_crew.kickoff_async(inputs=inputs)
  21. print(async_result)
  22. # 使用 kickoff_for_each_async 的线程异步示例
  23. inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]
  24. async_results = await my_crew.kickoff_for_each_async(inputs=inputs_array)
  25. for async_result in async_results:
  26. print(async_result)

这些方法让你可以更灵活地管理和执行 crew 中的任务,从而根据需求选择同步或异步工作流。更多异步示例,请参见 Kickoff Crew Asynchronously 指南。

流式 Crew 执行

如果你希望实时查看 crew 的执行过程,可以启用流式输出,在内容生成时即时接收:

  1. # 启用流式输出
  2. crew = Crew(
  3. agents=[researcher],
  4. tasks=[task],
  5. stream=True
  6. )
  7. # 迭代流式输出
  8. streaming = crew.kickoff(inputs={"topic": "AI"})
  9. for chunk in streaming:
  10. print(chunk.content, end="", flush=True)
  11. # 获取最终结果
  12. result = streaming.result

更多内容请参见 Streaming Crew Execution 指南。

从特定任务重新回放

你现在可以使用 CLI 命令 replay 从某个特定任务开始重新回放。

CrewAI 的 replay 功能允许你通过命令行接口(CLI)从指定任务开始重新执行。运行命令 crewai replay -t <task_id>,即可指定用于回放的 task_id

现在,kickoff 会将最近一次 kickoff 返回的任务输出保存在本地,以便你能够从该任务继续回放。

使用 CLI 从特定任务重新回放

使用 replay 功能的步骤如下:

  1. 打开终端或命令提示符。
  2. 进入你的 CrewAI 项目所在目录。
  3. 运行以下命令:

查看最近一次 kickoff 的任务 ID:

  1. crewai log-tasks-outputs

然后,从指定任务开始回放:

  1. crewai replay -t <task_id>

这些命令允许你从最近一次 kickoff 的任务开始回放,同时保留此前已执行任务的上下文。