title: 跟我一起学习logstash #标题tags: logstash #标签
date: 2020-05-31
categories: elastic stack # 分类

参考文档:

logstash在elkstack中是承担了一个过滤、定义消息格式的角色,filebeat收集日志发送到kafka,logstash去kafka消费消息,然后通过input、filters、output等一系列操作,将格式化处理后的数据发送到es集群进行存储。

logstash工作示意图如下:

跟我一起学习logstash - 图1

logstash安装比较简单,配置下java环境,下载相应的包即可(官方提供的tar.gz包是免安装的,开箱即用),这里就跳过安装了。

hello world

先用logstash输出一个hello world信息,开始学习之路。

  1. # 启动logstash
  2. $ logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
  3. # 终端会等待我们输入,然后输入hello world
  4. hello world # 输出如下
  5. {
  6. "@timestamp" => 2020-05-12T03:51:23.532Z,
  7. "@version" => "1",
  8. "message" => "hello world",
  9. "host" => "es2"
  10. }

从上面可以看出,logstash会给我们输入的事件添加一些额外的信息,各个信息解释如下:

  • @timestamp:用来标记事件的发生事件,因为这个字段涉及到logstash的内部流转,所以必须是一个joda对象,如果你尝试自定义一个字符串的名字为@timestamp的话,那么logstash会直接报错。
  • host:标记事件发生在哪里。
  • type:标记事件的唯一类型。
  • tags:标记事件的某方面属性。这是一个属组,一个事件可以有多个标签。

我们可以随意给事件添加字段或者从事件里删除字段。

使用logstash解析日志

在开始之前,请先去官方下载样本数据集,位置如下:

跟我一起学习logstash - 图2

配置filebeat

在创建Logstash管道之前,将配置Filebeat以将日志行发送到Logstash。filebeat配置如下:

  1. $ cat filebeat.yml
  2. filebeat.inputs:
  3. - type: log
  4. enabled: true
  5. paths:
  6. - /tmp/logstash-tutorial-dataset # 模板日志文件
  7. output.logstash:
  8. hosts: ["es2:5044"]
  9. $ ./filebeat -c filebeat.yml & # 后台启动此yml文件

配置logstash

  1. $ cat first-pipline.conf
  2. input {
  3. beats {
  4. port => 5044 # 输入是5044端口
  5. }
  6. }
  7. output {
  8. stdout { codec => rubydebug } # 输出到屏幕
  9. }
  10. $ logstash -f first-pipline.conf --config.test_and_exit # 检测配置文件语法并报告任何错误
  11. $ logstash -f first-pipline.conf --config.reload.automatic # 如果语法无误,则使用此指令启动
  • —config.test_and_exit:解析配置文件并报告任何错误。
  • —config.reload.automatic:启用自动重新加载配置,因此不必在每次修改配置文件时都停止并重新启动Logstash。

当logstash启动后,我们可以看到很多下面那样的事件输出到屏幕上,如下:

跟我一起学习logstash - 图3

使用grok过滤器插件解析web日志

在上面输出的事件消息中,可以看到日志消息格式不是很理想,因此,需要使用grok过滤器插件。

插件相关查询指令

  1. bin/logstash-plugin list # 列出已安装的插件
  2. bin/logstash-plugin list --verbose # 列出已安装的插件以及版本信息
  3. bin/logstash-plugin list --group output # 将列出指定组的所有已安装插件 (input, filter, codec, output)

插件相关安装指令

可以使用下面的指令安装公共资源库上托管的插件:

  1. $ bin/logstash-plugin install logstash-output-kafka

更新插件

  1. $ bin/logstash-plugin update # 更新所有插件
  2. $ bin/logstash-plugin update logstash-output-kafka # 仅更新指定插件

删除插件

  1. $ bin/logstash-plugin remove logstash-output-kafka

配置安装代理

由于安装、更新插件等操作,需要访问RubyGems.org,而一般国内会出现超时现象,所以也可以使用下面的指令,为logstash配置代理,通过代理去访问RubyGems.org。

  1. $ export HTTP_PROXY=http://127.0.0.1:3128
  2. $ bin/logstash-plugin install logstash-output-kafka

