https://blog.csdn.net/Likianta/article/details/90123678 https://blog.csdn.net/u012089823/article/details/82258738 https://elasticsearch-py.readthedocs.io/en/master/async.html https://ddeevv.com/docs/elasticsearch/7.9.1/async.html https://elasticsearch-py.readthedocs.io/en/7.9.1/async.html
安装:
python3 -m pip install "elasticsearch[async]"
FastAPI 使用 ES 示例:
from typing import Optional

from elasticsearch import AsyncElasticsearch
from fastapi import APIRouter, Query, Path
from fastapi.encoders import jsonable_encoder
from logzero import logger
from pydantic import BaseModel, Field

router = APIRouter()

# Elasticsearch connection settings (amis_action_data ES).
ES_ONLINE_HOST = "10.00.00.00"
ES_QA_HOST = "10.00.00.00"
ES_AUTH = ('user', 'test')
ES_PORT = 8200
TUIJIAN_INDEX = "test"

# # Local ES
# LOCAL_ES_HOST = "127.0.0.1"
# LOCAL_ES_PORT = 9200

# Arguments shared by every request issued from this module.
ES_DOC_TYPE = "item_content"
ES_SCROLL = "10m"

# Shared async client. It is created in the startup hook and closed in the
# shutdown hook; this module only defines a router (no `app` object), so the
# client is kept at module level instead of on `app.state`.
es: Optional[AsyncElasticsearch] = None


def _get_es() -> AsyncElasticsearch:
    """Return the shared client, failing loudly if startup has not run yet."""
    if es is None:
        raise RuntimeError(
            "Elasticsearch client is not initialised; "
            "the router's startup event has not run"
        )
    return es


async def _update_by_query(body: dict) -> dict:
    """Run *body* as an update-by-query on TUIJIAN_INDEX and return the raw response."""
    return await _get_es().update_by_query(
        index=TUIJIAN_INDEX,
        doc_type=ES_DOC_TYPE,
        scroll=ES_SCROLL,
        body=body,
    )


def _status_payload(response: dict, success_msg: str, failure_msg: str) -> dict:
    """Map an update-by-query response to the status dict returned to the caller.

    Success is reported only when exactly one document was updated.
    NOTE(review): a query that updates several documents is therefore reported
    as a failure -- confirm this is intended.
    """
    if response["updated"] == 1:
        return {"status_code": 200, "msg": success_msg}
    return {"status_code": 422, "msg": failure_msg}


@router.on_event("startup")
async def startup_event():
    """Create the shared AsyncElasticsearch client (QA host)."""
    global es
    es = AsyncElasticsearch(
        hosts=[ES_QA_HOST],
        http_auth=ES_AUTH,
        port=ES_PORT,
    )


# Make sure the AsyncElasticsearch instance is closed before it is garbage
# collected; this handler is called once when the application shuts down.
@router.on_event("shutdown")
async def app_shutdown():
    """Close the shared client, if one was created."""
    global es
    if es is not None:
        await es.close()
        es = None


# Handlers below are documented with `#` comments rather than docstrings:
# FastAPI publishes a handler docstring as the OpenAPI description.

# Look up a document in TUIJIAN_INDEX by item_id and return the raw
# Elasticsearch search response (at most one hit, size=1).
@router.get("/search_es_by_item_id", tags=["QA环境"], summary="通过item_id获取推荐库物料")
async def search_es_by_item_id(
    item_id: str = Query(..., title="查询物料id", example="MEETING_9d5c50287"),
):
    body = {"query": {"match": {"item_id": item_id}}}
    query_data = await _get_es().search(
        index=TUIJIAN_INDEX,
        doc_type=ES_DOC_TYPE,
        scroll=ES_SCROLL,
        size=1,
        body=body,
    )
    logger.info("queryData: %s", query_data)
    return query_data


# Request body for /update_field_by_item_id.
class UpdateModel(BaseModel):
    # item_id of the document(s) to update.
    item_id: str = Field(..., title="物料id", example="MEETING_9d5c502872d34200")
    # Assignment expression appended to `ctx._source.` in the update script.
    update_data: str = Field(..., title="待更新字段", example="title=\"侧丝222\"")


# Update a field on the document(s) matching item_id via update-by-query.
@router.post("/update_field_by_item_id", tags=["QA环境"], summary="根据匹配条件修改某个字段")
async def update_fields(data: UpdateModel):
    # NOTE(security): `update_data` is caller-supplied text that becomes part
    # of the Painless script source, so any caller can run arbitrary script
    # code against the index. That is this endpoint's contract, so it is left
    # as is; do not expose this route to untrusted clients.
    body = {
        "query": {"match": {"item_id": data.item_id}},
        "script": {"source": f"ctx._source.{data.update_data}"},
    }
    response = await _update_by_query(body)
    return _status_payload(response, "更新成功", "更新失败")


# Remove a field from the document(s) matching item_id via update-by-query.
@router.delete("/delete_field_by_item_id", tags=["QA环境"], summary="删除某个字段")
async def delete_fields(
    item_id: str = Query(..., title="物料id", example="MEETING_9d5c502872d"),
    delete_data: str = Query(..., title="待删除的字段", example="title"),
):
    # The field name is passed as a script parameter rather than interpolated
    # into the script source, so a crafted `delete_data` cannot inject code.
    body = {
        "query": {"match": {"item_id": item_id}},
        "script": {
            "source": "ctx._source.remove(params.field_name)",
            "params": {"field_name": delete_data},
        },
    }
    response = await _update_by_query(body)
    return _status_payload(response, "删除成功", "删除失败")
