Logstash基本语法组成

logstash之所以功能强大和流行,还与其丰富的过滤器插件是分不开的,过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。
Logstash配置文件有如下三部分组成,其中input、output部分是必须配置,filter部分是可选配置,而filter就是过滤器插件,可以在这部分实现各种日志过滤功能。

Logstash 组成部分

input —— 输入插件

读取文件(File)

logstash使用一个名为filewatch的ruby gem库来监听文件变化,并通过一个叫.sincedb的数据库文件来记录被监听的日志文件的读取进度(时间戳),这个sincedb数据文件的默认路径在 /plugins/inputs/file下面,文件名类似于.sincedb_452905a167cf4509fd08acb964fdb20c,而表示logstash插件存储目录,默认是LOGSTASH_HOME/data。
看下面一个事件配置文件:

  1. input {
  2. file {
  3. path => ["/var/log/messages"]
  4. type => "system"
  5. start_position => "beginning"
  6. }
  7. }
  8. // 这里直接输出到控制台 以方便调试
  9. output {
  10. stdout{
  11. codec=>rubydebug
  12. }
  13. }
  1. nohup bin/logstash -f logstash-myconf.conf & ##启动配置

这个配置是监听并接收本机的/var/log/messages文件内容,start_position表示按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,有点类似cat命令,默认情况下,logstash会从文件的结束位置开始读取数据,也就是说logstash进程会以类似tail -f命令的形式逐行获取数据。type用来标记事件类型,通常会在输入区域通过type标记事件类型。

标准输入(Stdin)

stdin是从标准输入获取信息,关于stdin的使用,前面已经做过了一些简单的介绍,这里再看一个稍微复杂一点的例子,下面是一个关于stdin的事件配置文件:

  1. input{
  2. stdin{
  3. add_field=>{"key"=>"logstash"} // 添加一个自定义字段 key : logstash
  4. tags=>["add1"] // tags 可以取多个
  5. type=>"test1" // 标记类型
  6. }
  7. }
  8. output {
  9. stdout{
  10. codec=>rubydebug
  11. }
  12. }

codec —— 编码插件

其实我们就已经用过编码插件codec了,也就是这个rubydebug,它就是一种codec,虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。 编码插件(Codec)可以在logstash输入或输出时处理不同类型的数据
Logstash不只是一个input—>filter—>output的数据流,而是一个input—>decode—>filter—>encode—>output的数据流。常见的插件有以下几种

plain

plain是一个空的解析器,它可以让用户自己指定格式,也就是说输入是什么格式,输出就是什么格式。下面是一个包含plain编码的事件配置文件

  1. input{
  2. stdin{
  3. }
  4. }
  5. output{
  6. stdout{
  7. codec => "plain" // 对输出进行处理 plain不进行任何处理
  8. }
  9. }

json & json_lines

如果发送给logstash的数据内容为json格式,可以在input字段加入codec=>json来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。如果想让logstash输出为json格式,可以在output字段加入codec=>json,下面是一个包含json编码的事件配置文件:

  1. input {
  2. stdin {
  3. }
  4. }
  5. output {
  6. stdout {
  7. codec => json
  8. }
  9. }
  1. // 输入
  2. successfully started Logstash API endpoint {:port=>9600}
  3. // 输出
  4. {
  5. "message ": "Successfully started Logstash API endpoint {:port=>9600)",
  6. "@timestamp":" 2018-01-19T15:40:20.972Z",
  7. "host":"filebeatserven",
  8. "@version":"1"
  9. }

这就是json格式的输出,可以看出,json每个字段是key:values格式,多个字段之间通过逗号分隔。有时候,如果json文件比较长,需要换行的话,那么就要用json_lines编码格式了。json编码格式很适合向es传递,并进行存储

filter —— 过滤器插件

Grok 正则捕获

grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。他是目前logstash 中解析非结构化日志数据最好的方式。 Grok 的语法规则是:%{语法: 语义}
“语法”指的就是匹配的模式,例如使用NUMBER模式可以匹配出数字,IP模式则会匹配出127.0.0.1这样的IP地址
例如输入的内容为:172.16.213.132 [07/Jan/2021:16:24:19 +0800] “GET / HTTP/1.1” 403 5039
语法规则 :%{IP:clientip}
匹配模式将获得的结果为: clientip: 172.16.213.132
例如:

  1. input{
  2. stdin{}
  3. }
  4. filter{
  5. grok{
  6. match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
  7. }
  8. }
  9. output{
  10. stdout{
  11. codec => "rubydebug"
  12. }
  13. }

输入内容:
172.16.213.132 [07/Jan/2021:16:24:19 +0800] “GET / HTTP/1.1” 403 5039
输出内容:

  1. {
  2. "@timestamp" => 2021-01-20T13:42:24.2722,
  3. "message" => "172.16.213.132 [07/Jan/2021:16:24:19 +0800] \"GET / HTТP/1.1\" 403 5039",
  4. "timestamp" => "07/Jan/2021:16:24:19 +0800",
  5. "@version" => "1",
  6. "host" => "filebeatserver",
  7. "response" => "403",
  8. "clientip" => "172.16.213.132",
  9. "referren" => "\"GET / HTTP/1.1",
  10. "bytes"=> "5039"
  11. }

此时我们还有原字段message,我们需要将原字段删除

  1. filter{
  2. grok{
  3. match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
  4. remove_field => ["message"]
  5. }
  6. }

有时候我们需要调试我们的匹配模式,我们可以使用在线的grok正则表达式工具,此在线工具需要科学上网:http://grokdebug.herokuapp.com

时间处理(Date)

date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp对象,然后转存到@timestamp字段里,这在之前已经做过简单的介绍。下面是date插件的一个配置示例(这里仅仅列出filter部分):

  1. filter {
  2. grok {
  3. match => ["message", "%{HTTPDATE:timestamp}"] // 匹配好对应日志
  4. }
  5. date {
  6. match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] // 格式化日志
  7. }
  8. mutate {
  9. remove_field => ["timestamp"] // 删除原有日志,这里放在最后,以防在未格式化前被删除
  10. }
  11. }

时间对应格式说明
https://www.elastic.co/guide/en/logstash/7.12/plugins-filters-date.html

数据修改(Mutate)

(1)正则表达式替换匹配字段gsub可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效,下面是一个关于mutate插件中gsub的示例(仅列出filter部分):

  1. filter {
  2. mutate {
  3. gsub => ["filed_name_1", "/" , "_"]
  4. }
  5. }

这个示例表示将filedname_1字段中所有”/“字符替换为”“。如果没有则不替换

(2)分隔符分割字符串为数组split可以通过指定的分隔符分割字段中的字符串为数组,下面是一个关于mutate插件中split的示例(仅列出filter部分):

  1. filter {
  2. mutate {
  3. split => ["filed_name_2", "|"]
  4. }
  5. }
  1. 这个示例表示将filed_name_2字段以"|"为区间分隔为数组。

(3)重命名字段rename可以实现重命名某个字段的功能,下面是一个关于mutate插件中rename的示例(仅列出filter部分):

  1. filter {
  2. mutate {
  3. rename => { "old_field" => "new_field" }
  4. }
  5. }

这个示例表示将字段old_field重命名为new_field。

(4)删除字段remove_field可以实现删除某个字段的功能,下面是一个关于mutate插件中remove_field的示例(仅列出filter部分):

  1. filter {
  2. mutate {
  3. remove_field => ["timestamp"]
  4. }
  5. }

这个示例表示将字段timestamp删除。

综合例子:

  1. input {
  2. stdin {}
  3. }
  4. filter {
  5. grok {
  6. match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
  7. remove_field => [ "message" ]
  8. }
  9. date {
  10. match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  11. }
  12. mutate {
  13. rename => { "response" => "response_new" }
  14. convert => [ "response","float" ]
  15. gsub => ["referrer","\"",""]
  16. remove_field => ["timestamp"]
  17. split => ["clientip", "."]
  18. }
  19. }
  20. output {
  21. stdout {
  22. codec => "rubydebug"
  23. }
  24. }

