# ODMantic documentation: https://art049.github.io/odmantic/
from odmantic import AIOEngine
from motor.motor_asyncio import AsyncIOMotorClient
from logzero import logger

# NOTE(review): FastAPI, uvicorn, recommend_mongo_operation and the MONGODB_*
# settings are used below but are not imported in this excerpt; they are
# assumed to be brought into scope elsewhere -- confirm.

app = FastAPI(
    title="SO测试工具API",
    description="SO测试工具API",
    version="0.0.1",
)


def _build_mongodb_uri(hosts, port, user, password):
    """Build the MongoDB connection URI from the configured settings.

    Args:
        hosts: Either a list of host names (every host shares ``port``) or a
            single host name.
        port: Port appended to every host.
        user: User name placed in the URI (list form only).
        password: Password placed in the URI (list form only).

    Returns:
        ``mongodb://user:password@h1:port,h2:port`` for a list of hosts, or
        ``mongodb://host:port`` for a single host.

    Raises:
        ValueError: If ``hosts`` is an empty list.
    """
    if isinstance(hosts, list):
        if not hosts:
            # Trimming a trailing comma from an empty host list used to eat
            # the "@" after the credentials; fail with a clear message instead.
            raise ValueError("MONGODB_URL_LIST is empty")
        endpoints = ",".join(f"{host}:{port}" for host in hosts)
        # NOTE(review): user/password are inserted verbatim, so they are
        # presumably expected to be percent-escaped already -- confirm.
        return f"mongodb://{user}:{password}@{endpoints}"
    # NOTE(review): the single-host form carries no credentials, exactly as
    # before; confirm that an unauthenticated connection is intended here.
    return f"mongodb://{hosts}:{port}"


@app.on_event("startup")
async def startup_event():
    """Create the ODMantic engine and store it on ``app.state.mongo``.

    Any failure is logged with its traceback and not re-raised, so the
    application still starts; in that case ``app.state.mongo`` is left unset.
    """
    try:
        # Connect to MongoDB (a local instance would be
        # "mongodb://localhost:27017/").
        mongodb_str = _build_mongodb_uri(
            MONGODB_URL_LIST, MONGODB_PORT, MONGODB_USER, MONGODB_PASSWORD
        )
        client = AsyncIOMotorClient(mongodb_str)
        app.state.mongo = AIOEngine(motor_client=client, database="test")
        logger.info(f"mongo connect success-->>{app.state.mongo}")
    except Exception as e:
        # Exception rather than BaseException: task cancellation,
        # KeyboardInterrupt and SystemExit must propagate, not be logged away.
        logger.exception(f"链接失败:{str(e)}")


app.include_router(recommend_mongo_operation.router)

if __name__ == "__main__":
    # NOTE(review): the `debug` keyword is believed to have been dropped from
    # newer uvicorn releases -- confirm against the installed version.
    uvicorn.run(
        app="main:app",
        host="0.0.0.0",
        port=8082,
        reload=True,
        debug=True,
        log_level="debug",
        access_log=True,
        use_colors=True,
        log_config="so_fastapi/uvicorn_config.json",
    )

# ------------------------------------------------
# NOTE(review): what follows looks like a separate module (presumably
# recommend_mongo_operation, whose `router` is included above) -- confirm.

from odmantic import Model
from fastapi import Request, APIRouter
from logzero import logger

router = APIRouter()


class TestModel(Model):
    """ODMantic model with a ``name`` and an ``age`` field."""

    name: str
    age: int


@router.post("/save_mongo_data", tags=["mongodb"], summary="插入新数据")
async def save_mongo_data(req: Request, data: TestModel):
    """Save ``data`` through the engine on ``app.state.mongo``.

    Args:
        req: Incoming request; used only to reach ``req.app.state.mongo``.
        data: The ``TestModel`` instance parsed from the request body.

    Returns:
        Whatever the engine's ``save`` call returns.
    """
    res = await req.app.state.mongo.save(data)
    logger.info(f"res: {res}")
    return res


@router.get("/get_mongo_data", tags=["mongodb"], summary="查询数据")
async def get_mongo_data(req: Request, age: int = 7888):
    """Look up one ``TestModel`` document by age.

    Args:
        req: Incoming request; used only to reach ``req.app.state.mongo``.
        age: Age to match. Defaults to 7888, the value that was previously
            hard-coded, so existing callers see the same query.

    Returns:
        Whatever the engine's ``find_one`` call returns for the query.
    """
    res = await req.app.state.mongo.find_one(TestModel, {"age": age})
    logger.info(f"res: {res}")
    return res