Logstash从数据库读取到ES里面

    1. input {
    2. jdbc {
    3. #设置连接字符串
    4. jdbc_connection_string =>"jdbc:mysql://101.133.224.127:3306/guli_edu?useUnicode=true&characterEncoding=utf8"
    5. #用户名和密码
    6. jdbc_user => "root"
    7. jdbc_password =>"11111111"
    8. #数据库的驱动地址
    9. jdbc_driver_library =>"/usr/local/logstash-6.8.4/sync-db/mysql-connector-java-5.1.46.jar"
    10. #驱动类名
    11. jdbc_driver_class => "com.mysql.jdbc.Driver"
    12. #开启分页
    13. jdbc_paging_enabled => "true"
    14. #分页每页数量
    15. jdbc_page_size =>"1000"
    16. #执行sql文件路径
    17. statement_filepath =>"/usr/local/logstash-6.8.4/sync-db/guli.sql"
    18. #设置定时任务间隔, 含义:分,时,天,月,年,全部为* 默认含义为每分钟跑一次
    19. schedule => "* * * * *"
    20. #索引类型
    21. type => "_doc"
    22. #是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path的文件
    23. use_column_value => true
    24. #记录上次追踪的结果值 文件路径
    25. last_run_metadata_path => ""
    26. #如果 use_column_value 为 ture ,配置本参数追踪的column名 可以是自增id或者时间
    27. tracking_column =>"gmt_modified"
    28. #tracking_column 对应的字段类型
    29. tracking_column_type => "timestamp"
    30. #是否清除 last_run_metadata_path 的记录,true表示每次都从头开始查找
    31. clean_run => false
    32. #数据库字段名称大写转小写
    33. lowercase_column_names => false
    34. }
    35. }
    36. output {
    37. elasticsearch {
    38. #es地址
    39. hosts =>[""]
    40. #同步的索引名
    41. index =>"xiaohu"
    42. #设置_docID和数据相同
    43. document_id => "%{id}"
    44. }
    45. #日志输出
    46. stdout {
    47. codec => json_lines
    48. }
    49. }

    FileBeat 读取配置文件到 Kafka

    ###################### Filebeat Configuration Example #########################
    filebeat.prospectors:
    
    - input_type: log
    
      paths:
        ## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
        - /usr/local/web/api/edu/logs/app-service-edu.log
      #定义写入 ES 时的 _type 值
      document_type: "service-edu-log"
      multiline:
        #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
        pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)
        negate: true                                # 是否匹配到
        match: after                                # 合并到上一行的末尾
        max_lines: 2000                             # 最大的行数
        timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
      fields:
        logbiz: service-edu
        logtopic: app-log-service-edu   ## 按服务划分用作kafka topic
        evn: dev
    
    - input_type: log
    
      paths:
        -/usr/local/web/api/edu/logs/error-service-edu.log
      document_type: "service-edu-error-log"
      multiline:
        #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
        pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)
        negate: true                                # 是否匹配到
        match: after                                # 合并到上一行的末尾
        max_lines: 2000                             # 最大的行数
        timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
      fields:
        logbiz: service-edu
        logtopic: error-log-service-edu   ## 按服务划分用作kafka topic
        evn: dev
    
    output.kafka:
      enabled: true
      hosts: ["127.0.0.1:9092"]
      topic: '%{[fields.logtopic]}'
      partition.hash:
        reachable_only: true
      compression: gzip
      max_message_bytes: 1000000
      required_acks: 1
    logging.to_files: true
    
    
    kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic app-log-service-edu --partitions 1  --replication-factor 1
    

    LogStash读取Kafka的数据到ES

    ## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
    input {
      kafka {
        ## app-log-服务名称
        topics_pattern => "service-edu-.*"
        bootstrap_servers => "127.0.0.1:9092"  #kafka地址
        codec => json #输出格式json
        consumer_threads => 1    ## 增加consumer的并行消费线程数 优化之一 Logstash开启多少个线程消费
        decorate_events => true
        #auto_offset_rest => "latest"
        group_id => "app-log-service-edu-group" #kafka的分组
       }
    
       kafka {
        ## error-log-服务名称
        topics_pattern => "error-service-edu-.*"
        bootstrap_servers => "127.0.0.1:9092"
        codec => json
        consumer_threads => 1
        decorate_events => true
        #auto_offset_rest => "latest"
        group_id => "error-log-service-edu-group"
       }
    
    }
    
    filter {
    
      ## 时区转换 es的时间问题所以我们得时间转换不然日志输出会有问题,我们比es的时间慢半个小时 东八区
      ruby {
        code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
      }
    #filebeat 中定义的 [fields][logtopic] 字段属性
      if "app-log" in [fields][logtopic]{
        grok {
            ## 表达式,这里对应的是Springboot输出的日志格式
            match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
      }
    
      if "error-log" in [fields][logtopic]{
        grok {
            ## 表达式
            match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
      }
    
    }
    
    ## 测试输出到控制台:
    output {
      stdout { codec => rubydebug }
    }
    
    
    ## elasticsearch:
    output {
    
      if "app-log" in [fields][logtopic]{
        ## es插件
        elasticsearch {
              # es服务地址
            hosts => ["127.0.0.1:9200"]
            # 用户名密码      
            # user => "elastic"
            # password => "123456"
            ## 索引名,+ 号开头的,就会自动认为后面是时间格式:
            ## javalog-app-service-2019.01.23 
            index => "app-log-service-edu-%{[fields][logbiz]}-%{index_time}"
            # 是否嗅探集群ip:一般设置true;http://192.168.8.170:9200/_nodes/http?pretty
            # 通过嗅探机制进行es集群负载均衡发日志消息
            sniffing => true
            # logstash默认自带一个mapping模板,进行模板覆盖
            template_overwrite => true
        } 
      }
    
      if "error-log" in [fields][logtopic]{
        elasticsearch {
            hosts => ["127.0.0.1:9200"]    
            user => "elastic"
            password => "123456"
            index => "error-log-service-edu-%{[fields][logbiz]}-%{index_time}"
            sniffing => true
            template_overwrite => true
        } 
      }
    
    
    }