title: 跟我一起学习logstash #标题tags: logstash #标签
date: 2020-05-31
categories: elastic stack # 分类
参考文档:
- 官方文档
- ELKstack中文社区
logstash简介
logstash在elkstack中是承担了一个过滤、定义消息格式的角色,filebeat收集日志发送到kafka,logstash去kafka消费消息,然后通过input、filters、output等一系列操作,将格式化处理后的数据发送到es集群进行存储。
logstash工作示意图如下:
logstash安装比较简单,配置下java环境,下载相应的包即可(官方提供的tar.gz包是免安装的,开箱即用),这里就跳过安装了。
hello world
先用logstash输出一个hello world信息,开始学习之路。
# 启动logstash
$ logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
# 终端会等待我们输入,然后输入hello world
hello world # 输出如下
{
"@timestamp" => 2020-05-12T03:51:23.532Z,
"@version" => "1",
"message" => "hello world",
"host" => "es2"
}
从上面可以看出,logstash会给我们输入的事件添加一些额外的信息,各个信息解释如下:
- @timestamp:用来标记事件的发生事件,因为这个字段涉及到logstash的内部流转,所以必须是一个joda对象,如果你尝试自定义一个字符串的名字为@timestamp的话,那么logstash会直接报错。
- host:标记事件发生在哪里。
- type:标记事件的唯一类型。
- tags:标记事件的某方面属性。这是一个属组,一个事件可以有多个标签。
我们可以随意给事件添加字段或者从事件里删除字段。
使用logstash解析日志
在开始之前,请先去官方下载样本数据集,位置如下:
配置filebeat
在创建Logstash管道之前,将配置Filebeat以将日志行发送到Logstash。filebeat配置如下:
$ cat filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/logstash-tutorial-dataset # 模板日志文件
output.logstash:
hosts: ["es2:5044"]
$ ./filebeat -c filebeat.yml & # 后台启动此yml文件
配置logstash
$ cat first-pipline.conf
input {
beats {
port => 5044 # 输入是5044端口
}
}
output {
stdout { codec => rubydebug } # 输出到屏幕
}
$ logstash -f first-pipline.conf --config.test_and_exit # 检测配置文件语法并报告任何错误
$ logstash -f first-pipline.conf --config.reload.automatic # 如果语法无误,则使用此指令启动
- —config.test_and_exit:解析配置文件并报告任何错误。
- —config.reload.automatic:启用自动重新加载配置,因此不必在每次修改配置文件时都停止并重新启动Logstash。
当logstash启动后,我们可以看到很多下面那样的事件输出到屏幕上,如下:
使用grok过滤器插件解析web日志
在上面输出的事件消息中,可以看到日志消息格式不是很理想,因此,需要使用grok过滤器插件。
插件相关查询指令
bin/logstash-plugin list # 列出已安装的插件
bin/logstash-plugin list --verbose # 列出已安装的插件以及版本信息
bin/logstash-plugin list --group output # 将列出指定组的所有已安装插件 (input, filter, codec, output)
插件相关安装指令
可以使用下面的指令安装公共资源库上托管的插件:
$ bin/logstash-plugin install logstash-output-kafka
更新插件
$ bin/logstash-plugin update # 更新所有插件
$ bin/logstash-plugin update logstash-output-kafka # 仅更新指定插件
删除插件
$ bin/logstash-plugin remove logstash-output-kafka
配置安装代理
由于安装、更新插件等操作,需要访问RubyGems.org,而一般国内会出现超时现象,所以也可以使用下面的指令,为logstash配置代理,通过代理去访问RubyGems.org。
$ export HTTP_PROXY=http://127.0.0.1:3128
$ bin/logstash-plugin install logstash-output-kafka
插件相关指令至此结束。
使用grok过滤插件,可以将非结构化日志数据解析为结构化和可查询的内容。
由于grok过滤器插件会在传入的日志数据中查找匹配,因此要定义我们感兴趣的字段。下面是一段经典的web日志格式:
86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /projects/xdotool/ HTTP/1.1" 200 12292 "http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"
该行开头的IP地址很容易识别,括号中的时间戳也很容易识别。要解析数据,可以使用%{COMBINEDAPACHELOG}grok模式,该模式使用以下模式从日志构造行:
信息 | 列名 |
---|---|
IP地址 | clientip |
用户身份 | ident |
用户认证 | auth |
时间戳记 | timestamp |
HTTP动词 | verb |
请求体 | request |
HTTP版本 | httpversion |
HTTP状态码 | response |
字节数 | bytes |
推荐连结网址 | referrer |
用户代理 | agent |
现在我们再来编辑logstash的配置文件,如下:
$ cat config/first-pipline.conf
input {
beats {
port => 5044
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
stdout { codec => rubydebug }
}
配置filebeat重新发送消息:
$ rm -rf data/registry/ # 删除filebeat上的注册文件
$ filebeat -c filebeat.yml # 重启filebeat
logstash收到的消息如下:
{
"bytes" => "12292",
"host" => {
"name" => "es1"
},
"log" => {
"offset" => 23786,
"file" => {
"path" => "/tmp/logstash-tutorial-dataset"
}
},
"referrer" => "\"http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions\"",
"message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /projects/xdotool/ HTTP/1.1\" 200 12292 \"http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"input" => {
"type" => "log"
},
"agent" => {
"ephemeral_id" => "1d06239f-272e-4617-881a-11a294b8ff96",
"type" => "filebeat",
"version" => "7.6.2",
"hostname" => "es1",
"id" => "6d0a05bb-0135-407f-bc56-7f158cb484d3"
},
"@version" => "1",
"ecs" => {
"version" => "1.4.0"
},
"httpversion" => "1.1",
"clientip" => "86.1.76.62",
"response" => "200",
"verb" => "GET",
"auth" => "-",
"request" => "/projects/xdotool/",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"timestamp" => "04/Jan/2015:05:30:37 +0000",
"@timestamp" => 2020-05-12T14:02:10.125Z,
"ident" => "-"
}
%{COMBINEDAPACHELOG}模式
%{COMBINEDAPACHELOG} 是logstash自带的匹配模式,它的grok表达式如下:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:req}
自定义grok语法表达式
grok语法表达式学习参考:logstash过滤插件——grok
一般在logstash中时这样使用grok插件的:
filter {
if [type] == "gt_third_ngx" {
grok {
patterns_dir => ["/opt/logstash/conf/patterns/"]
match => {"message" => "%{GT_THIRD}"}
}
}
而patterns_dir
指定的/opt/logstash/conf/patterns/
目录下,则有一个名为NGINXACCESS
(此文件名是自定义),文件内容如下:
# gt third
GT_THIRD \[%{TIMESTAMP_ISO8601:time_local}\] %{NOTSPACE:status:int} %{NOTSPACE:uuid} %{NOTSPACE:X_system} %{NOTSPACE:hostname} %{IPORHOST:remote_addr} \[%{NOSQUARE:upstream_addr}\] %{NUMBER:request_time:float} (%{NUMBER:upstraem_response_time}|-) %{NOTSPACE:Method} %{NOTSPACE:proxy_host} %{URIPATH:uri}(%{URIPARAM:uri_param})? %{NUMBER:body_bytes_sent:int} \| %{NOTSPACE:X_debug}
使用geoip插件
参考:配置geoip
该geoip插件主要是提取客户的ip,以便在kibana中可以根据geoip提取的经纬度,画出客户分布图。一般在web日志中,clientip字段包含IP地址(此IP地址必须为公网IP,否则无法匹配)。
由于筛选器是按顺序求值的,因此请确保该geoip部分位于grok配置文件的该部分之后,并且grok和geoip部分都嵌套在该filter部分中。
完整的logstash配置文件应如下所示:
$ cat config/first-pipline.conf
input {
beats {
port => 5044
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
}
}
output {
stdout { codec => rubydebug }
}
保存上面的配置文件后,删除filebeat的registry文件,并重新启动filebeat,则会发现logstash收到的日志消息中输出中包含以下信息:
"geoip" => {
"latitude" => 34.7725,
"ip" => "114.114.114.114",
"continent_code" => "AS",
"country_code2" => "CN",
"country_code3" => "CN",
"location" => {
"lat" => 34.7725,
"lon" => 113.7266
},
"country_name" => "China",
"timezone" => "Asia/Shanghai",
"longitude" => 113.7266
},
# 上面的信息中包含ip所处国家、城市、经纬度等等详细信息。
geoip的其他配置参数
target
指定Logstash应将geoip数据存储到的字段。例如,如果你有src_ip和dst_ip字段,并且想要两个IP的GeoIP信息,这将很有用。默认保存到geoip字段。
database
Logstash使用的Maxmind数据库文件的路径。默认的数据库是GeoLite2-City。GeoLite2-City、GeoLite2-Country、GeoLite2-ASN是Maxmind支持的免费数据库。GeolP2-City、GeolP2-ISP GeolP2-Country是Maxmind支持的商业数据库。如果没有指定,将默认为与Logstash一起发布的GeoLite2 City数据库。
可以去下载GeoLite2 City数据库文件(需要注册账号)。
建议定期更新自己服务器的geoip数据库(官方免费的约两周左右更新一次)。至于如何配置自动更新本地geoip数据库,我还没研究,知道的小伙伴可以评论里告诉我。
add_field
如果匹配成功,则添加字段。
geoip使用示例
filter {
grok {
patterns_dir => ["/usr/local/logstash/patterns/"]
match => {"message" => "%{FKNGX}"}
}
geoip {
source => "client_ip"
target => "geoip"
database => "/usr/local/logstash/GeoLite2-City.mmdb" # 指定geoip数据库文件
add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]
add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]
}
if "_geoip_lookup_failure" in [tags] { drop { } }
mutate {
convert => [ "[geoip][coordinates]", "float"] # 将经纬度转换成浮点型(默认都是string类型)
}
if “_geoip_lookup_failure” in [tags] { drop { } }:如果是一个内网地址访问产生的日志,由于geoip无法识别其地理位置,则就可以将该条消息删除,当然,这种方式有些粗暴,我们可以换一个姿势来过滤。就是增加if判断,如果是内网地址,就不进行geoip处理,但logstash还是可以将这条消息输出。如下:
if [client_ip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
geoip {
source => "client_ip"
target => "geoip"
database => "/usr/local/logstash/GeoLite2-City.mmdb"
add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]
add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]
}
}
输出如下:
"geoip" => {
"coordinates" => [
[0] "113.7266",
[1] "34.7725"
],
"continent_code" => "AS",
"country_code2" => "CN",
"latitude" => 34.7725,
"country_code3" => "CN",
"location" => {
"lon" => 113.7266,
"lat" => 34.7725
},
"timezone" => "Asia/Shanghai",
"country_name" => "China",
"ip" => "211.136.64.35",
"longitude" => 113.7266
},
drop
用于删除符合条件的消息,如下:
filter {
if [loglevel] == "debug" {
drop { }
}
}
上面的配置中,表示如果loglevel
字段值为debug
,那么此条消息将被删除,不会被传入到es中。
mutate
mutate插件可以用来改变字段,比如重命名、删除、替换和修改事件中的字段。
其插件执行顺序如下:
- coerce
- rename
- update
- replace
- convert
- gsub
- uppercase
- capitalize
- lowercase
- strip
- remove
- split
- join
- merge
- copy
convert
convert的作用是将字段的值转换为其他类型,例如将字符串转换为整数,如果字段值为数组,则将转换所有成员。如果字段值为hash,则不会采取任何措施。
有效的转换目标及其在不同输入下的预期行为有:
integer:
- 字符串被解析;支持逗号分隔符(例如,字符串”1,000”产生一个值为一千的整数);当字符串具有小数部分时,它们将被截断。
- 浮点数和小数点被截断(例如,3.99变为3,-2.7变为-2)
- 布尔真和布尔假分别被转换为1和0
integer_eu:
- 与相同integer,但是字符串值支持点分隔符和逗号小数(例如,”1.000”产生一个值为一千的整数)
float:
- 整数转换为浮点数
- 字符串被解析;支持逗号分隔符和点小数(例如,”1,000.5”产生一个值为1000的整数)
- 布尔真和布尔假被转换为1.0和0.0分别
float_eu:
- 与相同float,不同之处在于字符串值支持点分隔符和逗号小数(例如,”1.000,5”产生一个值为1000的整数)
string:
- 所有值均已字符串化并使用UTF-8编码
boolean:
- 整数0转换为布尔值 false
- 整数1转换为布尔值 true
- 浮点数0.0转换为布尔值 false
- float 1.0转换为boolean true
- 字符串”true”,”t”,”yes”,”y”,”1”
and
“1.0”被转换成布尔true - 字符串”false”,”f”,”no”,”n”,”0”和”0.0”被转换成布尔false
- 空字符串将转换为布尔值 false
- 所有其他值直接通过而不进行转换并记录警告消息
- 对于数组,使用上面的规则分别处理每个值
该插件可以转换同一文档中的多个字段,示例如下:
filter {
mutate {
convert => {
"fieldname" => "integer"
"booleanfield" => "boolean"
}
}
}
copy
copy用于将字段复制到另一个字段,现有目标字段将被覆盖。
如下:
filter {
mutate {
copy => { "source_field" => "dest_field" }
}
}
gsub
gsub用于将正则表达式与字段值匹配,然后将所有匹配项替换为替换字符串。仅支持字符串或字符串数组的字段。对于其他类型的字段,将不采取任何措施。
此配置采用每个字段/替换由3个元素组成的数组。
请注意转义配置文件中的任何反斜杠。
如下:
filter {
mutate {
gsub => [
# 用下划线替换所有的斜杠
"fieldname", "/", "_",
# 用点号“ . ”替换\、?、#和-
"fieldname2", "[\\?#-]", "."
]
}
}
join
join用分隔符连接数组,对非数组字段不执行任何操作。
如:
filter {
mutate {
join => { "fieldname" => "," }
}
}
lowercase
lowercase用于将字符串转换为小写字符。
如:
filter {
mutate {
lowercase => [ "fieldname" ]
}
}
merge
用于合并两个数组或哈希字段,字符串字段将自动转换为数组,因此,数组+字符串将起作用,字符串+字符串将在dest_field中产生2个条目数组,数组和哈希将不起作用。
如:
filter {
mutate {
merge => { "dest_field" => "added_field" }
}
}
coerce
设置存在但为空的字段的默认值。
如:
filter {
mutate {
# 将'field1'字段的默认值设置为'default_value'
coerce => { "field1" => "default_value" }
}
}
rename
重命名一个或多个字段。
如:
filter {
mutate {
# 将 'HOSTORIP' 列 重命名为 'client_ip'
rename => { "HOSTORIP" => "client_ip" }
}
}
replace
用新值替换字段的值,新值可以包含%{foo}
字符串,以帮助我们从事件的其他部分构建新值。
如:
filter {
mutate {
replace => { "message" => "%{source_host}: My new message" }
}
}
split
使用分隔符将字段拆分为数组。仅适用于字符串字段。
如:
filter {
mutate {
split => { "fieldname" => "," }
}
}
strip
从字段中删除空格。注意:这仅适用于开头和结尾的空格。
如:
filter {
mutate {
strip => ["field1", "field2"]
}
}
update
用新值更新现有字段。如果该字段不存在,则不采取任何操作。
如下:
filter {
mutate {
update => { "sample" => "My new message" }
}
}
uppercase
将字符串转换为对应的大写字母。
如:
filter {
mutate {
uppercase => [ "fieldname" ]
}
}
tag_on_failure
如果在此筛选器的应用过程中发生故障,则将终止其余操作,并将提供的标记添加到事件中。
通用插件
设置 | 输入类型 |
---|---|
add_field | hash |
add_tag | array |
enable_metric | boolean |
id | string |
periodic_flush | boolean |
remove_field | array |
remove_tag | array |
这里只挑选几个我常用的记录下,关于详细内容可以参考官方文档。
add_field
如果此筛选成功,字段将添加到此事件。字段名可以是动态的,并包含使用%{ffield}的事件的部分。
如下:
filter {
mutate {
add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
}
}
# 也可以一次添加多个字段:
filter {
mutate {
add_field => {
"testField1" => "0"
"testField2" => "%{name}"
}
}
}
如果事件有字段“somefield”==“hello”,这个过滤器成功时,将添加字段foo hello,如果它是存在的,与上面的值和%(host}块替换为该值从该事件。第二个示例还将添加硬编码字段。
remove_field
用于删除事件中的某个字段。
这个字段非常有用,很多时候,我们已经对日志中的事件进行了分段匹配,如以下输出:
{
"@timestamp" => 2020-05-18T23:49:05.822Z,
"method" => "POST",
"http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()",
"message" => "10.252.201.136 - - [27/Apr/2020:00:00:01 +0800] \"POST /api-stkp/callback HTTP/1.1 200 221\" \"//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()\" \"Java/1.8.0_77\"",
"host" => "es2",
"path" => "/var/log/http.log",
"http_date" => "27/Apr/2020:00:00:01 +0800",
"url" => "/api-stkp/callback",
"http_x_forwarded_for" => "Java/1.8.0_77",
"http_version" => "HTTP/1.1",
"status" => 200,
"body_bytes" => 221,
"@version" => "1",
"client" => "10.252.201.136"
}
可以看到messages中的所有内容,我们都使用grok进行了匹配赋值,那么此时,我们还需要messages这个字段么?所以为了减少资源浪费,可以将其删除。
如下:
filter {
grok {
patterns_dir => ["/usr/local/logstash/patterns/"]
match => {"message" => "%{FKNGX}"}
}
mutate {
remove_field => [ "message" ]
}
}
output {
stdout { codec => rubydebug }
}
输出如下:
{
"@timestamp" => 2020-05-18T23:51:45.870Z,
"method" => "POST",
"http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugg
"host" => "es2",
"path" => "/var/log/http.log",
"http_date" => "27/Apr/2020:00:00:01 +0800",
"url" => "/api-stkp/callback",
"http_x_forwarded_for" => "Java/1.8.0_77",
"http_version" => "HTTP/1.1",
"status" => 200,
"body_bytes" => 221,
"@version" => "1",
"client" => "10.252.201.136"
}
解决@timestamp和日志实际时间相差8小时
@timestamp时间默认使用UTC时间,所以会比北京时间慢8个小时,如果不解决这个问题,那么导致的结果就是我们每天在早上8点输出到es的数据才会创建新的索引,对于此问题,我是这么解决的,使用ruby插件,增加新的索引:
ruby {
code => "event.set('index_day', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
}
mutate {
remove_field => [ "message","@version","@timestamp" ]
# @timestamp就没啥用了,可以删除了
}
输出如下:
{
"url" => "/api-stkp/callback",
"path" => "/var/log/http.log",
"http_x_forwarded_for" => "Java/1.8.0_77",
"http_version" => "HTTP/1.1",
"http_date" => "27/Apr/2020:2:59:59 +0800",
"client" => "10.252.201.136",
"status" => 200,
"body_bytes" => 221,
"index_day" => "2020-05-23", # 增加新的索引“index_day”
"http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()",
"host" => "es2",
"method" => "POST"
}
这样,我们在输出到es时,就可以这样指定了:
output {
if [type] == "gt_third_ngx" {
elasticsearch {
hosts => ["elasticsearch1:9200" ,"elasticsearch2:9200", "elasticsearch3:9200"]
index => "ngx-%{index_day}"
}
}
}
细心的同学可能已经发现我上面的有问题了,哎,你这不对啊,http_date是 “27/Apr/2020…..”可index_day
怎么是2020-05-23,呵呵,那是因为,index_day和http_date压根没什么关系,http_date记录的是日志产生的时间,比如用户2020-05-12 13:00产生的,但logstash是2020-05-23处理这条信息的,那么就会出现上面的情况(在ruby插件中,timestamp时间被转换成了系统时间,然后又保存到index_day字段中),正常情况下,这种情况不会产生,因为从日志采集端到logstash处理,误差可能只有几微秒,但如果我们处理的是几个月之前的日志呢?(logstash刚搭建,要把前几个月的日志都处理掉),那么可以这样写规则:
date {
match => [ "http_date","dd/MMM/yyyy:HH:mm:ss Z" ]
}
# date匹配到的日期保存的target默认为@timestamp。然后ruby再处理@timestamp。
ruby {
code => "event.set('index_day', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
remove_field => [ "message","@version","@timestamp" ]
}
然后处理后,输出如下:
{
"http_x_forwarded_for" => "Java/1.8.0_77",
"body_bytes" => 221,
"http_refere" => "//192.168.20.4:5601/app/kibana#/dev_tools/grokdebugger?_g=()",
"http_version" => "HTTP/1.1",
"url" => "/api-stkp/callback",
"path" => "/var/log/http.log",
"host" => "es2",
"index_day" => "2018-04-27", # 这里的日期和http_date就对上了
"status" => 200,
"client" => "10.252.201.136",
"method" => "POST",
"http_date" => "27/Apr/2018:2:59:59 +0800"
}
过滤插件中的通用选项
以下所有插件都支持配置:
好了,关于logstash就啃到这里了,日后有东西的话,再接着补充吧。