插件相关指令至此结束。

使用grok过滤插件,可以将非结构化日志数据解析为结构化和可查询的内容。

由于grok过滤器插件会在传入的日志数据中查找匹配,因此要定义我们感兴趣的字段。下面是一段经典的web日志格式:

  1. 86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /projects/xdotool/ HTTP/1.1" 200 12292 "http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"

该行开头的IP地址很容易识别,括号中的时间戳也很容易识别。要解析数据,可以使用%{COMBINEDAPACHELOG}grok模式,该模式使用以下模式从日志构造行:

信息 列名
IP地址 clientip
用户身份 ident
用户认证 auth
时间戳记 timestamp
HTTP动词 verb
请求体 request
HTTP版本 httpversion
HTTP状态码 response
字节数 bytes
推荐连结网址 referrer
用户代理 agent

现在我们再来编辑logstash的配置文件,如下:

  1. $ cat config/first-pipline.conf
  2. input {
  3. beats {
  4. port => 5044
  5. }
  6. }
  7. filter {
  8. grok {
  9. match => { "message" => "%{COMBINEDAPACHELOG}" }
  10. }
  11. }
  12. output {
  13. stdout { codec => rubydebug }
  14. }

配置filebeat重新发送消息:

  1. $ rm -rf data/registry/ # 删除filebeat上的注册文件
  2. $ filebeat -c filebeat.yml # 重启filebeat

logstash收到的消息如下:

  1. {
  2. "bytes" => "12292",
  3. "host" => {
  4. "name" => "es1"
  5. },
  6. "log" => {
  7. "offset" => 23786,
  8. "file" => {
  9. "path" => "/tmp/logstash-tutorial-dataset"
  10. }
  11. },
  12. "referrer" => "\"http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions\"",
  13. "message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /projects/xdotool/ HTTP/1.1\" 200 12292 \"http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
  14. "input" => {
  15. "type" => "log"
  16. },
  17. "agent" => {
  18. "ephemeral_id" => "1d06239f-272e-4617-881a-11a294b8ff96",
  19. "type" => "filebeat",
  20. "version" => "7.6.2",
  21. "hostname" => "es1",
  22. "id" => "6d0a05bb-0135-407f-bc56-7f158cb484d3"
  23. },
  24. "@version" => "1",
  25. "ecs" => {
  26. "version" => "1.4.0"
  27. },
  28. "httpversion" => "1.1",
  29. "clientip" => "86.1.76.62",
  30. "response" => "200",
  31. "verb" => "GET",
  32. "auth" => "-",
  33. "request" => "/projects/xdotool/",
  34. "tags" => [
  35. [0] "beats_input_codec_plain_applied"
  36. ],
  37. "timestamp" => "04/Jan/2015:05:30:37 +0000",
  38. "@timestamp" => 2020-05-12T14:02:10.125Z,
  39. "ident" => "-"
  40. }

%{COMBINEDAPACHELOG}模式

%{COMBINEDAPACHELOG} 是logstash自带的匹配模式,它的grok表达式如下:

  1. COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:req}

自定义grok语法表达式

grok语法表达式学习参考:logstash过滤插件——grok

