logstash是一个数据分析软件,主要目的是分析log日志。

日志经过采集后,就会输入到logstash,由logstash进行加工过滤分析,再输入到ES中。这样,ES索引中的文档就会获取到所需的字段

logstash的配置分为

  1. input
  2. filter
  3. output

Input

input模块主要用户收集数据。

  1. 文件

    1. input{
    2. file{
    3. #path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
    4. path => ['pathA''pathB']
    5. #表示多久去path路径下查看是够有新的文件产生。默认是15秒检查一次。
    6. discover_interval => 15
    7. #排除那些文件,也就是不去读取那些文件
    8. exclude => ['fileName1','fileName2']
    9. #被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
    10. close_older => 3600
    11. #在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
    12. ignore_older => 86400
    13. #logstash 每隔多 久检查一次被监听文件状态( 是否有更新) , 默认是 1 秒。
    14. stat_interval => 1
    15. #sincedb记录数据上一次的读取位置的一个index
    16. sincedb_path => '$HOME/. sincedb'
    17. #logstash 从什么 位置开始读取文件数据, 默认是结束位置 也可以设置为:beginning 从头开始
    18. start_position => 'beginning'
    19. }
    20. }

    如果需要每次都从头开始读取文件的话,设置start_position => beginning是没有用的,你可以选择sincedb_path 定义为 /dev/null

  2. 数据库

    1. input{
    2. jdbc{
    3. #jdbc sql server 驱动,各个数据库都有对应的驱动,需自己下载
    4. jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar"
    5. #jdbc class 不同数据库有不同的 class 配置
    6. jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    7. #配置数据库连接 ip 和端口,以及数据库
    8. jdbc_connection_string => "jdbc:sqlserver://200.200.0.18:1433;databaseName=test_db"
    9. #配置数据库用户名
    10. jdbc_user =>
    11. #配置数据库密码
    12. jdbc_password =>
    13. # 定时器 多久执行一次SQL,默认是一分钟
    14. # schedule => 分 时 天 月 年
    15. # schedule => * 22 * * * 表示每天22点执行一次
    16. schedule => "* * * * *"
    17. #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    18. clean_run => false
    19. #是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称,
    20. #此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
    21. use_column_value => true
    22. #如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
    23. tracking_column => create_time
    24. #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    25. record_last_run => true
    26. #只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值
    27. last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info"
    28. #是否将字段名称转小写。
    29. #这里有个小的提示,如果你这前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
    30. #因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
    31. lowercase_column_names => false
    32. #你的SQL的位置,当然,你的SQL也可以直接写在这里。
    33. #statement => SELECT * FROM tabeName t WHERE t.creat_time > :last_sql_value
    34. statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"
    35. #ES数据类型
    36. type => "my_info"
    37. }
    38. }
    39. }

    外载的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。

    1. 通过filebeat收集

      1. input {
      2. beats {
      3. #接受数据端口
      4. port => 5044
      5. #数据类型
      6. type => "logs"
      7. }
      8. }
    2. tcp,通常一些日志通过tcp连接暴露

      1. input {
      2. tcp {
      3. port => "5044"
      4. codec => "json"
      5. add_field => ["log_channel", "kong"]
      6. }
      7. }

filter

filter是过滤器,会对收集到的数据进行处理。elastic官方还提供了许多插件,可以协助数据处理

filter是logstash最复杂的一个模块,这里只做简述

  1. grok 可以匹配一切数据
  2. Geoip 获取 IP 对应的地理位置
  3. json 对于 json 格式的 log,可以通过 codec 的 json 编码进行解析
  4. split logstash 同样支持将一行数据变成多个事件,它提供了 split 插件,用来把一行数据拆分成多个事件
  5. mutate logstash 支持在 filter 中对事件中的数据进行修改
  6. date 用于处理时间

以下是一个分析eventti日志的filter

首先创建了一个模式文件,用于存取正则模式

  1. # /usr/share/logstash/eventti/patterns
  2. DATE1 [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]+
  3. LEVEL (INFO)|(DEBUG)|(ERROR)|(WARN)|(FATAL)
  4. JAVA_SOURCE \[.*\]
  5. JAVALOGMESSAGE (.*)
  6. DATE2 [0-9]{4}\/[0-9]{2}\/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}
  7. LEVEL2 (\[I\])|(\[D\])|(\[E\])|(\[W\])|(\[F\])
  8. ID [a-zA-Z0-9-]+
  9. MESSAGE (.*?)
  10. TYPE [a-zA-Z0-9_]+
  11. TIME1 [0-9]+
  12. USERNAME (.*)

然后在logstash配置中使用filter进行过滤分析

  1. filter {
  2. grok {
  3. # 正则模式位置
  4. patterns_dir => ["/usr/share/logstash/eventti/patterns"]
  5. # 将匹配到的模式转化为字段
  6. match => {
  7. "message" => "%{DATE2:logdate} %{LEVEL2:loglevel} id:%{ID:logid} message:%{MESSAGE:logmessage}\stype:%{TYPE:logtype} timestamp:%{TIME1:logtime} user:%{USERNAME:username}"
  8. }
  9. }
  10. date {
  11. # 匹配时间转化为时间戳
  12. match => [ "logtime", "UNIX_MS" ]
  13. target => "@timestamp"
  14. }
  15. mutate {
  16. # 转化格式
  17. convert =>{"logtime" => "integer"}
  18. }
  19. }

output

output用于将处理好的数据进行输出

  1. 文件

    1. output{
    2. file{
    3. path => "/home/app/logbak/%{+YYYY.MM.dd}-file.txt"
    4. codec => line {format => "%{[collectValue]}"} # 设置根据原始数据格式保存,不会带Json格式
    5. }
    6. }
  2. ES

    1. output{
    2. elasticsearch{
    3. # ES地址
    4. hosts=>["172.132.12.3:9200"]
    5. # es要执行的动作 index, delete, create, update
    6. action=>"index"
    7. index=>"indextemplate-logstash"
    8. #document_type=>"%{@type}"
    9. # 为索引提供document id ,对重写elasticsearch中相同id词目很有用
    10. document_id=>"ignore"
    11. # 有效的filepath 设置自己的template文件路径
    12. template=>"/opt/logstash-conf/es-template.json"
    13. template_name=>"es-template.json"
    14. template_overwrite=>true
    15. }
    16. }

pipeline

Logstash 通常需要处理多个并行事件流,如果总是通过if进行判断则显得臃肿。

logstash在6.0.0之后引入了pipeline。用户可以在配置文件pipelines.yml中添加新的 pipeline 配置并指定其配置文件就可以了

  1. - pipeline.id: kong
  2. path.config: "/usr/share/logstash/kong/kong.conf"
  3. - pipeline.id: eventti
  4. path.config: "/usr/share/logstash/eventti/eventti.conf"

然后在不同的配置文件配置不同的日志分析策略