Logstash工作原理
Logstash事件处理有三个阶段:inputs → filters → outputs。它是一个接收、处理、转发日志的工具,支持系统日志、webserver日志、错误日志、应用日志,总之包括所有可以抛出来的日志类型。
1.安装logstash
[root@master ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.2.0.tar.gz
[root@master ~]# tar zxvf logstash-7.2.0.tar.gz
[root@master ~]# mv logstash-7.2.0 /usr/local/logstash
[root@master ~]# vim /usr/local/logstash/config/test.conf
# Input stage: listen on port 5044 for events sent by Beats shippers.
input {
  beats {
    port => 5044
  }
}
###########################################################################################################################
# Filter stage: parse each event according to the log type named in
# [fields][appname], then derive the event timestamp from Request_time.
filter {
  # Access logs: eight ';'-separated columns, any of which may be a literal "-".
  if [fields][appname] =~ ".*access-log" {
    grok {
      match => { "message" => ["(%{TIMESTAMP_ISO8601:Request_time}|-)\;(%{IPORHOST:Client_ip}|-)\;(%{DATA:User_name}|-)\;(%{WORD:Request_id}|-)\;(%{PATH:Request_uri}|-)\;(%{INT:Response_time}|-)\;(%{INT:Status_code}|-)\;(%{NOTSPACE:Status_message}|-)"] }
    }
    geoip {
      source => "Client_ip"
      # The geoip filter does not emit [geoip][coordinates] on its own, so the
      # mutate/convert below used to be a no-op. Build the field here as
      # [longitude, latitude]; add_field is only applied when the lookup succeeds.
      add_field => { "[geoip][coordinates]" => ["%{[geoip][longitude]}", "%{[geoip][latitude]}"] }
    }
    mutate {
      # Values produced through %{...} references are strings; make them numeric.
      convert => [ "[geoip][coordinates]", "float"]
    }
  }

  # Info logs: "<timestamp> <level> <message>".
  # NOTE(review): this regex also matches appnames ending in "bmw-info-log", so
  # those events pass through this grok as well as the bmw-info-log one below.
  # Confirm whether that overlap is intended.
  if [fields][appname] =~ ".*info-log" {
    # Discard lines that consist solely of ";" before spending time on grok.
    if [message] == ";" {
      drop {}
    }
    grok {
      match => { "message" => ["(%{TIMESTAMP_ISO8601:Request_time}) (%{WORD:INFO}) (%{INT:Code}) --- \[(%{GREEDYDATA:Noname})\] (%{GREEDYDATA:Method}) \:(%{GREEDYDATA:Message})", "(%{TIMESTAMP_ISO8601:Request_time}) (%{WORD:INFO}) (%{GREEDYDATA:Message})"] }
      # Only the first pattern that matches is used (grok's default
      # break_on_match), so the generic info pattern still applies to lines that
      # lack the "<code> --- [..] method :" part.
    }
  }

  # Error logs: "<timestamp> <level> <code> --- [<thread>] <method> :<message>".
  if [fields][appname] =~ ".*error-log" {
    grok {
      match => { "message" => ["(%{TIMESTAMP_ISO8601:Request_time}) (%{WORD:INFO}) (%{INT:Code}) --- \[(%{GREEDYDATA:Noname})\] (%{GREEDYDATA:Method}) \:(%{GREEDYDATA:Message})"] }
    }
  }

  # BMW info logs: "[<timestamp>]<message>".
  if [fields][appname] =~ ".*bmw-info-log" {
    grok {
      match => { "message" => ["(\[%{DATA:Request_time}\](%{GREEDYDATA:Message}))"] }
    }
  }

  # Billing-log parsing is currently disabled.
  # if [fields][appname] =~ ".*billing-log" {
  # grok {
  # match => { "message" => ["(%{TIMESTAMP_ISO8601:Request_time} \[ %{DATA:Schedul} \] - \[ %{DATA:Level} \] \[ %{DATA:Localtion} \] - %{GREEDYDATA:Message})"] }
  # }
  # }

  # Use Request_time as the event timestamp, trying each listed format in turn.
  date {
    match => [ "Request_time" , "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
  }
}
##########################################################################################################################
# Output stage: route events to a daily Elasticsearch index chosen by the exact
# value of [fields][appname]. Events with any other appname are not sent anywhere.
# NOTE(review): "ESIP" looks like a placeholder for the Elasticsearch host; replace
# it with the real address. The password is stored here in plain text; consider
# the Logstash keystore or an environment variable reference instead.
output {
  if [fields][appname] == "online-bmw-info-log" {
    elasticsearch {
      hosts    => ["ESIP:9200"]
      user     => "elastic"
      password => "situ1234"
      index    => "bmw-info-log-%{+YYYY.MM.dd}"
    }
  } else if [fields][appname] == "staging-bmw-access-log" {
    elasticsearch {
      hosts    => ["ESIP:9200"]
      user     => "elastic"
      password => "situ1234"
      index    => "bmw-access-log-%{+YYYY.MM.dd}"
    }
  } else if [fields][appname] == "staging-bmw-beta-access-log" {
    elasticsearch {
      hosts    => ["ESIP:9200"]
      user     => "elastic"
      password => "situ1234"
      index    => "bmw-beta-access-log-%{+YYYY.MM.dd}"
    }
  }
}
#########################################################################################################################
2.准备启动服务
# 以下为测试方法
[root@master ~]# /usr/local/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
# 手动输入hello world它也会输出hello world
启动logstash
[root@master ~]# nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/test.conf &
[root@master ~]# netstat -ntplu
tcp6 0 0 :::5044 :::* LISTEN 14069/java
tcp6 0 0 127.0.0.1:9600 :::* LISTEN 14069/java
# 通过下面操作可以查看logstash和elasticsearch是否建立连接,刷新会看到数据变化。
[root@master ~]# curl http://192.168.3.128:9200/_cat/indices
yellow open filebeat-2019.07.16 PCHNvs83TvOuSd1iDU4S-A 5 1 4131 0 841.7kb 841.7kb
yellow open .kibana qXyuI2ubT3-bBwyknxoyhA 1 1 2 0 21kb 21kb