一般在logstash中时这样使用grok插件的:

  1. filter {
  2. if [type] == "gt_third_ngx" {
  3. grok {
  4. patterns_dir => ["/opt/logstash/conf/patterns/"]
  5. match => {"message" => "%{GT_THIRD}"}
  6. }
  7. }

patterns_dir指定的/opt/logstash/conf/patterns/目录下,则有一个名为NGINXACCESS(此文件名是自定义),文件内容如下:

  1. # gt third
  2. GT_THIRD \[%{TIMESTAMP_ISO8601:time_local}\] %{NOTSPACE:status:int} %{NOTSPACE:uuid} %{NOTSPACE:X_system} %{NOTSPACE:hostname} %{IPORHOST:remote_addr} \[%{NOSQUARE:upstream_addr}\] %{NUMBER:request_time:float} (%{NUMBER:upstraem_response_time}|-) %{NOTSPACE:Method} %{NOTSPACE:proxy_host} %{URIPATH:uri}(%{URIPARAM:uri_param})? %{NUMBER:body_bytes_sent:int} \| %{NOTSPACE:X_debug}

使用geoip插件

参考:配置geoip

该geoip插件主要是提取客户的ip,以便在kibana中可以根据geoip提取的经纬度,画出客户分布图。一般在web日志中,clientip字段包含IP地址(此IP地址必须为公网IP,否则无法匹配)。

由于筛选器是按顺序求值的,因此请确保该geoip部分位于grok配置文件的该部分之后,并且grok和geoip部分都嵌套在该filter部分中。

完整的logstash配置文件应如下所示:

  1. $ cat config/first-pipline.conf
  2. input {
  3. beats {
  4. port => 5044
  5. }
  6. }
  7. filter {
  8. grok {
  9. match => { "message" => "%{COMBINEDAPACHELOG}" }
  10. }
  11. geoip {
  12. source => "clientip"
  13. }
  14. }
  15. output {
  16. stdout { codec => rubydebug }
  17. }

保存上面的配置文件后,删除filebeat的registry文件,并重新启动filebeat,则会发现logstash收到的日志消息中输出中包含以下信息:

  1. "geoip" => {
  2. "latitude" => 34.7725,
  3. "ip" => "114.114.114.114",
  4. "continent_code" => "AS",
  5. "country_code2" => "CN",
  6. "country_code3" => "CN",
  7. "location" => {
  8. "lat" => 34.7725,
  9. "lon" => 113.7266
  10. },
  11. "country_name" => "China",
  12. "timezone" => "Asia/Shanghai",
  13. "longitude" => 113.7266
  14. },
  15. # 上面的信息中包含ip所处国家、城市、经纬度等等详细信息。

geoip的其他配置参数

target

指定Logstash应将geoip数据存储到的字段。例如,如果你有src_ip和dst_ip字段,并且想要两个IP的GeoIP信息,这将很有用。默认保存到geoip字段。

database

Logstash使用的Maxmind数据库文件的路径。默认的数据库是GeoLite2-City。GeoLite2-City、GeoLite2-Country、GeoLite2-ASN是Maxmind支持的免费数据库。GeolP2-City、GeolP2-ISP GeolP2-Country是Maxmind支持的商业数据库。如果没有指定,将默认为与Logstash一起发布的GeoLite2 City数据库。

可以去下载GeoLite2 City数据库文件(需要注册账号)。

建议定期更新自己服务器的geoip数据库(官方免费的约两周左右更新一次)。至于如何配置自动更新本地geoip数据库,我还没研究,知道的小伙伴可以评论里告诉我。

add_field

如果匹配成功,则添加字段。

geoip使用示例
  1. filter {
  2. grok {
  3. patterns_dir => ["/usr/local/logstash/patterns/"]
  4. match => {"message" => "%{FKNGX}"}
  5. }
  6. geoip {
  7. source => "client_ip"
  8. target => "geoip"
  9. database => "/usr/local/logstash/GeoLite2-City.mmdb" # 指定geoip数据库文件
  10. add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]
  11. add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]
  12. }
  13. if "_geoip_lookup_failure" in [tags] { drop { } }
  14. mutate {
  15. convert => [ "[geoip][coordinates]", "float"] # 将经纬度转换成浮点型(默认都是string类型)
  16. }

if “_geoip_lookup_failure” in [tags] { drop { } }:如果是一个内网地址访问产生的日志,由于geoip无法识别其地理位置,则就可以将该条消息删除,当然,这种方式有些粗暴,我们可以换一个姿势来过滤。就是增加if判断,如果是内网地址,就不进行geoip处理,但logstash还是可以将这条消息输出。如下:

  1. if [client_ip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
  2. geoip {
  3. source => "client_ip"
  4. target => "geoip"
  5. database => "/usr/local/logstash/GeoLite2-City.mmdb"
  6. add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]
  7. add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]
  8. }
  9. }

