---
date: 2020-07-16
title: filebeat and logstash configuration # title
tags: elk production configuration # tags
categories: elastic stack # categories
---
Notes on filebeat and logstash configuration syntax.
Filebeat configuration for collecting nginx logs and Java logs:
filebeat.inputs:
- type: log
  enabled: true
  fields:
    log_type: nginx-access
    project_name: shsp
    log_topic: common_nginx
  # Lift the custom fields to the event root instead of nesting them under "fields".
  fields_under_root: true
  paths:
    - /apps/usr/nginx/logs/access.log
- type: log
  enabled: true
  fields:
    project_name: shsp
    log_type: nginx-error
    log_topic: common_nginx
  fields_under_root: true
  paths:
    - /apps/usr/nginx/logs/error.log
- type: log
  enabled: true
  # Merge Java stack-trace continuations (indented lines and "Caused by:" lines)
  # into the preceding event.
  multiline.pattern: '^[[:space:]]+|^Caused by:'
  multiline.negate: false
  multiline.match: after
  fields:
    project_name: shsp
    log_type: app_log
    log_topic: app_all
  fields_under_root: true
  paths:
    - /apps/usr/appdata/logs/*.log

output.kafka:
  hosts: ["192.168.20.2:9092", "192.168.20.3:9092", "192.168.20.4:9092"]
  # Route each event to the Kafka topic named by its log_topic field.
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

processors:
  - drop_fields:
      fields: ['ecs', 'beat', 'input', '@version', 'agent']
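Before shipping real traffic, the config and the Kafka output can be sanity-checked with filebeat's built-in test subcommands, and the topic can be inspected with Kafka's console consumer. The /etc/filebeat path and the location of the Kafka scripts below are assumptions; adjust to your install:

# Check that the YAML parses and the settings are valid
# (/etc/filebeat/filebeat.yml is an assumed path).
filebeat test config -c /etc/filebeat/filebeat.yml

# Check that filebeat can reach the brokers configured in output.kafka.
filebeat test output -c /etc/filebeat/filebeat.yml

# Peek at what actually landed on the topic (path to the Kafka scripts assumed).
kafka-console-consumer.sh --bootstrap-server 192.168.20.2:9092 --topic common_nginx --from-beginning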
Logstash pipeline for nginx logs:
input {
  kafka {
    bootstrap_servers => "192.168.20.2:9092,192.168.20.3:9092,192.168.20.4:9092"
    topics => ["common_nginx"]
    codec => json { charset => "UTF-8" }
    group_id => "standard"
    consumer_threads => 8
  }
}

filter {
  if [log_type] == "nginx-access" {
    grok {
      match => { "message" => ["%{IPORHOST:[access][remote_ip]} - %{DATA:[access][user_name]} \[%{HTTPDATE:[access][time]}\] \"%{WORD:[access][method]} %{DATA:[access][url]} HTTP/%{NUMBER:[access][http_version]}\" %{NUMBER:[access][response_code]} %{NUMBER:[access][body_sent][bytes]} \"%{DATA:[access][referrer]}\" \"%{DATA:[access][agent]}\""] }
      remove_field => "message"
    }
    mutate {
      add_field => { "read_timestamp" => "%{@timestamp}" }
    }
    date {
      # yyyy is the calendar year; YYYY is the Joda week-year, which produces
      # wrong dates around the new year.
      match => [ "[access][time]", "dd/MMM/yyyy:HH:mm:ss Z" ]
      remove_field => "[access][time]"
    }
    useragent {
      source => "[access][agent]"
      target => "[access][user_agent]"
      remove_field => "[access][agent]"
    }
    geoip {
      source => "[access][remote_ip]"
      target => "[geoip]"
    }
  }
  else if [log_type] == "nginx-error" {
    grok {
      match => { "message" => ["%{DATA:[error][time]} \[%{DATA:[error][level]}\] %{NUMBER:[error][pid]}#%{NUMBER:[error][tid]}: (\*%{NUMBER:[error][connection_id]} )?%{GREEDYDATA:[error][message]}"] }
      remove_field => "message"
    }
    mutate {
      rename => { "@timestamp" => "read_timestamp" }
    }
    date {
      match => [ "[error][time]", "yyyy/MM/dd HH:mm:ss" ]
      remove_field => "[error][time]"
    }
  }
}

output {
  #stdout {
  #  codec => rubydebug
  #}
  elasticsearch {
    hosts => [ "192.168.20.11:9200", "192.168.20.12:9200", "192.168.20.13:9200" ]
    user => "elastic"
    password => "abcd"
    index => "logstash-nginx-%{project_name}-%{+YYYY.MM.dd}"
    codec => json { charset => "UTF-8" }
  }
}
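Uncommenting the stdout/rubydebug output prints each parsed event, which is handy when tuning the grok patterns. The pipeline file can also be syntax-checked without starting a full Logstash instance; the file name nginx.conf and the install path are assumptions here:

# Validate the pipeline definition and exit (nginx.conf is a placeholder
# for wherever this pipeline file is saved).
/usr/share/logstash/bin/logstash -f nginx.conf --config.test_and_exit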
Logstash pipeline for Java logs:
input {
  kafka {
    bootstrap_servers => "192.168.20.2:9092,192.168.20.3:9092,192.168.20.4:9092"
    topics => ["app_all"]
    codec => json { charset => "UTF-8" }
    group_id => "standard"
    consumer_threads => 8
  }
}

filter {
  mutate {
    # Keep copies of the full path and hostname before the splits below
    # overwrite [log][file][path].
    add_field => { "log_path" => "%{[log][file][path]}" }
    add_field => { "host_name" => "%{[host][name]}" }
  }
  mutate {
    # Split the path into its components on "/".
    split => ["[log][file][path]", "/"]
  }
  mutate {
    # Split the last component (the file name) on ".".
    split => ["[log][file][path][-1]", "."]
  }
  mutate {
    # The file name minus its extension becomes log_name.
    add_field => { "log_name" => "%{[log][file][path][-1][0]}" }
  }
  mutate {
    remove_field => ["log", "host"]
  }
}

output {
  #stdout {
  #  codec => rubydebug
  #}
  elasticsearch {
    hosts => [ "192.168.20.11:9200", "192.168.20.12:9200", "192.168.20.13:9200" ]
    user => "elastic"
    password => "abcd"
    index => "logstash-app_%{project_name}-%{+YYYY.MM.dd}"
    codec => json { charset => "UTF-8" }
  }
}
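For reference, a walk-through of the split chain on a hypothetical file name (order-service.log is made up for illustration):

# [log][file][path]            "/apps/usr/appdata/logs/order-service.log"
# split on "/"             ->  ["", "apps", "usr", "appdata", "logs", "order-service.log"]
# split last item on "."   ->  last element becomes ["order-service", "log"]
# [log][file][path][-1][0]  =  "order-service"  -> stored as log_name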