参考资料
- 官方文档:https://www.elastic.co/guide/en/logstash/7.x/logstash-7-1-1.html 官方文档特别全,我觉得比 Spring 写的还要好
简介
它在 ELK 中扮演的角色是数据加工,一般分为三个步骤:输入、过滤器和输出,相当于一个管道,对文件数据进行加工处理,然后输出。它的一大特点是支持丰富的插件,例如:grok,json 等
下载 Logstash
Logstash 要和 ES 版本保持一致,下载:
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.1.1.tar.gz
使用
命令行配置启动
参考官方文档:
# -e 指定 要执行的配置
bin/logstash -e 'input { stdin { } } output { stdout {} }'
这个意思在控制台进行标准的输入和输出,也就是你在控制台输入什么,同时也会输出,如下:
和 Nginx 一样,也可以检查配置文件是否正确:bin/logstash -f first-pipeline.conf—config.test_and_exit 如果正确会返回:Configuration OK
使用该命令启动,以后修改配置会自动加载,不需要停止或重启 bin/logstash -f first-pipeline.conf —config.reload.automatic
配置文件启动
命令行配置启动,简单的测试还可以,实际上会把配置写在配置文件中启动,参考官方文档。新建文件 logstash-simple.conf:
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
# 在控制台输入保存到 ES 中,同时也在控制台输出
stdout { codec => rubydebug }
}
执行命令:
# -f 表示指定配置文件启动
bin/logstash -f logstash-simple.conf
在控制台和 ES 中显示内容:
官方 demo 演示
官方演示了一个 将日志文件从 Filebeat 读取到 Logstash 的 demo,首先下载演示文件,配置 Filebeat,创建 filebeat.yml,添加配置:
filebeat.inputs:
- type: log
paths:
# 这个路径是你刚才那个演示文件的绝对路径
- /app/es/logstash-7.1.1/logstash-tutorial.log
# 输出到 Logstash,端口是 5044
output.logstash:
hosts: ["localhost:5044"]
配置 Logstash,创建 first-pipeline.conf 文件,添加配置:
# 输入源,从 Filebeat 输入进来,Logstash 的端口是 5044
input {
beats {
port => "5044"
}
}
# 插件配置,顺序执行插件
filter {
# grok 插件
grok {
# 解析 Apache Web 日志
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
# geo 插件,clientip 这个字段来自message中
geoip {
source => "clientip"
}
}
# 在控制台输出
output {
stdout { codec => rubydebug }
}
启动 Logstash:
bin/logstash -f first-pipeline.conf --config.reload.automatic
启动 Filebeat:
./filebeat -e -c filebeat.yml -d "publish"
正常的话你在 Logstash 的控制台看到类似格式的数据:
上面只是将数据输出到控制台,接下来将数据保存到 ES 中,继续配置 Filebeat,在 first-pipeline.conf ,如下:
# 输入源,从 Filebeat 输入进来,Logstash 的端口是 5044
input {
beats {
port => "5044"
}
}
# 插件配置,顺序执行插件
filter {
# grok 插件
grok {
# 解析 Apache Web 日志
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
# geo 插件,clientip 这个字段来自message中
geoip {
source => "clientip"
}
}
# 在控制台输出
output {
# 输出到 ES 中
elasticsearch {
hosts => [ "localhost:9200" ]
}
# stdout { codec => rubydebug }
}
启动 ES,再重启 Filebeat,也是上面那条命令,但是这里还要删除 Filebeat 目录中的 data/registry 文件夹。
然后你可以通过 Kibana 或者 Head 或者使用 HTTP 请求查看 ES 中刚刚存进来的数据。
通过浏览器查看:
查看索引:localhost:9200/_cat/indices?v
查看某个索引的数据:localhost:9200/${索引名称}/_search?pretty&q=response=200
input 匹配多行
在读取 Java 日志中,打印的堆栈信息经常是多行,但默认是按行读取,所以使用 Multiline 插件:
# 例如一个input 数据源
Exception in thread "main" java.lang.ArithmeticException: / by zero
at com.wallet.v2.rpc.EthInfuraRpcServiceTest.print(EthInfuraRpcServiceTest.java:115)
at com.wallet.v2.rpc.EthInfuraRpcServiceTest.main(EthInfuraRpcServiceTest.java:111)
multiline.conf 配置:
input {
stdin {
codec => multiline {
# 支持正则
pattern => "^\s"
what => "previous"
}
}
}
output {
stdout {
codec => rubydebug
}
}
sprintf 格式
实际上配置中不可能写死,我们希望动态配置信息,例如根据日期来建立索引:
output {
# 输出到 ES 中
elasticsearch {
hosts => [ "localhost:9200" ]
# 文档id
document_id => "%{id}"
# 文档类型
document_type => "_doc"
# 自定义索引
index => "custom-index-%{+YYYYMMdd}"
}
}
上面的 %{+YYYYMMdd} 就是 Logstash 支持的 sprintf 格式,用 %{} 可以获取动态的数据。再比如上面用的 COMBINEDAPACHELOG 则是 grok 预置的表达式。还要就是希望能获取 Filebeat 传过来的字段,可以用 %[fields][字段名称] 的方式来获取。
小结
通过上面的几个案例你会发现,Logstash 的配置文件,一般分为三部分,其中每一部分里面都可以配置 N 个选项,input 插件、filter 插件、output 插件:
# 输入
input {
file {
path => "/var/log/messages"
type => "syslog"
}
file {
path => "/var/log/apache/access.log"
type => "apache"
}
...
}
# 处理过滤
filter {
# 这里会定义各种插件,进行数据处理,还可以写表达式逻辑判断
...
}
# 输出到文件,或者ES,控制台等等
output {
...
}
同步 MySQL 数据
logstash 同步 MySQL 中数据配置,使用 jdbc 插件,操作方式和上面一样,下面是配置文件介绍 mysql.conf:
input {
jdbc {
# 数据库地址
jdbc_connection_string => "jdbc:mysql://localhost:3306/test_db?characterEncoding=utf-8"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "123456"
# mysql-connector-java 驱动 jar 包路径
jdbc_driver_library => "/usr/local/java/mysql-connector-java-5.1.6.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 是否分页
jdbc_paging_enabled => "true"
# 分页条数
jdbc_page_size => "50"
#以下对应着要执行的sql的绝对路径,或者直接执行 sql
#statement_filepath => ""
statement => "select id,title,content from tb_article"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
# 这里表示每分钟执行一次
schedule => "* * * * *"
}
}
output {
elasticsearch {
#ESIP地址与端口
hosts => "127.0.0.1:9200"
#ES索引名称(自己定义的)
index => "article"
#自增ID编号,这里的id对应上面sql语句的id
document_id => "%{id}"
# ES 文档类型
document_type => "article"
}
stdout {
#以JSON格式输出
codec => json_lines
}
}
更多基于配置文件的复杂案例,参考官方文档。