1、Logstash介绍

Logstash能够将采集日志、格式化、过滤,最后将数据推送到Elasticsearch存储。

(二)ELK - Logstash - 图1

Input:输入,输入数据可以是Stdin、File、TCP、Redis、Syslog等。

Filter:过滤,将日志格式化。有丰富的过滤插件:Grok正则捕获、Date时间处理、Json编解码、Mutate数据修改等。

Output:输出,输出目标可以是Stdout、File、TCP、Redis、ES等。

2、Logstash部署

  1. 二进制方式部署:
  2. yum install java-1.8.0-openjdk y
  3. cd /opt/elk
  4. tar zxvf logstash-7.9.3.tar.gz
  5. mv logstash-7.9.3 logstash
  1. # vi /usr/lib/systemd/system/logstash.service
  2. [Unit]
  3. Description=logstash
  4. [Service]
  5. ExecStart=/opt/elk/logstash/bin/logstash
  6. ExecReload=/bin/kill -HUP $MAINPID
  7. KillMode=process
  8. Restart=on-failure
  9. [Install]
  10. WantedBy=multi-user.target
  1. # vim config/logstash.yml
  2. pipeline: # 管道配置
  3. batch:
  4. size: 125
  5. delay: 5
  6. path.config: /opt/elk/logstash/conf.d # conf.d目录自己创建
  7. # 定期检查配置是否修改,并重新加载管道。也可以使用SIGHUP信号手动触发
  8. # config.reload.automatic: false
  9. # config.reload.interval: 3s
  10. # http.enabled: true
  11. http.host: 0.0.0.0
  12. http.port: 9600-9700
  13. log.level: info
  14. path.logs: /opt/elk/logstash/logs

3、基本使用

3.1 示例:从标准输入获取日志并打印到标准输出

  1. # 使用 -e 参数,需要注释掉path.config(两个不能同时使用,后面会将-e后的参数写入path.config中)
  2. /opt/elk/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
  3. helloworld
  4. {
  5. "@timestamp" => 2021-11-19T02:06:06.269Z,
  6. "message" => "helloworld",
  7. "@version" => "1",
  8. "host" => "k8s-master1"
  9. }
  1. 默认给日志加的三个字段:
  2. "@timestamp" 标记事件发生的时间点
  3. "host" 标记事件发生的主机
  4. "type" 标记事件的唯一类型
  5. 命令行参数:
  6. -e 字符串形式写配置
  7. -f 指定配置文件
  8. -t 测试配置文件语法