输出如下:

  1. "geoip" => {
  2. "coordinates" => [
  3. [0] "113.7266",
  4. [1] "34.7725"
  5. ],
  6. "continent_code" => "AS",
  7. "country_code2" => "CN",
  8. "latitude" => 34.7725,
  9. "country_code3" => "CN",
  10. "location" => {
  11. "lon" => 113.7266,
  12. "lat" => 34.7725
  13. },
  14. "timezone" => "Asia/Shanghai",
  15. "country_name" => "China",
  16. "ip" => "211.136.64.35",
  17. "longitude" => 113.7266
  18. },

drop

用于删除符合条件的消息,如下:

  1. filter {
  2. if [loglevel] == "debug" {
  3. drop { }
  4. }
  5. }

上面的配置中,表示如果loglevel字段值为debug,那么此条消息将被删除,不会被传入到es中。

mutate

mutate插件可以用来改变字段,比如重命名、删除、替换和修改事件中的字段。

其插件执行顺序如下:

  • coerce
  • rename
  • update
  • replace
  • convert
  • gsub
  • uppercase
  • capitalize
  • lowercase
  • strip
  • remove
  • split
  • join
  • merge
  • copy

convert

convert的作用是将字段的值转换为其他类型,例如将字符串转换为整数,如果字段值为数组,则将转换所有成员。如果字段值为hash,则不会采取任何措施。

有效的转换目标及其在不同输入下的预期行为有:

  • integer:

    • 字符串被解析;支持逗号分隔符(例如,字符串”1,000”产生一个值为一千的整数);当字符串具有小数部分时,它们将被截断。
    • 浮点数和小数点被截断(例如,3.99变为3,-2.7变为-2)
    • 布尔真和布尔假分别被转换为1和0
  • integer_eu:

    • 与相同integer,但是字符串值支持点分隔符和逗号小数(例如,”1.000”产生一个值为一千的整数)
  • float:

    • 整数转换为浮点数
    • 字符串被解析;支持逗号分隔符和点小数(例如,”1,000.5”产生一个值为1000的整数)
    • 布尔真和布尔假被转换为1.0和0.0分别
  • float_eu:

    • 与相同float,不同之处在于字符串值支持点分隔符和逗号小数(例如,”1.000,5”产生一个值为1000的整数)
  • string:

    • 所有值均已字符串化并使用UTF-8编码
  • boolean:

    • 整数0转换为布尔值 false
    • 整数1转换为布尔值 true
    • 浮点数0.0转换为布尔值 false
    • float 1.0转换为boolean true
    • 字符串”true”,”t”,”yes”,”y”,”1”and“1.0”被转换成布尔true
    • 字符串”false”,”f”,”no”,”n”,”0”和”0.0”被转换成布尔false
    • 空字符串将转换为布尔值 false
    • 所有其他值直接通过而不进行转换并记录警告消息
    • 对于数组,使用上面的规则分别处理每个值

该插件可以转换同一文档中的多个字段,示例如下:

  1. filter {
  2. mutate {
  3. convert => {
  4. "fieldname" => "integer"
  5. "booleanfield" => "boolean"
  6. }
  7. }
  8. }

copy

copy用于将字段复制到另一个字段,现有目标字段将被覆盖。

如下:

  1. filter {
  2. mutate {
  3. copy => { "source_field" => "dest_field" }
  4. }
  5. }

gsub

gsub用于将正则表达式与字段值匹配,然后将所有匹配项替换为替换字符串。仅支持字符串或字符串数组的字段。对于其他类型的字段,将不采取任何措施。

此配置采用每个字段/替换由3个元素组成的数组。

请注意转义配置文件中的任何反斜杠。

如下:

  1. filter {
  2. mutate {
  3. gsub => [
  4. # 用下划线替换所有的斜杠
  5. "fieldname", "/", "_",
  6. # 用点号“ . ”替换\、?、#和-
  7. "fieldname2", "[\\?#-]", "."
  8. ]
  9. }
  10. }

join

join用分隔符连接数组,对非数组字段不执行任何操作。

如:

  1. filter {
  2. mutate {
  3. join => { "fieldname" => "," }
  4. }
  5. }

lowercase

lowercase用于将字符串转换为小写字符。

如:

  1. filter {
  2. mutate {
  3. lowercase => [ "fieldname" ]
  4. }
  5. }

merge

