https://blog.csdn.net/Likianta/article/details/90123678 https://blog.csdn.net/u012089823/article/details/82258738 https://elasticsearch-py.readthedocs.io/en/master/async.html https://ddeevv.com/docs/elasticsearch/7.9.1/async.html https://elasticsearch-py.readthedocs.io/en/7.9.1/async.html

    安装:

    1. python3 -m pip install elasticsearch[async]

    Fastapi 使用ES示例:

    1. from elasticsearch import AsyncElasticsearch
    2. from fastapi import APIRouter, Query, Path
    3. from fastapi.encoders import jsonable_encoder
    4. from logzero import logger
    5. from pydantic import BaseModel, Field
    6. router = APIRouter()
    7. # amis_action_data es
    8. ES_ONLINE_HOST = "10.00.00.00"
    9. ES_QA_HOST = "10.00.00.00"
    10. ES_AUTH = ('user', 'test')
    11. ES_PORT = 8200
    12. TUIJIAN_INDEX = "test"
    13. # # 本地es
    14. # LOCAL_ES_HOST = "127.0.0.1"
    15. # LOCAL_ES_PORT = 9200
    16. @app.on_event("startup")
    17. async def startup_event():
    18. app.state.es = AsyncElasticsearch(
    19. hosts=[ES_QA_HOST],
    20. http_auth=ES_AUTH,
    21. port=ES_PORT
    22. )
    23. # 确保AsyncElasticsearch实例被垃圾回收之前close掉,此方法一旦应用程序被关闭时调用
    24. @router.on_event("shutdown")
    25. async def app_shutdown():
    26. await es.close()
    27. @router.get("/search_es_by_item_id", tags=["QA环境"], summary="通过item_id获取推荐库物料")
    28. async def search_es_by_item_id(
    29. item_id: str=Query(...,
    30. title="查询物料id",
    31. example="MEETING_9d5c50287"
    32. )):
    33. body = {
    34. "query": {
    35. "match": {
    36. "item_id": item_id
    37. }
    38. }
    39. }
    40. queryData = await app.state.es.search(index=TUIJIAN_INDEX,
    41. doc_type="item_content",
    42. scroll='10m',
    43. size=1,
    44. body=body)
    45. logger.info(f"queryData: {queryData}")
    46. return queryData
    47. class UpdateModel(BaseModel):
    48. item_id: str = Field(..., title="物料id", example="MEETING_9d5c502872d34200")
    49. update_data: str = Field(..., title="待更新字段", example="title=\"侧丝222\"")
    50. @router.post("/update_field_by_item_id", tags=["QA环境"], summary="根据匹配条件修改某个字段")
    51. async def update_fields(data: UpdateModel):
    52. result = jsonable_encoder(data)
    53. body = {
    54. "query": {
    55. "match": {
    56. "item_id": result.get('item_id')
    57. }
    58. },
    59. 'script': {
    60. "source": f"ctx._source.{result.get('update_data')}"
    61. }
    62. }
    63. update_data = await app.state.es.update_by_query(index=TUIJIAN_INDEX, doc_type="item_content", scroll="10m", body=body)
    64. if update_data["updated"] == 1:
    65. return {
    66. "status_code": 200,
    67. "msg": "更新成功",
    68. }
    69. return {
    70. "status_code": 422,
    71. "msg": "更新失败",
    72. }
    73. @router.delete("/delete_field_by_item_id", tags=["QA环境"], summary="删除某个字段")
    74. async def delete_fields(
    75. item_id: str=Query(..., title="物料id", example="MEETING_9d5c502872d"),
    76. delete_data: str=Query(..., title="待删除的字段", example="title")):
    77. body = {
    78. "query": {
    79. "match": {
    80. "item_id": item_id
    81. }
    82. },
    83. "script": {
    84. "source": f"ctx._source.remove(\"{delete_data}\")"
    85. }
    86. }
    87. delete_data = await app.state.es.update_by_query(index=TUIJIAN_INDEX, doc_type="item_content", scroll="10m", body=body)
    88. if delete_data["updated"] == 1:
    89. return {
    90. "status_code": 200,
    91. "msg": "删除成功",
    92. }
    93. return {
    94. "status_code": 422,
    95. "msg": "删除失败",
    96. }