4、输入插件

  1. 输入阶段:从哪里获取日志
  2. 常用插件:
  3. Stdin(一般用于调试)
  4. File
  5. Redis
  6. Beats(例如filebeat

4.1 输入插件:File

  1. File插件:用于读取指定日志文件
  2. 常用字段:
  3. path 日志文件路径,可以使用通配符
  4. exclude 排除采集的日志文件
  5. start_position 指定日志文件什么位置开始读,默认从结尾开始,指定beginning表示从头开始读
  1. 示例:读取日志文件并输出到文件
  2. input {
  3. # 指定采集的日志
  4. file {
  5. path => "/var/log/test/*.log" # 指定采集的日志文件
  6. exclude => "error.log" # 要排除的日志文件
  7. #start_position => "beginning" # 指定读取的日志文件位置,默认是从结尾,beginning从头开始
  8. }
  9. }
  10. # 过滤是可选的
  11. filter {
  12. }
  13. output {
  14. # 指定采集的日志写入位置
  15. file {
  16. path => "/tmp/test.log"
  17. }
  18. }

输入插件都支持的字段:

• add_field 添加一个字段到一个事件,放到事件顶部,一般用于标记日志来源。例如属于哪个项目,哪个应用

• tags 添加任意数量的标签,用于标记日志的其他属性,例如表明访问日志还是错误日志

• type 为所有输入添加一个字段,例如表明日志类型

  1. 示例:配置日志来源
  2. input {
  3. file {
  4. path => "/var/log/test/*.log"
  5. exclude => "error.log"
  6. #start_position => "beginning"
  7. tags => "web" # 标签
  8. tags => "nginx"
  9. type => "access" # 日志类型
  10. add_field => { # 字段
  11. "project" => "microservice" # 例如:微服务项目
  12. "app" => "product" # 微服务下的商品项目
  13. }
  14. }
  15. }

(二)ELK - Logstash - 图2

4.2 输入插件:Beats

Beats插件:接收来自Beats数据采集器发来的数据,例如Filebeat(后面讲解)

常用字段:

• host 监听地址

• port 监听端口

(二)ELK - Logstash - 图3

  1. 示例:
  2. input {
  3. beats {
  4. host => "0.0.0.0"
  5. port => 5044
  6. }
  7. }
  8. filter {
  9. }
  10. output {
  11. file {
  12. path => "/tmp/test.log"
  13. }
  14. }

5、过滤插件

  1. 过滤阶段:将日志格式化处理
  2. 常用插件:
  3. json
  4. kv
  5. grok
  6. geoip
  7. date
  1. 过滤插件都支持的字段:
  2. add_field 如果过滤成功,添加一个字段到这个事件
  3. add_tags 如果过滤成功,添加任意数量的标签到这个事件
  4. remove_field 如果过滤成功,从这个事件移除任意字段
  5. remove_tag 如果过滤成功,从这个事件移除任意标签

5.1 部署Kibana用于查看测试数据

Kibana 是一个图形页面系统,用于对 Elasticsearch 数据可视化

5.1.1 二进制方式部署

  1. 二进制方式部署:
  2. cd /opt/elk
  3. tar zxvf kibana-7.9.3-linux-x86_64.tar.gz
  4. mv kibana-7.9.3-linux-x86_64 kibana
  5. # vi config/kibana.yml
  6. server.port: 5601
  7. server.host: "0.0.0.0"
  8. elasticsearch.hosts: ["http://192.168.31.61:9200"] # 指定集群中一台即可,集群同步
  9. i18n.locale: "zh-CN"
  1. 配置系统服务管理:
  2. # vi /usr/lib/systemd/system/kibana.service
  3. [Unit]
  4. Description=kibana
  5. [Service]
  6. ExecStart=/opt/elk/kibana/bin/kibana --allow-root
  7. ExecReload=/bin/kill -HUP $MAINPID
  8. KillMode=process
  9. Restart=on-failure
  10. [Install]
  11. WantedBy=multi-user.target

访问:http://ip:5601

5.1.2 Logstash指定日志写入es

  1. output {
  2. # 指定采集的日志写入位置(写入到es)
  3. # elasticsearch {
  4. # hosts => ["192.168.6.20:9200", "192.168.6.21:9200", "192.168.6.22:9200"]
  5. # index => "test-%{+YYYY.MM.dd}"
  6. # }
  7. file {
  8. path => "/tmp/test.log"
  9. }
  10. }

5.1.3 过滤插件:Json

  1. JSON插件:接收一个json数据,将其展开为Logstash事件中的数据结构,放到事件顶层。
  2. 常用字段:
  3. source 指定要解析的字段,一般是原始消息message字段
  4. target 将解析的结果放到指定字段,如果不指定,默认在事件的顶层
  1. # 添加过滤 - 使用Json进行解析
  2. filter {
  3. json {
  4. source => "message"
  5. }
  6. }
  1. 模拟数据:
  2. {"remote_addr": "192.168.1.10","url":"/index","status":"200"}
  3. 示例:解析HTTP请求
  4. filter {
  5. json {
  6. source => "message"
  7. }
  8. }

示例:解析HTTP请求

  • 当输入非json字符串,会解析使用

(二)ELK - Logstash - 图4

  • 当输入json字符串后查看解析 - 解析成功

(二)ELK - Logstash - 图5

5.1.4 过滤插件:Json解析到ES,使用kibana查看

  1. output {
  2. # 指定采集的日志写入位置(写入到es)
  3. elasticsearch {
  4. hosts => ["192.168.6.20:9200", "192.168.6.21:9200", "192.168.6.22:9200"]
  5. index => "test-%{+YYYY.MM.dd}"
  6. }
  7. # file {
  8. # path => "/tmp/test.log"
  9. # }
  10. }
  • 注意hosts中必须指定端口
  • index中的+,是一定的

kibana管理页面:discover -> 索引管理

(二)ELK - Logstash - 图6

  • 非json的无法被解析,错误信息如下:
  1. Parsed JSON object/hash requires a target configuration option {:source=>"message", :raw=>"444"}

创建索引模式:只展示以test-开头的索引

索引模式 -> 创建索引模式 test-* 时间戳 (二)ELK - Logstash - 图7 创建索引之后可以通过discover查看日志信息: (二)ELK - Logstash - 图8 查询状态码是200的数据: (二)ELK - Logstash - 图9 #### 5.1.5 过滤插件:KV KV插件:接收一个键值数据,按照指定分隔符解析为Logstash事件中的数据结构,放到事件顶层。 常用字段: • field_split 指定键值分隔符,默认空 bash 模拟数据: www.ctnrs.com?id=1&name=aliang&age=30 示例:解析URL中参数 filter { kv { field_split => "&?" } } (二)ELK - Logstash - 图10 + 可以看到kv的模拟信息已经被解析

5.1.6 过滤插件:Grok

Grok插件:如果采集的日志格式是非结构化的,可以写正则表达式提取,grok是正则表达式支持的实现。

常用字段:

• match 正则匹配模式

• patterns_dir 自定义正则模式文件

(二)ELK - Logstash - 图11

  1. Logstash内置的正则匹配模式,在安装目录下可以看到,路径:
  2. vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns
  3. 正则匹配模式语法格式:%{SYNTAX:SEMANTIC}
  4. SYNTAX 模式名称,模式文件中的第一列
  5. SEMANTIC 匹配文件的字段名
  6. 例如: %{IP:client}

logstash内置的一些正则表达式:

(二)ELK - Logstash - 图12

6、输出插件

7、条件判断