用于合并两个数组或哈希字段,字符串字段将自动转换为数组,因此,数组+字符串将起作用,字符串+字符串将在dest_field中产生2个条目数组,数组和哈希将不起作用。

如:

  1. filter {
  2. mutate {
  3. merge => { "dest_field" => "added_field" }
  4. }
  5. }

coerce

设置存在但为空的字段的默认值。

如:

  1. filter {
  2. mutate {
  3. # 将'field1'字段的默认值设置为'default_value'
  4. coerce => { "field1" => "default_value" }
  5. }
  6. }

rename

重命名一个或多个字段。

如:

  1. filter {
  2. mutate {
  3. # 将 'HOSTORIP' 列 重命名为 'client_ip'
  4. rename => { "HOSTORIP" => "client_ip" }
  5. }
  6. }

replace

用新值替换字段的值,新值可以包含%{foo}字符串,以帮助我们从事件的其他部分构建新值。

如:

  1. filter {
  2. mutate {
  3. replace => { "message" => "%{source_host}: My new message" }
  4. }
  5. }

split

使用分隔符将字段拆分为数组。仅适用于字符串字段。

如:

  1. filter {
  2. mutate {
  3. split => { "fieldname" => "," }
  4. }
  5. }

strip

从字段中删除空格。注意:这仅适用于开头和结尾的空格。

如:

  1. filter {
  2. mutate {
  3. strip => ["field1", "field2"]
  4. }
  5. }

update

用新值更新现有字段。如果该字段不存在,则不采取任何操作。

如下:

  1. filter {
  2. mutate {
  3. update => { "sample" => "My new message" }
  4. }
  5. }

uppercase

将字符串转换为对应的大写字母。

如:

  1. filter {
  2. mutate {
  3. uppercase => [ "fieldname" ]
  4. }
  5. }

tag_on_failure

如果在此筛选器的应用过程中发生故障,则将终止其余操作,并将提供的标记添加到事件中。

通用插件

设置 输入类型
add_field hash
add_tag array
enable_metric boolean
id string
periodic_flush boolean
remove_field array
remove_tag array

这里只挑选几个我常用的记录下,关于详细内容可以参考官方文档

add_field

如果此筛选成功,字段将添加到此事件。字段名可以是动态的,并包含使用%{ffield}的事件的部分。

如下:

  1. filter {
  2. mutate {
  3. add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
  4. }
  5. }
  1. # 也可以一次添加多个字段:
  2. filter {
  3. mutate {
  4. add_field => {
  5. "testField1" => "0"
  6. "testField2" => "%{name}"
  7. }
  8. }
  9. }

如果事件有字段“somefield”==“hello”,这个过滤器成功时,将添加字段foo hello,如果它是存在的,与上面的值和%(host}块替换为该值从该事件。第二个示例还将添加硬编码字段。

remove_field

用于删除事件中的某个字段。

这个字段非常有用,很多时候,我们已经对日志中的事件进行了分段匹配,如以下输出:

  1. {
  2. "@timestamp" => 2020-05-18T23:49:05.822Z,
  3. "method" => "POST",
  4. "http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()",
  5. "message" => "10.252.201.136 - - [27/Apr/2020:00:00:01 +0800] \"POST /api-stkp/callback HTTP/1.1 200 221\" \"//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()\" \"Java/1.8.0_77\"",
  6. "host" => "es2",
  7. "path" => "/var/log/http.log",
  8. "http_date" => "27/Apr/2020:00:00:01 +0800",
  9. "url" => "/api-stkp/callback",
  10. "http_x_forwarded_for" => "Java/1.8.0_77",
  11. "http_version" => "HTTP/1.1",
  12. "status" => 200,
  13. "body_bytes" => 221,
  14. "@version" => "1",
  15. "client" => "10.252.201.136"
  16. }

可以看到messages中的所有内容,我们都使用grok进行了匹配赋值,那么此时,我们还需要messages这个字段么?所以为了减少资源浪费,可以将其删除。

如下:

  1. filter {
  2. grok {
  3. patterns_dir => ["/usr/local/logstash/patterns/"]
  4. match => {"message" => "%{FKNGX}"}
  5. }
  6. mutate {
  7. remove_field => [ "message" ]
  8. }
  9. }
  10. output {
  11. stdout { codec => rubydebug }
  12. }

