https://www.modb.pro/db/144291 https://blog.csdn.net/RoninYang/article/details/121128050 https://aioredis.readthedocs.io/en/latest/getting-started/
import aioredis# 在FastAPI创建前创建Redis连接@app.on_event("startup")async def startup_event():# app.state.redis = await StrictRedis(# host=REDIS_CONF['qa']['host'],# port=REDIS_CONF['qa']['port'],# password=REDIS_CONF['qa']['pw'],# # decode_responses=True# )# 给app.state添加新的属性值redis,用来存放Redis实例app.state.redis = aioredis.from_url(''.join(["redis://:", REDIS_CONF['qa']['pw'], "@",REDIS_CONF['qa']['host'], ":", str(REDIS_CONF['qa']['port']),"/0",# encoding="utf-8",# decode_responses=True]))@app.on_event("shutdown")async def shutdown_event():app.state.redis.close()await app.state.redis.close()---------------------------------------------------from fastapi.responses import JSONResponsefrom logzero import loggerimport jsonfrom fastapi import APIRouter, Path, Query, Request, FastAPI, statusrouter = APIRouter()@router.get("/redis_health", tags=["QA Redis"], summary="验证redis可用性")async def redis_health(request: Request):# 添加参数request: Request,通过request.app.state.redis获取Redis实例res = await request.app.state.redis.get("wang##user_info")if res:logger.info(f"res: {res}")return JSONResponse(content={"status_code": 200,"msg": 'success',"data": json.loads(res.decode('utf-8'))})else:return JSONResponse(status_code=status.HTTP_404_NOT_FOUND,content = {"msg": "not found"})
