这个文档可以帮助新手学习:

    • 在测试环境中安装并运行Elasticsearch
    • 向Elasticsearch中增加数据
    • 搜索和排序数据
    • 在搜索中提取凌乱文档的字段

    第一步:运行Elasticsearch
    快速搭建Elasticsearch方式是在Elastic Cloud上创建一个可管理的Elasticsearch服务(要钱!)。如果你偏爱管理自己的测试环境,可以用Docker运行Elasticsearch。
    安装和运行Elasticsearch
    1.安装和运行Docker Desktop
    2.运行命令:

    1. docker network create elastic
    2. docker pull docker.elastic.co/elasticsearch/elasticsearch:7.14.0
    3. docker run --name es01-test --net elastic -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.14.0

    安装和运行Kibana
    安装Kibana进行界面交互来分析、可视化、管理你的数据。
    1.在新的命令控制台,运行:

    1. docker pull docker.elastic.co/kibana/kibana:7.14.0
    2. docker run --name kib01-test --net elastic -p 5601:5601 -e "ELASTICSEARCH_HOSTS=http://es01-test:9200" docker.elastic.co/kibana/kibana:7.14.0

    2.访问Kibana,http://localhost:5601
    第二步:向Elasticsearch发送请求
    使用REST API来向Elasticsearch发送数据和请求。你可以使用发送HTTP请求来与Elasticsearch交互,列如curl。你也可以使用Kibana的控制台向Elasticsearch发送请求。
    使用curl
    在新的控制台运行下面的curl命令来提交一个简单的API请求。

    1. curl -X GET http://localhost:9200/

    使用Kibana
    1.打开Kibana,找到菜单Dev Tools > Console
    image.png
    2.在控制台中运行下面的API请求
    GET /
    第三步:添加数据
    按照JSON格式向Elasticsearch添加的数据叫做文档,Elasticsearch把这些文档存储在可被搜索的索引中。

    对于时间序列化数据,通常将文档添加到由多个自动生成的backing indices(索引)组成的数据流中。

    数据流需要一个匹配它名字的索引模板。Elasticsearch用这个模板来配置流式的backing indices。发送到数据流中的文档必须有一个@timestamp字段。

    添加单个文档
    提交下面的索引请求来添加单个的日志到logs-my_app-default数据流中。因为logs-my_app-default不存在,这个请求会使用logs--索引模板自动创建它。

    1. POST logs-my_app-default/_doc
    2. {
    3. "@timestamp": "2099-05-06T16:21:15.000Z",
    4. "event": {
    5. "original": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
    6. }
    7. }

    对应的返回数据会包含Elasticsearch为这个文档生成的元数据信息。

    • 返回的_index包含了该文档,Elasticsearch自动生成了索引的名字。
    • 该索引下唯一的id
      1. {
      2. "_index": ".ds-logs-my_app-default-2099-05-06-000001",
      3. "_type": "_doc",
      4. "_id": "gl5MJXMBMk1dGnErnBW8",
      5. "_version": 1,
      6. "result": "created",
      7. "_shards": {
      8. "total": 2,
      9. "successful": 1,
      10. "failed": 0
      11. },
      12. "_seq_no": 0,
      13. "_primary_term": 1
      14. }
      添加多个文档
      在一个请求中使用_bulk端点来添加多个文档。海量数据必须用换行来分割每一个JSON。每一行必须以换行符(\n)结束,包括最后一行也是。
    1. PUT logs-my_app-default/_bulk
    2. { "create": { } }
    3. { "@timestamp": "2099-05-07T16:24:32.000Z", "event": { "original": "192.0.2.242 - - [07/May/2020:16:24:32 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0" } }
    4. { "create": { } }
    5. { "@timestamp": "2099-05-08T16:25:42.000Z", "event": { "original": "192.0.2.255 - - [08/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" } }

    第四步:搜索数据
    文档被索引后几乎立刻就可以被搜索到。下面的搜索请求搜索在logs-my_app-default中所有匹配的数据,并且以@timestamp倒叙排序。

    1. GET logs-my_app-default/_search
    2. {
    3. "query": {
    4. "match_all": { }
    5. },
    6. "sort": [
    7. {
    8. "@timestamp": "desc"
    9. }
    10. ]
    11. }

    一般情况下,返回的hits节点会包含10个匹配的数据。hit中的每个_source包含了提交时原始的json数据。

    1. {
    2. "took": 2,
    3. "timed_out": false,
    4. "_shards": {
    5. "total": 1,
    6. "successful": 1,
    7. "skipped": 0,
    8. "failed": 0
    9. },
    10. "hits": {
    11. "total": {
    12. "value": 3,
    13. "relation": "eq"
    14. },
    15. "max_score": null,
    16. "hits": [
    17. {
    18. "_index": ".ds-logs-my_app-default-2099-05-06-000001",
    19. "_type": "_doc",
    20. "_id": "PdjWongB9KPnaVm2IyaL",
    21. "_score": null,
    22. "_source": {
    23. "@timestamp": "2099-05-08T16:25:42.000Z",
    24. "event": {
    25. "original": "192.0.2.255 - - [08/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638"
    26. }
    27. },
    28. "sort": [
    29. 4081940742000
    30. ]
    31. },
    32. ...
    33. ]
    34. }
    35. }

    获取指定的字段
    解析大型文档的整个_source是非常笨重的。如果要从返回的响应中排除这种它,把_source参数设置为false。使用fields参数来检索你想要的字段。

    1. GET logs-my_app-default/_search
    2. {
    3. "query": {
    4. "match_all": { }
    5. },
    6. "fields": [
    7. "@timestamp"
    8. ],
    9. "_source": false,
    10. "sort": [
    11. {
    12. "@timestamp": "desc"
    13. }
    14. ]
    15. }

    返回的响应中,每个hit字段的field是一个简单数组。

    1. {
    2. ...
    3. "hits": {
    4. ...
    5. "hits": [
    6. {
    7. "_index": ".ds-logs-my_app-default-2099-05-06-000001",
    8. "_type": "_doc",
    9. "_id": "PdjWongB9KPnaVm2IyaL",
    10. "_score": null,
    11. "fields": {
    12. "@timestamp": [
    13. "2099-05-08T16:25:42.000Z"
    14. ]
    15. },
    16. "sort": [
    17. 4081940742000
    18. ]
    19. },
    20. ...
    21. ]
    22. }
    23. }

    搜索时间范围内数据
    搜索时间段或者IP段的数据,使用range查询。

    1. GET logs-my_app-default/_search
    2. {
    3. "query": {
    4. "range": {
    5. "@timestamp": {
    6. "gte": "2099-05-05",
    7. "lt": "2099-05-08"
    8. }
    9. }
    10. },
    11. "fields": [
    12. "@timestamp"
    13. ],
    14. "_source": false,
    15. "sort": [
    16. {
    17. "@timestamp": "desc"
    18. }
    19. ]
    20. }

    您可以使用日期数学来定义相对时间范围,下面的请求目的是搜索前一天的数据,因此在logs-my_app-default中没有数据匹配。

    1. GET logs-my_app-default/_search
    2. {
    3. "query": {
    4. "range": {
    5. "@timestamp": {
    6. "gte": "now-1d/d",
    7. "lt": "now/d"
    8. }
    9. }
    10. },
    11. "fields": [
    12. "@timestamp"
    13. ],
    14. "_source": false,
    15. "sort": [
    16. {
    17. "@timestamp": "desc"
    18. }
    19. ]
    20. }

    从非结构化的数据中提取字段
    你可以在搜索中从非结构化的数据中提取runtime字段,例如日志消息。

    下面的搜索从从event.original提取了source.ip字段。在fields参数中添加source.ip,使得返回的响应中包含这些字段。

    1. GET logs-my_app-default/_search
    2. {
    3. "runtime_mappings": {
    4. "source.ip": {
    5. "type": "ip",
    6. "script": """
    7. String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "event.original" ].value)?.sourceip;
    8. if (sourceip != null) emit(sourceip);
    9. """
    10. }
    11. },
    12. "query": {
    13. "range": {
    14. "@timestamp": {
    15. "gte": "2099-05-05",
    16. "lt": "2099-05-08"
    17. }
    18. }
    19. },
    20. "fields": [
    21. "@timestamp",
    22. "source.ip"
    23. ],
    24. "_source": false,
    25. "sort": [
    26. {
    27. "@timestamp": "desc"
    28. }
    29. ]
    30. }

    联合搜索
    你可以使用bool查询来组合多个查询。下面的搜索示例组合了两个范围查询:一个使用@timestamp,另一个使用source.ip字段。

    1. GET logs-my_app-default/_search
    2. {
    3. "runtime_mappings": {
    4. "source.ip": {
    5. "type": "ip",
    6. "script": """
    7. String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "event.original" ].value)?.sourceip;
    8. if (sourceip != null) emit(sourceip);
    9. """
    10. }
    11. },
    12. "query": {
    13. "bool": {
    14. "filter": [
    15. {
    16. "range": {
    17. "@timestamp": {
    18. "gte": "2099-05-05",
    19. "lt": "2099-05-08"
    20. }
    21. }
    22. },
    23. {
    24. "range": {
    25. "source.ip": {
    26. "gte": "192.0.2.0",
    27. "lte": "192.0.2.240"
    28. }
    29. }
    30. }
    31. ]
    32. }
    33. },
    34. "fields": [
    35. "@timestamp",
    36. "source.ip"
    37. ],
    38. "_source": false,
    39. "sort": [
    40. {
    41. "@timestamp": "desc"
    42. }
    43. ]
    44. }

    聚合数据
    使用聚合来总结数据,例如指标,统计,分析。

    下面的搜索使用聚合和http.response.body.bytes运行时字段来计算average_response_size。这个请求只会统计匹配的文档。

    1. GET logs-my_app-default/_search
    2. {
    3. "runtime_mappings": {
    4. "http.response.body.bytes": {
    5. "type": "long",
    6. "script": """
    7. String bytes=grok('%{COMMONAPACHELOG}').extract(doc[ "event.original" ].value)?.bytes;
    8. if (bytes != null) emit(Integer.parseInt(bytes));
    9. """
    10. }
    11. },
    12. "aggs": {
    13. "average_response_size":{
    14. "avg": {
    15. "field": "http.response.body.bytes"
    16. }
    17. }
    18. },
    19. "query": {
    20. "bool": {
    21. "filter": [
    22. {
    23. "range": {
    24. "@timestamp": {
    25. "gte": "2099-05-05",
    26. "lt": "2099-05-08"
    27. }
    28. }
    29. }
    30. ]
    31. }
    32. },
    33. "fields": [
    34. "@timestamp",
    35. "http.response.body.bytes"
    36. ],
    37. "_source": false,
    38. "sort": [
    39. {
    40. "@timestamp": "desc"
    41. }
    42. ]
    43. }

    返回的aggregations对象包括了聚合的结果。

    探索更多的选项
    To keep exploring, index more data to your data stream and check out Common search options.

    第五步:清除
    当你结束后,删除你的测试数据流和索引。
    DELETE _data_stream/logs-my_app-default

    你也可以删除你的测试部署。
    停止你的Elasticsearch和Kibana Docker容器,运行:
    docker stop es01-test
    docker stop kib01-test

    移除容器和网络,运行:
    docker network rm elastic
    docker rm es01-test
    docker rm kib01-test

    下一步?

    • 通过设置数据层和ILM,最大限度地利用时间序列数据。有关时间序列数据,请参见使用Elasticsearch
    • 使用Fleet和Elastic Agent来直接从你的数据源中收集日志和metrics并发送到Elasticsearch。详情见Fleet quick start guide。
    • 使用Kibana来探索、可视化、和管理你的Elasticsearch数据。详情见Kibana quick start guide。