GeoIP 地址查询归类

GeoIP是最常见的免费IP地址归类查询库,当然也有收费版可以使用。GeoIP库可以根据IP 地址提供对应的地域信息,包括国别,省市,经纬度等,此插件对于可视化地图和区域统计非常有用。下面是一个关于GeoIP插件的简单示例(仅列出filter部分)::

  1. filter {
  2. geoip {
  3. source => "ip_field" // 其中,ip_field字段是输出IP地址的一个字段。
  4. }
  5. }

filter插件综合应用实例

下面给出一个业务系统输出的日志格式,由于业务系统输出的日志格式无法更改,因此就需要我们通过logstash的filter过滤功能以及grok插件来获取需要的数据格式,此业务系统输出的日志内容以及原始格式如下:2018-02-09T10:57:42+08:00|~|123.87.240.97|~|Mozilla/5.0 (iPhone; CPU iPhone OS 11_2_2 like Mac OS X) AppleWebKit/604.4.7 Version/11.0 Mobile/15C202Safari/604.1|~|http://m.sina.cn/cm/ads_ck_wap.html|~|1460709836200|~|DF0184266887D0E 可以看出,这段日志都是以“|~|”为区间进行分隔的,那么刚好我们就以“|~|”为区间分隔符,将这段日志内容分割为6个字段。这里通过grok插件进行正则匹配组合就能完成这个功能。
完整的grok正则匹配组合语句如下:
%{TIMESTAMP_ISO8601:localtime}|~|%{IPORHOST:clientip}|~|(%{GREEDYDATA:http_user_agent})|~|(%{DATA:http_referer})|~|%{GREEDYDATA:mediaid}|~|%{GREEDYDATA:osid}

output —— 输出插件

output是Logstash的最后阶段,一个事件可以经过多个输出,而一旦所有输出处理完成,整个事件就执行完成。 一些常用的输出包括:
file: 表示将日志数据写入磁盘上的文件。
elasticsearch:表示将日志数据发送给Elasticsearch。Elasticsearch可以高效方便和易于查询的保存数据。
graphite:表示将日志数据发送给graphite,graphite是一种流行的开源工具,用于存储和绘制数据指标。此外,Logstash还支持输出到nagios、hdfs、email(发送邮件)和Exec(调用命令执行)。

输出到标准输出(stdout)

stdout与之前介绍过的stdin插件一样,它是最基础和简单的输出插件,下面是一个配置实例:

  1. output {
  2. stdout {
  3. codec => rubydebug
  4. }
  5. }

stdout插件,主要的功能和用途就是用于调试,这个插件,在前面已经多次使用过。这里不再过多介绍。

保存为文件(file)

file插件可以将输出保存到一个文件中,配置实例如下:

  1. output {
  2. file {
  3. path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
  4. }
  5. }

上面这个配置中,使用了变量匹配,用于自动匹配时间和主机名,这在实际使用中很有帮助。

输出到elasticsearch

Logstash将过滤、分析好的数据输出到elasticsearch中进行存储和查询,是最经常使用的方法。下面是一个配置实例:

  1. output {
  2. elasticsearch {
  3. host => ["172.16.213.37:9200","172.16.213.77:9200","172.16.213.78:9200"]
  4. index => "logstash-%{+YYYY.MM.dd}"
  5. manage_template => false
  6. template_name => "template-web_access_log"
  7. }
  8. }

上面配置中每个配置项含义如下:
host:是一个数组类型的值,后面跟的值是elasticsearch节点的地址与端口,默认端口是9200。可添加多个地址。
index:写入elasticsearch的索引的名称,这里可以使用变量。Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+ 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。这种以天为单位分割的写法,可以很容易的删除老的数据或者搜索指定时间范围内的数据。此外,注意索引名中不能有大写字母。
manage_template:用来设置是否开启logstash自动管理模板功能,如果设置为false将关闭自动管理模板功能。如果我们自定义了模板,那么应该设置为false。
template_name:这个配置项用来设置在Elasticsearch中模板的名称。