

它在 ELK 中扮演的角色是数据加工,一般分为三个步骤:输入、过滤器和输出,相当于一个管道,对文件数据进行加工处理,然后输出。它的一大特点是支持丰富的插件,例如:grok,json 等

下载 Logstash

Logstash 要和 ES 版本保持一致,下载:

  1. wget https://artifacts.elastic.co/downloads/logstash/logstash-7.1.1.tar.gz




  1. # -e 指定 要执行的配置
  2. bin/logstash -e 'input { stdin { } } output { stdout {} }'


和 Nginx 一样,也可以检查配置文件是否正确:bin/logstash -f first-pipeline.conf—config.test_and_exit 如果正确会返回:Configuration OK

使用该命令启动,以后修改配置会自动加载,不需要停止或重启 bin/logstash -f first-pipeline.conf —config.reload.automatic


命令行配置启动,简单的测试还可以,实际上会把配置写在配置文件中启动,参考官方文档。新建文件 logstash-simple.conf:

  1. input { stdin { } }
  2. output {
  3. elasticsearch { hosts => ["localhost:9200"] }
  4. # 在控制台输入保存到 ES 中,同时也在控制台输出
  5. stdout { codec => rubydebug }
  6. }


  1. # -f 表示指定配置文件启动
  2. bin/logstash -f logstash-simple.conf

在控制台和 ES 中显示内容:


官方 demo 演示

官方演示了一个 将日志文件从 Filebeat 读取到 Logstash 的 demo,首先下载演示文件,配置 Filebeat,创建 filebeat.yml,添加配置:

  1. filebeat.inputs:
  2. - type: log
  3. paths:
  4. # 这个路径是你刚才那个演示文件的绝对路径
  5. - /app/es/logstash-7.1.1/logstash-tutorial.log
  6. # 输出到 Logstash,端口是 5044
  7. output.logstash:
  8. hosts: ["localhost:5044"]

配置 Logstash,创建 first-pipeline.conf 文件,添加配置:

  1. # 输入源,从 Filebeat 输入进来,Logstash 的端口是 5044
  2. input {
  3. beats {
  4. port => "5044"
  5. }
  6. }
  7. # 插件配置,顺序执行插件
  8. filter {
  9. # grok 插件
  10. grok {
  11. # 解析 Apache Web 日志
  12. match => { "message" => "%{COMBINEDAPACHELOG}" }
  13. }
  14. # geo 插件,clientip 这个字段来自message中
  15. geoip {
  16. source => "clientip"
  17. }
  18. }
  19. # 在控制台输出
  20. output {
  21. stdout { codec => rubydebug }
  22. }

启动 Logstash:

  1. bin/logstash -f first-pipeline.conf --config.reload.automatic

启动 Filebeat:

  1. ./filebeat -e -c filebeat.yml -d "publish"

正常的话你在 Logstash 的控制台看到类似格式的数据:

上面只是将数据输出到控制台,接下来将数据保存到 ES 中,继续配置 Filebeat,在 first-pipeline.conf ,如下:

  1. # 输入源,从 Filebeat 输入进来,Logstash 的端口是 5044
  2. input {
  3. beats {
  4. port => "5044"
  5. }
  6. }
  7. # 插件配置,顺序执行插件
  8. filter {
  9. # grok 插件
  10. grok {
  11. # 解析 Apache Web 日志
  12. match => { "message" => "%{COMBINEDAPACHELOG}" }
  13. }
  14. # geo 插件,clientip 这个字段来自message中
  15. geoip {
  16. source => "clientip"
  17. }
  18. }
  19. # 在控制台输出
  20. output {
  21. # 输出到 ES 中
  22. elasticsearch {
  23. hosts => [ "localhost:9200" ]
  24. }
  25. # stdout { codec => rubydebug }
  26. }

启动 ES,再重启 Filebeat,也是上面那条命令,但是这里还要删除 Filebeat 目录中的 data/registry 文件夹。

