https://blog.csdn.net/Likianta/article/details/90123678 https://blog.csdn.net/u012089823/article/details/82258738 https://elasticsearch-py.readthedocs.io/en/master/async.html https://ddeevv.com/docs/elasticsearch/7.9.1/async.html https://elasticsearch-py.readthedocs.io/en/7.9.1/async.html
安装:
python3 -m pip install elasticsearch[async]
Fastapi 使用ES示例:
from elasticsearch import AsyncElasticsearch
from fastapi import APIRouter, Query, Path
from fastapi.encoders import jsonable_encoder
from logzero import logger
from pydantic import BaseModel, Field
router = APIRouter()
# amis_action_data es
ES_ONLINE_HOST = "10.00.00.00"
ES_QA_HOST = "10.00.00.00"
ES_AUTH = ('user', 'test')
ES_PORT = 8200
TUIJIAN_INDEX = "test"
# # 本地es
# LOCAL_ES_HOST = "127.0.0.1"
# LOCAL_ES_PORT = 9200
@app.on_event("startup")
async def startup_event():
app.state.es = AsyncElasticsearch(
hosts=[ES_QA_HOST],
http_auth=ES_AUTH,
port=ES_PORT
)
# 确保AsyncElasticsearch实例被垃圾回收之前close掉,此方法一旦应用程序被关闭时调用
@router.on_event("shutdown")
async def app_shutdown():
await es.close()
@router.get("/search_es_by_item_id", tags=["QA环境"], summary="通过item_id获取推荐库物料")
async def search_es_by_item_id(
item_id: str=Query(...,
title="查询物料id",
example="MEETING_9d5c50287"
)):
body = {
"query": {
"match": {
"item_id": item_id
}
}
}
queryData = await app.state.es.search(index=TUIJIAN_INDEX,
doc_type="item_content",
scroll='10m',
size=1,
body=body)
logger.info(f"queryData: {queryData}")
return queryData
class UpdateModel(BaseModel):
item_id: str = Field(..., title="物料id", example="MEETING_9d5c502872d34200")
update_data: str = Field(..., title="待更新字段", example="title=\"侧丝222\"")
@router.post("/update_field_by_item_id", tags=["QA环境"], summary="根据匹配条件修改某个字段")
async def update_fields(data: UpdateModel):
result = jsonable_encoder(data)
body = {
"query": {
"match": {
"item_id": result.get('item_id')
}
},
'script': {
"source": f"ctx._source.{result.get('update_data')}"
}
}
update_data = await app.state.es.update_by_query(index=TUIJIAN_INDEX, doc_type="item_content", scroll="10m", body=body)
if update_data["updated"] == 1:
return {
"status_code": 200,
"msg": "更新成功",
}
return {
"status_code": 422,
"msg": "更新失败",
}
@router.delete("/delete_field_by_item_id", tags=["QA环境"], summary="删除某个字段")
async def delete_fields(
item_id: str=Query(..., title="物料id", example="MEETING_9d5c502872d"),
delete_data: str=Query(..., title="待删除的字段", example="title")):
body = {
"query": {
"match": {
"item_id": item_id
}
},
"script": {
"source": f"ctx._source.remove(\"{delete_data}\")"
}
}
delete_data = await app.state.es.update_by_query(index=TUIJIAN_INDEX, doc_type="item_content", scroll="10m", body=body)
if delete_data["updated"] == 1:
return {
"status_code": 200,
"msg": "删除成功",
}
return {
"status_code": 422,
"msg": "删除失败",
}