定义在返回响应后后台运行的任务,例如像发送邮件这种耗时动作

    1. 在路径操作函数中导入BackgroundTasks并定义一个参数,其类型声明为:BackgroundTasks
    2. 通过.add_task()添加任务

    add_task()作为参数接收:

    • 要在后台运行的任务函数 ( write_notification)。
    • 后面参数为任务函数的所需参数
    1. from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, status
    2. from fastapi.responses import JSONResponse
    3. import uvicorn
    4. import time
    5. app = FastAPI()
    6. def write_notification(email: str, message=""):
    7. with open('log.txt', "a") as email_file:
    8. time.sleep(3)
    9. content = f"输入的邮箱是: {email}, 发送的消息是: {message} \n"
    10. email_file.write(content)
    11. @app.post("/send_nitification/{email}")
    12. async def send_nitification(email: str, backgroud_task: BackgroundTasks):
    13. backgroud_task.add_task(write_notification, email, message="这是发送的消息")
    14. return JSONResponse(status_code=status.HTTP_200_OK,
    15. content={
    16. "code": 200,
    17. "msg": "消息正在后台执行"
    18. })
    19. if __name__ == "__main__":
    20. uvicorn.run(
    21. app="error_handler:app",
    22. host="127.0.0.1",
    23. port=8080,
    24. reload=True,
    25. debug=True,
    26. log_level="debug",
    27. access_log=True,
    28. use_colors=True
    29. )

    image.png