date: 2020-07-16
title: filebeat and logstash configuration # title
tags: ELK production configuration # tags
categories: elastic stack # categories
Notes on the configuration syntax for filebeat and logstash.
Filebeat configuration for collecting nginx logs and Java application logs:
```yaml
filebeat.inputs:
- type: log
  enabled: true
  fields:
    log_type: nginx-access
    project_name: shsp
    log_topic: common_nginx
  fields_under_root: true
  paths:
    - /apps/usr/nginx/logs/access.log
- type: log
  enabled: true
  fields:
    project_name: shsp
    log_type: nginx-error
    log_topic: common_nginx
  fields_under_root: true
  paths:
    - /apps/usr/nginx/logs/error.log
- type: log
  enabled: true
  multiline.pattern: '^[[:space:]]+|^Caused by:'
  multiline.negate: false
  multiline.match: after
  fields:
    project_name: shsp
    log_type: app_log
    log_topic: app_all
  fields_under_root: true
  paths:
    - /apps/usr/appdata/logs/*.log

output.kafka:
  hosts: ["192.168.20.2:9092", "192.168.20.3:9092", "192.168.20.4:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

processors:
  - drop_fields:
      fields: ['ecs', 'beat', 'input', '@version', 'agent']
```
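The multiline settings merge Java stack-trace continuation lines into the preceding event: with `negate: false` and `match: after`, any line starting with whitespace or `Caused by:` is appended to the event before it. A minimal sketch of that grouping logic in Python, with `\s` standing in for the POSIX `[[:space:]]` class (Python's `re` does not support it) and a made-up stack trace for illustration:

```python
import re

# Approximation of the multiline.pattern above (\s replaces [[:space:]]).
CONTINUATION = re.compile(r"^\s+|^Caused by:")

def group_events(lines):
    """Mimic multiline.negate: false + multiline.match: after —
    a line matching the pattern is appended to the previous event."""
    events = []
    for line in lines:
        if CONTINUATION.match(line) and events:
            events[-1] += "\n" + line   # continuation: append to previous event
        else:
            events.append(line)         # start a new event
    return events

# Hypothetical Java log lines for illustration.
sample = [
    "2020-07-16 10:00:00 ERROR request failed: java.lang.RuntimeException",
    "\tat com.example.Handler.run(Handler.java:42)",
    "Caused by: java.io.IOException: Broken pipe",
    "\tat java.net.SocketOutputStream.write(SocketOutputStream.java:118)",
    "2020-07-16 10:00:01 INFO retrying",
]
for event in group_events(sample):
    print(repr(event))
```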
Logstash pipeline for processing the nginx logs:
```conf
input {
  kafka {
    bootstrap_servers => "192.168.20.2:9092,192.168.20.3:9092,192.168.20.4:9092"
    topics => ["common_nginx"]
    codec => json { charset => "UTF-8" }
    group_id => "standard"
    consumer_threads => 8
  }
}

filter {
  if [log_type] == "nginx-access" {
    grok {
      match => { "message" => ["%{IPORHOST:[access][remote_ip]} - %{DATA:[access][user_name]} \[%{HTTPDATE:[access][time]}\] \"%{WORD:[access][method]} %{DATA:[access][url]} HTTP/%{NUMBER:[access][http_version]}\" %{NUMBER:[access][response_code]} %{NUMBER:[access][body_sent][bytes]} \"%{DATA:[access][referrer]}\" \"%{DATA:[access][agent]}\""] }
      remove_field => "message"
    }
    mutate {
      add_field => { "read_timestamp" => "%{@timestamp}" }
    }
    date {
      match => [ "[access][time]", "dd/MMM/YYYY:H:m:s Z" ]
      remove_field => "[access][time]"
    }
    useragent {
      source => "[access][agent]"
      target => "[access][user_agent]"
      remove_field => "[access][agent]"
    }
    geoip {
      source => "[access][remote_ip]"
      target => "[geoip]"
    }
  } else if [log_type] == "nginx-error" {
    grok {
      match => { "message" => ["%{DATA:[error][time]} \[%{DATA:[error][level]}\] %{NUMBER:[error][pid]}#%{NUMBER:[error][tid]}: (\*%{NUMBER:[error][connection_id]} )?%{GREEDYDATA:[error][message]}"] }
      remove_field => "message"
    }
    mutate {
      rename => { "@timestamp" => "read_timestamp" }
    }
    date {
      match => [ "[error][time]", "YYYY/MM/dd H:m:s" ]
      remove_field => "[error][time]"
    }
  }
}

output {
  #stdout {
  #  codec => rubydebug
  #}
  elasticsearch {
    hosts => [ "192.168.20.11:9200", "192.168.20.12:9200", "192.168.20.13:9200" ]
    user => "elastic"
    password => "abcd"
    index => "logstash-nginx-%{project_name}-%{+YYYY.MM.dd}"
    codec => json { charset => "UTF-8" }
  }
}
```
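The access-log grok pattern parses the standard nginx "combined" format into nested `[access][*]` fields. As a quick way to sanity-check the pattern outside Logstash, here is a rough Python approximation; the grok macros (`IPORHOST`, `HTTPDATE`, etc.) are simplified to plain regexes and the log line is made up:

```python
import re

# Rough Python equivalent of the access-log grok pattern above.
ACCESS = re.compile(
    r'(?P<remote_ip>\S+) - (?P<user_name>\S*) '
    r'\[(?P<time>[^\]]+)\] '
    r'"(?P<method>\w+) (?P<url>\S+) HTTP/(?P<http_version>[\d.]+)" '
    r'(?P<response_code>\d+) (?P<bytes>\d+) '
    r'"(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"'
)

# Hypothetical access-log line in the default "combined" format.
line = ('203.0.113.7 - - [16/Jul/2020:10:00:00 +0800] '
        '"GET /index.html HTTP/1.1" 200 612 "-" "Mozilla/5.0"')

match = ACCESS.match(line)
print(match.groupdict())
```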
Logstash pipeline for processing the Java logs:
```conf
input {
  kafka {
    bootstrap_servers => "192.168.20.2:9092,192.168.20.3:9092,192.168.20.4:9092"
    topics => ["app_all"]
    codec => json { charset => "UTF-8" }
    group_id => "standard"
    consumer_threads => 8
  }
}

filter {
  mutate {
    add_field => { "log_path" => "%{[log][file][path]}" }
    add_field => { "host_name" => "%{[host][name]}" }
  }
  mutate { split => ["[log][file][path]", "/"] }
  mutate { split => ["[log][file][path][-1]", "."] }
  mutate { add_field => { "log_name" => "%{[log][file][path][-1][0]}" } }
  mutate { remove_field => ["log", "host"] }
}

output {
  #stdout {
  #  codec => rubydebug
  #}
  elasticsearch {
    hosts => [ "192.168.20.11:9200", "192.168.20.12:9200", "192.168.20.13:9200" ]
    user => "elastic"
    password => "abcd"
    index => "logstash-app_%{project_name}-%{+YYYY.MM.dd}"
    codec => json { charset => "UTF-8" }
  }
}
```
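The chain of `mutate`/`split` filters derives a `log_name` field from the shipped file path: split on `/`, take the last element, split that on `.`, and keep the first token. The same logic in Python, with a hypothetical path for illustration:

```python
# Mirror of the split/add_field chain in the filter block above.
path = "/apps/usr/appdata/logs/order-service.2020-07-16.log"  # hypothetical

basename = path.split("/")[-1]     # split => ["[log][file][path]", "/"]
log_name = basename.split(".")[0]  # split => ["[log][file][path][-1]", "."]
print(log_name)                    # -> "order-service"
```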