输出如下:

  1. {
  2. "@timestamp" => 2020-05-18T23:51:45.870Z,
  3. "method" => "POST",
  4. "http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugg
  5. "host" => "es2",
  6. "path" => "/var/log/http.log",
  7. "http_date" => "27/Apr/2020:00:00:01 +0800",
  8. "url" => "/api-stkp/callback",
  9. "http_x_forwarded_for" => "Java/1.8.0_77",
  10. "http_version" => "HTTP/1.1",
  11. "status" => 200,
  12. "body_bytes" => 221,
  13. "@version" => "1",
  14. "client" => "10.252.201.136"
  15. }

解决@timestamp和日志实际时间相差8小时

@timestamp时间默认使用UTC时间,所以会比北京时间慢8个小时,如果不解决这个问题,那么导致的结果就是我们每天在早上8点输出到es的数据才会创建新的索引,对于此问题,我是这么解决的,使用ruby插件,增加新的索引:

  1. ruby {
  2. code => "event.set('index_day', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
  3. }
  4. mutate {
  5. remove_field => [ "message","@version","@timestamp" ]
  6. # @timestamp就没啥用了,可以删除了
  7. }

输出如下:

  1. {
  2. "url" => "/api-stkp/callback",
  3. "path" => "/var/log/http.log",
  4. "http_x_forwarded_for" => "Java/1.8.0_77",
  5. "http_version" => "HTTP/1.1",
  6. "http_date" => "27/Apr/2020:2:59:59 +0800",
  7. "client" => "10.252.201.136",
  8. "status" => 200,
  9. "body_bytes" => 221,
  10. "index_day" => "2020-05-23", # 增加新的索引“index_day”
  11. "http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()",
  12. "host" => "es2",
  13. "method" => "POST"
  14. }

这样,我们在输出到es时,就可以这样指定了:

  1. output {
  2. if [type] == "gt_third_ngx" {
  3. elasticsearch {
  4. hosts => ["elasticsearch1:9200" ,"elasticsearch2:9200", "elasticsearch3:9200"]
  5. index => "ngx-%{index_day}"
  6. }
  7. }
  8. }

细心的同学可能已经发现我上面的有问题了,哎,你这不对啊,http_date是 “27/Apr/2020…..”可index_day怎么是2020-05-23,呵呵,那是因为,index_day和http_date压根没什么关系,http_date记录的是日志产生的时间,比如用户2020-05-12 13:00产生的,但logstash是2020-05-23处理这条信息的,那么就会出现上面的情况(在ruby插件中,timestamp时间被转换成了系统时间,然后又保存到index_day字段中),正常情况下,这种情况不会产生,因为从日志采集端到logstash处理,误差可能只有几微秒,但如果我们处理的是几个月之前的日志呢?(logstash刚搭建,要把前几个月的日志都处理掉),那么可以这样写规则:

  1. date {
  2. match => [ "http_date","dd/MMM/yyyy:HH:mm:ss Z" ]
  3. }
  4. # date匹配到的日期保存的target默认为@timestamp。然后ruby再处理@timestamp。
  5. ruby {
  6. code => "event.set('index_day', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
  7. remove_field => [ "message","@version","@timestamp" ]
  8. }

然后处理后,输出如下:

  1. {
  2. "http_x_forwarded_for" => "Java/1.8.0_77",
  3. "body_bytes" => 221,
  4. "http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()",
  5. "http_version" => "HTTP/1.1",
  6. "url" => "/api-stkp/callback",
  7. "path" => "/var/log/http.log",
  8. "host" => "es2",
  9. "index_day" => "2018-04-27", # 这里的日期和http_date就对上了
  10. "status" => 200,
  11. "client" => "10.252.201.136",
  12. "method" => "POST",
  13. "http_date" => "27/Apr/2018:2:59:59 +0800"
  14. }

过滤插件中的通用选项

以下所有插件都支持配置:

跟我一起学习logstash - 图4

好了,关于logstash就啃到这里了,日后有东西的话,再接着补充吧。