你可以定义在返回响应后运行的后台任务。

这对需要在请求之后执行的操作很有用,但客户端不必在接收响应之前等待操作完成。

包括这些例子:

  • 执行操作后发送的电子邮件通知:
    • 由于连接到电子邮件服务器并发送电子邮件往往很“慢”(几秒钟),您可以立即返回响应并在后台发送电子邮件通知。
  • 处理数据:
    • 例如,假设您收到的文件必须经过一个缓慢的过程,您可以返回一个”Accepted”(HTTP 202)响应并在后台处理它。

使用 BackgroundTasks

首先导入 BackgroundTasks 并在 路径操作函数 中使用类型声明 BackgroundTasks 定义一个参数:

  1. from fastapi import BackgroundTasks, FastAPI
  2. app = FastAPI()
  3. def write_notification(email: str, message=""):
  4. with open("log.txt", mode="w") as email_file:
  5. content = f"notification for {email}: {message}"
  6. email_file.write(content)
  7. @app.post("/send-notification/{email}")
  8. async def send_notification(email: str, background_tasks: BackgroundTasks):
  9. background_tasks.add_task(write_notification, email, message="some notification")
  10. return {"message": "Notification sent in the background"}

FastAPI 会创建一个 BackgroundTasks 类型的对象并作为该参数传入。

创建一个任务函数

创建要作为后台任务运行的函数。

它只是一个可以接收参数的标准函数。

它可以是 async def 或普通的 def 函数,FastAPI 知道如何正确处理。

在这种情况下,任务函数将写入一个文件(模拟发送电子邮件)。

由于写操作不使用 asyncawait,我们用普通的 def 定义函数:

  1. from fastapi import BackgroundTasks, FastAPI
  2. app = FastAPI()
  3. def write_notification(email: str, message=""):
  4. with open("log.txt", mode="w") as email_file: content = f"notification for {email}: {message}" email_file.write(content)
  5. @app.post("/send-notification/{email}")
  6. async def send_notification(email: str, background_tasks: BackgroundTasks):
  7. background_tasks.add_task(write_notification, email, message="some notification")
  8. return {"message": "Notification sent in the background"}

添加后台任务

在你的 路径操作函数 里,用 .add_task() 方法将任务函数传到 后台任务 对象中:

  1. from fastapi import BackgroundTasks, FastAPI
  2. app = FastAPI()
  3. def write_notification(email: str, message=""):
  4. with open("log.txt", mode="w") as email_file:
  5. content = f"notification for {email}: {message}"
  6. email_file.write(content)
  7. @app.post("/send-notification/{email}")
  8. async def send_notification(email: str, background_tasks: BackgroundTasks):
  9. background_tasks.add_task(write_notification, email, message="some notification") return {"message": "Notification sent in the background"}

.add_task() 接收以下参数:

  • 在后台运行的任务函数(write_notification)。
  • 应按顺序传递给任务函数的任意参数序列(email)。
  • 应传递给任务函数的任意关键字参数(message="some notification")。

依赖注入

使用 BackgroundTasks 也适用于依赖注入系统,你可以在多个级别声明 BackgroundTasks 类型的参数:在 路径操作函数 里,在依赖中(可依赖),在子依赖中,等等。

FastAPI 知道在每种情况下该做什么以及如何复用同一对象,因此所有后台任务被合并在一起并且随后在后台运行:

Python 3.10+Python 3.9+Python 3.8+Python 3.10+ 没AnnotatedPython 3.8+ 没Annotated

  1. from typing import Annotated
  2. from fastapi import BackgroundTasks, Depends, FastAPI
  3. app = FastAPI()
  4. def write_log(message: str):
  5. with open("log.txt", mode="a") as log:
  6. log.write(message)
  7. def get_query(background_tasks: BackgroundTasks, q: str | None = None):
  8. if q:
  9. message = f"found query: {q}\n" background_tasks.add_task(write_log, message)
  10. return q
  11. @app.post("/send-notification/{email}")
  12. async def send_notification(
  13. email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)] ):
  14. message = f"message to {email}\n"
  15. background_tasks.add_task(write_log, message) return {"message": "Message sent"}
  1. from typing import Annotated, Union
  2. from fastapi import BackgroundTasks, Depends, FastAPI
  3. app = FastAPI()
  4. def write_log(message: str):
  5. with open("log.txt", mode="a") as log:
  6. log.write(message)
  7. def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
  8. if q:
  9. message = f"found query: {q}\n" background_tasks.add_task(write_log, message)
  10. return q
  11. @app.post("/send-notification/{email}")
  12. async def send_notification(
  13. email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)] ):
  14. message = f"message to {email}\n"
  15. background_tasks.add_task(write_log, message) return {"message": "Message sent"}
  1. from typing import Union
  2. from fastapi import BackgroundTasks, Depends, FastAPI
  3. from typing_extensions import Annotated
  4. app = FastAPI()
  5. def write_log(message: str):
  6. with open("log.txt", mode="a") as log:
  7. log.write(message)
  8. def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
  9. if q:
  10. message = f"found query: {q}\n" background_tasks.add_task(write_log, message)
  11. return q
  12. @app.post("/send-notification/{email}")
  13. async def send_notification(
  14. email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)] ):
  15. message = f"message to {email}\n"
  16. background_tasks.add_task(write_log, message) return {"message": "Message sent"}

尽可能选择使用 Annotated 的版本。

  1. from fastapi import BackgroundTasks, Depends, FastAPI
  2. app = FastAPI()
  3. def write_log(message: str):
  4. with open("log.txt", mode="a") as log:
  5. log.write(message)
  6. def get_query(background_tasks: BackgroundTasks, q: str | None = None):
  7. if q:
  8. message = f"found query: {q}\n" background_tasks.add_task(write_log, message)
  9. return q
  10. @app.post("/send-notification/{email}")
  11. async def send_notification(
  12. email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query) ):
  13. message = f"message to {email}\n"
  14. background_tasks.add_task(write_log, message) return {"message": "Message sent"}

Tip

尽可能选择使用 Annotated 的版本。

  1. from typing import Union
  2. from fastapi import BackgroundTasks, Depends, FastAPI
  3. app = FastAPI()
  4. def write_log(message: str):
  5. with open("log.txt", mode="a") as log:
  6. log.write(message)
  7. def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
  8. if q:
  9. message = f"found query: {q}\n" background_tasks.add_task(write_log, message)
  10. return q
  11. @app.post("/send-notification/{email}")
  12. async def send_notification(
  13. email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query) ):
  14. message = f"message to {email}\n"
  15. background_tasks.add_task(write_log, message) return {"message": "Message sent"}

该示例中,信息会在响应发出 之后 被写到 log.txt 文件。

如果请求中有查询,它将在后台任务中写入日志。

然后另一个在 路径操作函数 生成的后台任务会使用路径参数 email 写入一条信息。

技术细节

BackgroundTasks 类直接来自 starlette.background

它被直接导入/包含到FastAPI以便你可以从 fastapi 导入,并避免意外从 starlette.background 导入备用的 BackgroundTask (后面没有 s)。

通过仅使用 BackgroundTasks (而不是 BackgroundTask),使得能将它作为 路径操作函数 的参数 ,并让FastAPI为您处理其余部分, 就像直接使用 Request 对象。

在FastAPI中仍然可以单独使用 BackgroundTask,但您必须在代码中创建对象,并返回包含它的Starlette Response

更多细节查看 Starlette’s official docs for Background Tasks.

告诫

如果您需要执行繁重的后台计算,并且不一定需要由同一进程运行(例如,您不需要共享内存、变量等),那么使用其他更大的工具(如 Celery)可能更好。

它们往往需要更复杂的配置,即消息/作业队列管理器,如RabbitMQ或Redis,但它们允许您在多个进程中运行后台任务,甚至是在多个服务器中。

要查看示例,查阅 Project Generators,它们都包括已经配置的Celery。

但是,如果您需要从同一个FastAPI应用程序访问变量和对象,或者您需要执行小型后台任务(如发送电子邮件通知),您只需使用 BackgroundTasks 即可。

回顾

导入并使用 BackgroundTasks 通过 路径操作函数 中的参数和依赖项来添加后台任务。