https://www.modb.pro/db/144291 https://blog.csdn.net/RoninYang/article/details/121128050 https://aioredis.readthedocs.io/en/latest/getting-started/

    1. import aioredis
    2. # Create the Redis connection when the FastAPI application starts up
    @app.on_event("startup")
    async def startup_event():
        """Create the Redis client when the FastAPI application starts.

        The client is built from the ``REDIS_CONF['qa']`` settings (host, port,
        password), targets database 0, and is stored on ``app.state.redis``.
        No ``encoding`` / ``decode_responses`` options are passed, so the
        library defaults apply.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import quote

        conf = REDIS_CONF['qa']
        # Percent-encode the password so characters such as '@', ':', '/' or
        # '%' cannot break or alter the parsing of the connection URL.
        password = quote(conf['pw'], safe="")
        # Add a new ``redis`` attribute to app.state to hold the Redis instance.
        app.state.redis = aioredis.from_url(
            f"redis://:{password}@{conf['host']}:{conf['port']}/0"
        )
    @app.on_event("shutdown")
    async def shutdown_event():
        """Close the Redis client stored on ``app.state.redis`` at shutdown."""
        # ``close()`` is awaited exactly once; the previous extra un-awaited
        # call only created a coroutine object that was never run.
        await app.state.redis.close()
    23. ---------------------------------------------------
    24. from fastapi.responses import JSONResponse
    25. from logzero import logger
    26. import json
    27. from fastapi import APIRouter, Path, Query, Request, FastAPI, status
    28. router = APIRouter()
    @router.get("/redis_health", tags=["QA Redis"], summary="验证redis可用性")
    async def redis_health(request: Request):
        """Check Redis availability by reading one fixed key.

        The Redis instance is obtained from ``request.app.state.redis`` (hence
        the ``request: Request`` parameter). If the key ``wang##user_info``
        holds a truthy value, the value is logged, decoded as UTF-8, parsed as
        JSON and returned in a success payload; otherwise a 404 JSON response
        with ``{"msg": "not found"}`` is returned.
        """
        redis_client = request.app.state.redis
        raw_value = await redis_client.get("wang##user_info")

        # Guard clause: a missing or empty value is reported as not found.
        if not raw_value:
            return JSONResponse(
                status_code=status.HTTP_404_NOT_FOUND,
                content={"msg": "not found"},
            )

        logger.info(f"res: {raw_value}")
        parsed = json.loads(raw_value.decode('utf-8'))
        return JSONResponse(
            content={
                "status_code": 200,
                "msg": 'success',
                "data": parsed,
            }
        )