然后你可以通过 Kibana 或者 Head 或者使用 HTTP 请求查看 ES 中刚刚存进来的数据。


input 匹配多行

在读取 Java 日志中,打印的堆栈信息经常是多行,但默认是按行读取,所以使用 Multiline 插件:

  1. # 例如一个input 数据源
  2. Exception in thread "main" java.lang.ArithmeticException: / by zero
  3. at com.wallet.v2.rpc.EthInfuraRpcServiceTest.print(EthInfuraRpcServiceTest.java:115)
  4. at com.wallet.v2.rpc.EthInfuraRpcServiceTest.main(EthInfuraRpcServiceTest.java:111)

multiline.conf 配置:

  1. input {
  2. stdin {
  3. codec => multiline {
  4. # 支持正则
  5. pattern => "^\s"
  6. what => "previous"
  7. }
  8. }
  9. }
  10. output {
  11. stdout {
  12. codec => rubydebug
  13. }
  14. }

sprintf 格式


  1. output {
  2. # 输出到 ES 中
  3. elasticsearch {
  4. hosts => [ "localhost:9200" ]
  5. # 文档id
  6. document_id => "%{id}"
  7. # 文档类型
  8. document_type => "_doc"
  9. # 自定义索引
  10. index => "custom-index-%{+YYYYMMdd}"
  11. }
  12. }

上面的 %{+YYYYMMdd} 就是 Logstash 支持的 sprintf 格式,用 %{} 可以获取动态的数据。再比如上面用的 COMBINEDAPACHELOG 则是 grok 预置的表达式。还要就是希望能获取 Filebeat 传过来的字段,可以用 %[fields][字段名称] 的方式来获取。


通过上面的几个案例你会发现,Logstash 的配置文件,一般分为三部分,其中每一部分里面都可以配置 N 个选项,input 插件、filter 插件、output 插件:

  1. # 输入
  2. input {
  3. file {
  4. path => "/var/log/messages"
  5. type => "syslog"
  6. }
  7. file {
  8. path => "/var/log/apache/access.log"
  9. type => "apache"
  10. }
  11. ...
  12. }
  13. # 处理过滤
  14. filter {
  15. # 这里会定义各种插件,进行数据处理,还可以写表达式逻辑判断
  16. ...
  17. }
  18. # 输出到文件,或者ES,控制台等等
  19. output {
  20. ...
  21. }

同步 MySQL 数据

logstash 同步 MySQL 中数据配置,使用 jdbc 插件,操作方式和上面一样,下面是配置文件介绍 mysql.conf:

  1. input {
  2. jdbc {
  3. # 数据库地址
  4. jdbc_connection_string => "jdbc:mysql://localhost:3306/test_db?characterEncoding=utf-8"
  5. # 用户名和密码
  6. jdbc_user => "root"
  7. jdbc_password => "123456"
  8. # mysql-connector-java 驱动 jar 包路径
  9. jdbc_driver_library => "/usr/local/java/mysql-connector-java-5.1.6.jar"
  10. # the name of the driver class for mysql
  11. jdbc_driver_class => "com.mysql.jdbc.Driver"
  12. # 是否分页
  13. jdbc_paging_enabled => "true"
  14. # 分页条数
  15. jdbc_page_size => "50"
  16. #以下对应着要执行的sql的绝对路径,或者直接执行 sql
  17. #statement_filepath => ""
  18. statement => "select id,title,content from tb_article"
  19. #定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  20. # 这里表示每分钟执行一次
  21. schedule => "* * * * *"
  22. }
  23. }
  24. output {
  25. elasticsearch {
  26. #ESIP地址与端口
  27. hosts => ""
  28. #ES索引名称(自己定义的)
  29. index => "article"
  30. #自增ID编号,这里的id对应上面sql语句的id
  31. document_id => "%{id}"
  32. # ES 文档类型
  33. document_type => "article"
  34. }
  35. stdout {
  36. #以JSON格式输出
  37. codec => json_lines
  38. }
  39. }
