title: 跟我一起学习logstash #标题tags: logstash #标签
date: 2020-05-31
categories: elastic stack # 分类
参考文档:
- 官方文档
- ELKstack中文社区
logstash简介
logstash在elkstack中是承担了一个过滤、定义消息格式的角色,filebeat收集日志发送到kafka,logstash去kafka消费消息,然后通过input、filters、output等一系列操作,将格式化处理后的数据发送到es集群进行存储。
logstash工作示意图如下:

logstash安装比较简单,配置下java环境,下载相应的包即可(官方提供的tar.gz包是免安装的,开箱即用),这里就跳过安装了。
hello world
先用logstash输出一个hello world信息,开始学习之路。
# 启动logstash$ logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'# 终端会等待我们输入,然后输入hello worldhello world # 输出如下{"@timestamp" => 2020-05-12T03:51:23.532Z,"@version" => "1","message" => "hello world","host" => "es2"}
从上面可以看出,logstash会给我们输入的事件添加一些额外的信息,各个信息解释如下:
- @timestamp:用来标记事件的发生事件,因为这个字段涉及到logstash的内部流转,所以必须是一个joda对象,如果你尝试自定义一个字符串的名字为@timestamp的话,那么logstash会直接报错。
- host:标记事件发生在哪里。
- type:标记事件的唯一类型。
- tags:标记事件的某方面属性。这是一个属组,一个事件可以有多个标签。
我们可以随意给事件添加字段或者从事件里删除字段。
使用logstash解析日志
在开始之前,请先去官方下载样本数据集,位置如下:

配置filebeat
在创建Logstash管道之前,将配置Filebeat以将日志行发送到Logstash。filebeat配置如下:
$ cat filebeat.ymlfilebeat.inputs:- type: logenabled: truepaths:- /tmp/logstash-tutorial-dataset # 模板日志文件output.logstash:hosts: ["es2:5044"]$ ./filebeat -c filebeat.yml & # 后台启动此yml文件
配置logstash
$ cat first-pipline.confinput {beats {port => 5044 # 输入是5044端口}}output {stdout { codec => rubydebug } # 输出到屏幕}$ logstash -f first-pipline.conf --config.test_and_exit # 检测配置文件语法并报告任何错误$ logstash -f first-pipline.conf --config.reload.automatic # 如果语法无误,则使用此指令启动
- —config.test_and_exit:解析配置文件并报告任何错误。
- —config.reload.automatic:启用自动重新加载配置,因此不必在每次修改配置文件时都停止并重新启动Logstash。
当logstash启动后,我们可以看到很多下面那样的事件输出到屏幕上,如下:

使用grok过滤器插件解析web日志
在上面输出的事件消息中,可以看到日志消息格式不是很理想,因此,需要使用grok过滤器插件。
插件相关查询指令
bin/logstash-plugin list # 列出已安装的插件bin/logstash-plugin list --verbose # 列出已安装的插件以及版本信息bin/logstash-plugin list --group output # 将列出指定组的所有已安装插件 (input, filter, codec, output)
插件相关安装指令
可以使用下面的指令安装公共资源库上托管的插件:
$ bin/logstash-plugin install logstash-output-kafka
更新插件
$ bin/logstash-plugin update # 更新所有插件$ bin/logstash-plugin update logstash-output-kafka # 仅更新指定插件
删除插件
$ bin/logstash-plugin remove logstash-output-kafka
配置安装代理
由于安装、更新插件等操作,需要访问RubyGems.org,而一般国内会出现超时现象,所以也可以使用下面的指令,为logstash配置代理,通过代理去访问RubyGems.org。
$ export HTTP_PROXY=http://127.0.0.1:3128$ bin/logstash-plugin install logstash-output-kafka
插件相关指令至此结束。
使用grok过滤插件,可以将非结构化日志数据解析为结构化和可查询的内容。
由于grok过滤器插件会在传入的日志数据中查找匹配,因此要定义我们感兴趣的字段。下面是一段经典的web日志格式:
86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /projects/xdotool/ HTTP/1.1" 200 12292 "http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"
该行开头的IP地址很容易识别,括号中的时间戳也很容易识别。要解析数据,可以使用%{COMBINEDAPACHELOG}grok模式,该模式使用以下模式从日志构造行:
| 信息 | 列名 |
|---|---|
| IP地址 | clientip |
| 用户身份 | ident |
| 用户认证 | auth |
| 时间戳记 | timestamp |
| HTTP动词 | verb |
| 请求体 | request |
| HTTP版本 | httpversion |
| HTTP状态码 | response |
| 字节数 | bytes |
| 推荐连结网址 | referrer |
| 用户代理 | agent |
现在我们再来编辑logstash的配置文件,如下:
$ cat config/first-pipline.confinput {beats {port => 5044}}filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}" }}}output {stdout { codec => rubydebug }}
配置filebeat重新发送消息:
$ rm -rf data/registry/ # 删除filebeat上的注册文件$ filebeat -c filebeat.yml # 重启filebeat
logstash收到的消息如下:
{"bytes" => "12292","host" => {"name" => "es1"},"log" => {"offset" => 23786,"file" => {"path" => "/tmp/logstash-tutorial-dataset"}},"referrer" => "\"http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions\"","message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /projects/xdotool/ HTTP/1.1\" 200 12292 \"http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"","input" => {"type" => "log"},"agent" => {"ephemeral_id" => "1d06239f-272e-4617-881a-11a294b8ff96","type" => "filebeat","version" => "7.6.2","hostname" => "es1","id" => "6d0a05bb-0135-407f-bc56-7f158cb484d3"},"@version" => "1","ecs" => {"version" => "1.4.0"},"httpversion" => "1.1","clientip" => "86.1.76.62","response" => "200","verb" => "GET","auth" => "-","request" => "/projects/xdotool/","tags" => [[0] "beats_input_codec_plain_applied"],"timestamp" => "04/Jan/2015:05:30:37 +0000","@timestamp" => 2020-05-12T14:02:10.125Z,"ident" => "-"}
%{COMBINEDAPACHELOG}模式
%{COMBINEDAPACHELOG} 是logstash自带的匹配模式,它的grok表达式如下:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:req}
自定义grok语法表达式
grok语法表达式学习参考:logstash过滤插件——grok
一般在logstash中时这样使用grok插件的:
filter {if [type] == "gt_third_ngx" {grok {patterns_dir => ["/opt/logstash/conf/patterns/"]match => {"message" => "%{GT_THIRD}"}}}
而patterns_dir指定的/opt/logstash/conf/patterns/目录下,则有一个名为NGINXACCESS(此文件名是自定义),文件内容如下:
# gt thirdGT_THIRD \[%{TIMESTAMP_ISO8601:time_local}\] %{NOTSPACE:status:int} %{NOTSPACE:uuid} %{NOTSPACE:X_system} %{NOTSPACE:hostname} %{IPORHOST:remote_addr} \[%{NOSQUARE:upstream_addr}\] %{NUMBER:request_time:float} (%{NUMBER:upstraem_response_time}|-) %{NOTSPACE:Method} %{NOTSPACE:proxy_host} %{URIPATH:uri}(%{URIPARAM:uri_param})? %{NUMBER:body_bytes_sent:int} \| %{NOTSPACE:X_debug}
使用geoip插件
参考:配置geoip
该geoip插件主要是提取客户的ip,以便在kibana中可以根据geoip提取的经纬度,画出客户分布图。一般在web日志中,clientip字段包含IP地址(此IP地址必须为公网IP,否则无法匹配)。
由于筛选器是按顺序求值的,因此请确保该geoip部分位于grok配置文件的该部分之后,并且grok和geoip部分都嵌套在该filter部分中。
完整的logstash配置文件应如下所示:
$ cat config/first-pipline.confinput {beats {port => 5044}}filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}" }}geoip {source => "clientip"}}output {stdout { codec => rubydebug }}
保存上面的配置文件后,删除filebeat的registry文件,并重新启动filebeat,则会发现logstash收到的日志消息中输出中包含以下信息:
"geoip" => {"latitude" => 34.7725,"ip" => "114.114.114.114","continent_code" => "AS","country_code2" => "CN","country_code3" => "CN","location" => {"lat" => 34.7725,"lon" => 113.7266},"country_name" => "China","timezone" => "Asia/Shanghai","longitude" => 113.7266},# 上面的信息中包含ip所处国家、城市、经纬度等等详细信息。
geoip的其他配置参数
target
指定Logstash应将geoip数据存储到的字段。例如,如果你有src_ip和dst_ip字段,并且想要两个IP的GeoIP信息,这将很有用。默认保存到geoip字段。
database
Logstash使用的Maxmind数据库文件的路径。默认的数据库是GeoLite2-City。GeoLite2-City、GeoLite2-Country、GeoLite2-ASN是Maxmind支持的免费数据库。GeolP2-City、GeolP2-ISP GeolP2-Country是Maxmind支持的商业数据库。如果没有指定,将默认为与Logstash一起发布的GeoLite2 City数据库。
可以去下载GeoLite2 City数据库文件(需要注册账号)。
建议定期更新自己服务器的geoip数据库(官方免费的约两周左右更新一次)。至于如何配置自动更新本地geoip数据库,我还没研究,知道的小伙伴可以评论里告诉我。
add_field
如果匹配成功,则添加字段。
geoip使用示例
filter {grok {patterns_dir => ["/usr/local/logstash/patterns/"]match => {"message" => "%{FKNGX}"}}geoip {source => "client_ip"target => "geoip"database => "/usr/local/logstash/GeoLite2-City.mmdb" # 指定geoip数据库文件add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]}if "_geoip_lookup_failure" in [tags] { drop { } }mutate {convert => [ "[geoip][coordinates]", "float"] # 将经纬度转换成浮点型(默认都是string类型)}
if “_geoip_lookup_failure” in [tags] { drop { } }:如果是一个内网地址访问产生的日志,由于geoip无法识别其地理位置,则就可以将该条消息删除,当然,这种方式有些粗暴,我们可以换一个姿势来过滤。就是增加if判断,如果是内网地址,就不进行geoip处理,但logstash还是可以将这条消息输出。如下:
if [client_ip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {geoip {source => "client_ip"target => "geoip"database => "/usr/local/logstash/GeoLite2-City.mmdb"add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]}}
输出如下:
"geoip" => {"coordinates" => [[0] "113.7266",[1] "34.7725"],"continent_code" => "AS","country_code2" => "CN","latitude" => 34.7725,"country_code3" => "CN","location" => {"lon" => 113.7266,"lat" => 34.7725},"timezone" => "Asia/Shanghai","country_name" => "China","ip" => "211.136.64.35","longitude" => 113.7266},
drop
用于删除符合条件的消息,如下:
filter {if [loglevel] == "debug" {drop { }}}
上面的配置中,表示如果loglevel字段值为debug,那么此条消息将被删除,不会被传入到es中。
mutate
mutate插件可以用来改变字段,比如重命名、删除、替换和修改事件中的字段。
其插件执行顺序如下:
- coerce
- rename
- update
- replace
- convert
- gsub
- uppercase
- capitalize
- lowercase
- strip
- remove
- split
- join
- merge
- copy
convert
convert的作用是将字段的值转换为其他类型,例如将字符串转换为整数,如果字段值为数组,则将转换所有成员。如果字段值为hash,则不会采取任何措施。
有效的转换目标及其在不同输入下的预期行为有:
integer:
- 字符串被解析;支持逗号分隔符(例如,字符串”1,000”产生一个值为一千的整数);当字符串具有小数部分时,它们将被截断。
- 浮点数和小数点被截断(例如,3.99变为3,-2.7变为-2)
- 布尔真和布尔假分别被转换为1和0
integer_eu:
- 与相同integer,但是字符串值支持点分隔符和逗号小数(例如,”1.000”产生一个值为一千的整数)
float:
- 整数转换为浮点数
- 字符串被解析;支持逗号分隔符和点小数(例如,”1,000.5”产生一个值为1000的整数)
- 布尔真和布尔假被转换为1.0和0.0分别
float_eu:
- 与相同float,不同之处在于字符串值支持点分隔符和逗号小数(例如,”1.000,5”产生一个值为1000的整数)
string:
- 所有值均已字符串化并使用UTF-8编码
boolean:
- 整数0转换为布尔值 false
- 整数1转换为布尔值 true
- 浮点数0.0转换为布尔值 false
- float 1.0转换为boolean true
- 字符串”true”,”t”,”yes”,”y”,”1”
and“1.0”被转换成布尔true - 字符串”false”,”f”,”no”,”n”,”0”和”0.0”被转换成布尔false
- 空字符串将转换为布尔值 false
- 所有其他值直接通过而不进行转换并记录警告消息
- 对于数组,使用上面的规则分别处理每个值
该插件可以转换同一文档中的多个字段,示例如下:
filter {mutate {convert => {"fieldname" => "integer""booleanfield" => "boolean"}}}
copy
copy用于将字段复制到另一个字段,现有目标字段将被覆盖。
如下:
filter {mutate {copy => { "source_field" => "dest_field" }}}
gsub
gsub用于将正则表达式与字段值匹配,然后将所有匹配项替换为替换字符串。仅支持字符串或字符串数组的字段。对于其他类型的字段,将不采取任何措施。
此配置采用每个字段/替换由3个元素组成的数组。
请注意转义配置文件中的任何反斜杠。
如下:
filter {mutate {gsub => [# 用下划线替换所有的斜杠"fieldname", "/", "_",# 用点号“ . ”替换\、?、#和-"fieldname2", "[\\?#-]", "."]}}
join
join用分隔符连接数组,对非数组字段不执行任何操作。
如:
filter {mutate {join => { "fieldname" => "," }}}
lowercase
lowercase用于将字符串转换为小写字符。
如:
filter {mutate {lowercase => [ "fieldname" ]}}
merge
用于合并两个数组或哈希字段,字符串字段将自动转换为数组,因此,数组+字符串将起作用,字符串+字符串将在dest_field中产生2个条目数组,数组和哈希将不起作用。
如:
filter {mutate {merge => { "dest_field" => "added_field" }}}
coerce
设置存在但为空的字段的默认值。
如:
filter {mutate {# 将'field1'字段的默认值设置为'default_value'coerce => { "field1" => "default_value" }}}
rename
重命名一个或多个字段。
如:
filter {mutate {# 将 'HOSTORIP' 列 重命名为 'client_ip'rename => { "HOSTORIP" => "client_ip" }}}
replace
用新值替换字段的值,新值可以包含%{foo}字符串,以帮助我们从事件的其他部分构建新值。
如:
filter {mutate {replace => { "message" => "%{source_host}: My new message" }}}
split
使用分隔符将字段拆分为数组。仅适用于字符串字段。
如:
filter {mutate {split => { "fieldname" => "," }}}
strip
从字段中删除空格。注意:这仅适用于开头和结尾的空格。
如:
filter {mutate {strip => ["field1", "field2"]}}
update
用新值更新现有字段。如果该字段不存在,则不采取任何操作。
如下:
filter {mutate {update => { "sample" => "My new message" }}}
uppercase
将字符串转换为对应的大写字母。
如:
filter {mutate {uppercase => [ "fieldname" ]}}
tag_on_failure
如果在此筛选器的应用过程中发生故障,则将终止其余操作,并将提供的标记添加到事件中。
通用插件
| 设置 | 输入类型 |
|---|---|
| add_field | hash |
| add_tag | array |
| enable_metric | boolean |
| id | string |
| periodic_flush | boolean |
| remove_field | array |
| remove_tag | array |
这里只挑选几个我常用的记录下,关于详细内容可以参考官方文档。
add_field
如果此筛选成功,字段将添加到此事件。字段名可以是动态的,并包含使用%{ffield}的事件的部分。
如下:
filter {mutate {add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }}}
# 也可以一次添加多个字段:filter {mutate {add_field => {"testField1" => "0""testField2" => "%{name}"}}}
如果事件有字段“somefield”==“hello”,这个过滤器成功时,将添加字段foo hello,如果它是存在的,与上面的值和%(host}块替换为该值从该事件。第二个示例还将添加硬编码字段。
remove_field
用于删除事件中的某个字段。
这个字段非常有用,很多时候,我们已经对日志中的事件进行了分段匹配,如以下输出:
{"@timestamp" => 2020-05-18T23:49:05.822Z,"method" => "POST","http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()","message" => "10.252.201.136 - - [27/Apr/2020:00:00:01 +0800] \"POST /api-stkp/callback HTTP/1.1 200 221\" \"//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()\" \"Java/1.8.0_77\"","host" => "es2","path" => "/var/log/http.log","http_date" => "27/Apr/2020:00:00:01 +0800","url" => "/api-stkp/callback","http_x_forwarded_for" => "Java/1.8.0_77","http_version" => "HTTP/1.1","status" => 200,"body_bytes" => 221,"@version" => "1","client" => "10.252.201.136"}
可以看到messages中的所有内容,我们都使用grok进行了匹配赋值,那么此时,我们还需要messages这个字段么?所以为了减少资源浪费,可以将其删除。
如下:
filter {grok {patterns_dir => ["/usr/local/logstash/patterns/"]match => {"message" => "%{FKNGX}"}}mutate {remove_field => [ "message" ]}}output {stdout { codec => rubydebug }}
输出如下:
{"@timestamp" => 2020-05-18T23:51:45.870Z,"method" => "POST","http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugg"host" => "es2","path" => "/var/log/http.log","http_date" => "27/Apr/2020:00:00:01 +0800","url" => "/api-stkp/callback","http_x_forwarded_for" => "Java/1.8.0_77","http_version" => "HTTP/1.1","status" => 200,"body_bytes" => 221,"@version" => "1","client" => "10.252.201.136"}
解决@timestamp和日志实际时间相差8小时
@timestamp时间默认使用UTC时间,所以会比北京时间慢8个小时,如果不解决这个问题,那么导致的结果就是我们每天在早上8点输出到es的数据才会创建新的索引,对于此问题,我是这么解决的,使用ruby插件,增加新的索引:
ruby {code => "event.set('index_day', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"}mutate {remove_field => [ "message","@version","@timestamp" ]# @timestamp就没啥用了,可以删除了}
输出如下:
{"url" => "/api-stkp/callback","path" => "/var/log/http.log","http_x_forwarded_for" => "Java/1.8.0_77","http_version" => "HTTP/1.1","http_date" => "27/Apr/2020:2:59:59 +0800","client" => "10.252.201.136","status" => 200,"body_bytes" => 221,"index_day" => "2020-05-23", # 增加新的索引“index_day”"http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()","host" => "es2","method" => "POST"}
这样,我们在输出到es时,就可以这样指定了:
output {if [type] == "gt_third_ngx" {elasticsearch {hosts => ["elasticsearch1:9200" ,"elasticsearch2:9200", "elasticsearch3:9200"]index => "ngx-%{index_day}"}}}
细心的同学可能已经发现我上面的有问题了,哎,你这不对啊,http_date是 “27/Apr/2020…..”可index_day怎么是2020-05-23,呵呵,那是因为,index_day和http_date压根没什么关系,http_date记录的是日志产生的时间,比如用户2020-05-12 13:00产生的,但logstash是2020-05-23处理这条信息的,那么就会出现上面的情况(在ruby插件中,timestamp时间被转换成了系统时间,然后又保存到index_day字段中),正常情况下,这种情况不会产生,因为从日志采集端到logstash处理,误差可能只有几微秒,但如果我们处理的是几个月之前的日志呢?(logstash刚搭建,要把前几个月的日志都处理掉),那么可以这样写规则:
date {match => [ "http_date","dd/MMM/yyyy:HH:mm:ss Z" ]}# date匹配到的日期保存的target默认为@timestamp。然后ruby再处理@timestamp。ruby {code => "event.set('index_day', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"remove_field => [ "message","@version","@timestamp" ]}
然后处理后,输出如下:
{"http_x_forwarded_for" => "Java/1.8.0_77","body_bytes" => 221,"http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()","http_version" => "HTTP/1.1","url" => "/api-stkp/callback","path" => "/var/log/http.log","host" => "es2","index_day" => "2018-04-27", # 这里的日期和http_date就对上了"status" => 200,"client" => "10.252.201.136","method" => "POST","http_date" => "27/Apr/2018:2:59:59 +0800"}
过滤插件中的通用选项
以下所有插件都支持配置:

好了,关于logstash就啃到这里了,日后有东西的话,再接着补充吧。
