Logstash基本语法组成
logstash之所以功能强大和流行,还与其丰富的过滤器插件是分不开的,过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。
Logstash配置文件有如下三部分组成,其中input、output部分是必须配置,filter部分是可选配置,而filter就是过滤器插件,可以在这部分实现各种日志过滤功能。
Logstash 组成部分
input —— 输入插件
读取文件(File)
logstash使用一个名为filewatch的ruby gem库来监听文件变化,并通过一个叫.sincedb的数据库文件来记录被监听的日志文件的读取进度(时间戳),这个sincedb数据文件的默认路径在
看下面一个事件配置文件:
input {
file {
path => ["/var/log/messages"]
type => "system"
start_position => "beginning"
}
}
// 这里直接输出到控制台 以方便调试
output {
stdout{
codec=>rubydebug
}
}
nohup bin/logstash -f logstash-myconf.conf & ##启动配置
这个配置是监听并接收本机的/var/log/messages文件内容,start_position表示按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,有点类似cat命令,默认情况下,logstash会从文件的结束位置开始读取数据,也就是说logstash进程会以类似tail -f命令的形式逐行获取数据。type用来标记事件类型,通常会在输入区域通过type标记事件类型。
标准输入(Stdin)
stdin是从标准输入获取信息,关于stdin的使用,前面已经做过了一些简单的介绍,这里再看一个稍微复杂一点的例子,下面是一个关于stdin的事件配置文件:
input{
stdin{
add_field=>{"key"=>"logstash"} // 添加一个自定义字段 key : logstash
tags=>["add1"] // tags 可以取多个
type=>"test1" // 标记类型
}
}
output {
stdout{
codec=>rubydebug
}
}
codec —— 编码插件
其实我们就已经用过编码插件codec了,也就是这个rubydebug,它就是一种codec,虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。 编码插件(Codec)可以在logstash输入或输出时处理不同类型的数据
Logstash不只是一个input—>filter—>output的数据流,而是一个input—>decode—>filter—>encode—>output的数据流。常见的插件有以下几种
plain
plain是一个空的解析器,它可以让用户自己指定格式,也就是说输入是什么格式,输出就是什么格式。下面是一个包含plain编码的事件配置文件
input{
stdin{
}
}
output{
stdout{
codec => "plain" // 对输出进行处理 plain不进行任何处理
}
}
json & json_lines
如果发送给logstash的数据内容为json格式,可以在input字段加入codec=>json来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。如果想让logstash输出为json格式,可以在output字段加入codec=>json,下面是一个包含json编码的事件配置文件:
input {
stdin {
}
}
output {
stdout {
codec => json
}
}
// 输入
successfully started Logstash API endpoint {:port=>9600}
// 输出
{
"message ": "Successfully started Logstash API endpoint {:port=>9600)",
"@timestamp":" 2018-01-19T15:40:20.972Z",
"host":"filebeatserven",
"@version":"1"
}
这就是json格式的输出,可以看出,json每个字段是key:values格式,多个字段之间通过逗号分隔。有时候,如果json文件比较长,需要换行的话,那么就要用json_lines编码格式了。json编码格式很适合向es传递,并进行存储
filter —— 过滤器插件
Grok 正则捕获
grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。他是目前logstash 中解析非结构化日志数据最好的方式。
Grok 的语法规则是:%{语法: 语义}
“语法”指的就是匹配的模式,例如使用NUMBER模式可以匹配出数字,IP模式则会匹配出127.0.0.1这样的IP地址
例如输入的内容为:172.16.213.132 [07/Jan/2021:16:24:19 +0800] “GET / HTTP/1.1” 403 5039
语法规则 :%{IP:clientip}
匹配模式将获得的结果为:
clientip: 172.16.213.132
例如:
input{
stdin{}
}
filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
}
}
output{
stdout{
codec => "rubydebug"
}
}
输入内容:
172.16.213.132 [07/Jan/2021:16:24:19 +0800] “GET / HTTP/1.1” 403 5039
输出内容:
{
"@timestamp" => 2021-01-20T13:42:24.2722,
"message" => "172.16.213.132 [07/Jan/2021:16:24:19 +0800] \"GET / HTТP/1.1\" 403 5039",
"timestamp" => "07/Jan/2021:16:24:19 +0800",
"@version" => "1",
"host" => "filebeatserver",
"response" => "403",
"clientip" => "172.16.213.132",
"referren" => "\"GET / HTTP/1.1",
"bytes"=> "5039"
}
此时我们还有原字段message,我们需要将原字段删除
filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
remove_field => ["message"]
}
}
有时候我们需要调试我们的匹配模式,我们可以使用在线的grok正则表达式工具,此在线工具需要科学上网:http://grokdebug.herokuapp.com
时间处理(Date)
date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp对象,然后转存到@timestamp字段里,这在之前已经做过简单的介绍。下面是date插件的一个配置示例(这里仅仅列出filter部分):
filter {
grok {
match => ["message", "%{HTTPDATE:timestamp}"] // 匹配好对应日志
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] // 格式化日志
}
mutate {
remove_field => ["timestamp"] // 删除原有日志,这里放在最后,以防在未格式化前被删除
}
}
时间对应格式说明
https://www.elastic.co/guide/en/logstash/7.12/plugins-filters-date.html
数据修改(Mutate)
(1)正则表达式替换匹配字段gsub可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效,下面是一个关于mutate插件中gsub的示例(仅列出filter部分):
filter {
mutate {
gsub => ["filed_name_1", "/" , "_"]
}
}
这个示例表示将filedname_1字段中所有”/“字符替换为”“。如果没有则不替换
(2)分隔符分割字符串为数组split可以通过指定的分隔符分割字段中的字符串为数组,下面是一个关于mutate插件中split的示例(仅列出filter部分):
filter {
mutate {
split => ["filed_name_2", "|"]
}
}
这个示例表示将filed_name_2字段以"|"为区间分隔为数组。
(3)重命名字段rename可以实现重命名某个字段的功能,下面是一个关于mutate插件中rename的示例(仅列出filter部分):
filter {
mutate {
rename => { "old_field" => "new_field" }
}
}
这个示例表示将字段old_field重命名为new_field。
(4)删除字段remove_field可以实现删除某个字段的功能,下面是一个关于mutate插件中remove_field的示例(仅列出filter部分):
filter {
mutate {
remove_field => ["timestamp"]
}
}
这个示例表示将字段timestamp删除。
综合例子:
input {
stdin {}
}
filter {
grok {
match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
remove_field => [ "message" ]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
mutate {
rename => { "response" => "response_new" }
convert => [ "response","float" ]
gsub => ["referrer","\"",""]
remove_field => ["timestamp"]
split => ["clientip", "."]
}
}
output {
stdout {
codec => "rubydebug"
}
}
GeoIP 地址查询归类
GeoIP是最常见的免费IP地址归类查询库,当然也有收费版可以使用。GeoIP库可以根据IP 地址提供对应的地域信息,包括国别,省市,经纬度等,此插件对于可视化地图和区域统计非常有用。下面是一个关于GeoIP插件的简单示例(仅列出filter部分)::
filter {
geoip {
source => "ip_field" // 其中,ip_field字段是输出IP地址的一个字段。
}
}
filter插件综合应用实例
下面给出一个业务系统输出的日志格式,由于业务系统输出的日志格式无法更改,因此就需要我们通过logstash的filter过滤功能以及grok插件来获取需要的数据格式,此业务系统输出的日志内容以及原始格式如下:2018-02-09T10:57:42+08:00|~|123.87.240.97|~|Mozilla/5.0 (iPhone; CPU iPhone OS 11_2_2 like Mac OS X) AppleWebKit/604.4.7 Version/11.0 Mobile/15C202Safari/604.1|~|http://m.sina.cn/cm/ads_ck_wap.html|~|1460709836200|~|DF0184266887D0E 可以看出,这段日志都是以“|~|”为区间进行分隔的,那么刚好我们就以“|~|”为区间分隔符,将这段日志内容分割为6个字段。这里通过grok插件进行正则匹配组合就能完成这个功能。
完整的grok正则匹配组合语句如下:
%{TIMESTAMP_ISO8601:localtime}|~|%{IPORHOST:clientip}|~|(%{GREEDYDATA:http_user_agent})|~|(%{DATA:http_referer})|~|%{GREEDYDATA:mediaid}|~|%{GREEDYDATA:osid}
output —— 输出插件
output是Logstash的最后阶段,一个事件可以经过多个输出,而一旦所有输出处理完成,整个事件就执行完成。 一些常用的输出包括:
file: 表示将日志数据写入磁盘上的文件。
elasticsearch:表示将日志数据发送给Elasticsearch。Elasticsearch可以高效方便和易于查询的保存数据。
graphite:表示将日志数据发送给graphite,graphite是一种流行的开源工具,用于存储和绘制数据指标。此外,Logstash还支持输出到nagios、hdfs、email(发送邮件)和Exec(调用命令执行)。
输出到标准输出(stdout)
stdout与之前介绍过的stdin插件一样,它是最基础和简单的输出插件,下面是一个配置实例:
output {
stdout {
codec => rubydebug
}
}
stdout插件,主要的功能和用途就是用于调试,这个插件,在前面已经多次使用过。这里不再过多介绍。
保存为文件(file)
file插件可以将输出保存到一个文件中,配置实例如下:
output {
file {
path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
}
}
上面这个配置中,使用了变量匹配,用于自动匹配时间和主机名,这在实际使用中很有帮助。
输出到elasticsearch
Logstash将过滤、分析好的数据输出到elasticsearch中进行存储和查询,是最经常使用的方法。下面是一个配置实例:
output {
elasticsearch {
host => ["172.16.213.37:9200","172.16.213.77:9200","172.16.213.78:9200"]
index => "logstash-%{+YYYY.MM.dd}"
manage_template => false
template_name => "template-web_access_log"
}
}
上面配置中每个配置项含义如下:
host:是一个数组类型的值,后面跟的值是elasticsearch节点的地址与端口,默认端口是9200。可添加多个地址。
index:写入elasticsearch的索引的名称,这里可以使用变量。Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+ 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。这种以天为单位分割的写法,可以很容易的删除老的数据或者搜索指定时间范围内的数据。此外,注意索引名中不能有大写字母。
manage_template:用来设置是否开启logstash自动管理模板功能,如果设置为false将关闭自动管理模板功能。如果我们自定义了模板,那么应该设置为false。
template_name:这个配置项用来设置在Elasticsearch中模板的名称。