# References:
#   https://www.modb.pro/db/144291
#   https://blog.csdn.net/RoninYang/article/details/121128050
#   https://aioredis.readthedocs.io/en/latest/getting-started/
import aioredis
# Create the Redis client when the FastAPI application starts up.
@app.on_event("startup")
async def startup_event():
    """Create the shared Redis client and store it on ``app.state.redis``.

    Connection settings are read from ``REDIS_CONF['qa']`` (``host``,
    ``port`` and ``pw``); database 0 is selected through the URL path.
    """
    qa_conf = REDIS_CONF['qa']
    # The password is passed as a keyword argument instead of being spliced
    # into the URL: characters such as '@', '/', '#', '?' or '%' in a raw
    # password would otherwise corrupt URL parsing.
    # decode_responses is deliberately left at its default.
    # NOTE(review): handlers that call ``.decode()`` on replies presumably
    # rely on bytes being returned - confirm before enabling it.
    app.state.redis = aioredis.from_url(
        f"redis://{qa_conf['host']}:{qa_conf['port']}/0",
        password=qa_conf['pw'],
    )
@app.on_event("shutdown")
async def shutdown_event():
    """Close the Redis client stored on ``app.state.redis`` at shutdown."""
    # close() is awaited exactly once. The previous extra, un-awaited call
    # only created a coroutine object that was never run, so it was dropped.
    await app.state.redis.close()
# ---------------------------------------------------
from fastapi.responses import JSONResponse
from logzero import logger
import json
from fastapi import APIRouter, Path, Query, Request, FastAPI, status
# Router on which the Redis health-check endpoint below is registered.
router = APIRouter()
@router.get("/redis_health", tags=["QA Redis"], summary="验证redis可用性")
async def redis_health(request: Request):
    """Read the ``wang##user_info`` key from Redis and report the outcome.

    The Redis client is taken from ``request.app.state.redis``. A falsy
    reply produces a 404 JSON response; otherwise the reply is decoded as
    UTF-8, parsed as JSON and returned inside a success envelope.
    """
    redis_client = request.app.state.redis
    raw_value = await redis_client.get("wang##user_info")

    # Guard clause: nothing usable came back for the key.
    if not raw_value:
        return JSONResponse(
            status_code=status.HTTP_404_NOT_FOUND,
            content={"msg": "not found"},
        )

    logger.info(f"res: {raw_value}")
    payload = json.loads(raw_value.decode('utf-8'))
    body = {
        "status_code": 200,
        "msg": 'success',
        "data": payload,
    }
    return JSONResponse(content=body)