Quick test startup
Using a config file
test.conf
input {
stdin{ }
}
output {
stdout { codec=>"rubydebug" }
}
logstash -f test.conf
Passing the config on the command line
logstash -e "input { stdin{ } } output { stdout { } }"
Specifying the JDK
bin/logstash loads logstash.lib.sh
logstash.lib.sh sets JAVACMD
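A minimal sketch, assuming this version's logstash.lib.sh derives JAVACMD from JAVA_HOME when JAVACMD is not already set; the JDK path is illustrative, verify against your own logstash.lib.sh:
export JAVA_HOME=/usr/local/jdk1.8.0_181    # hypothetical JDK install path
/apps/logstash/bin/logstash -f test.conf    # bin/logstash -> logstash.lib.sh -> JAVACMD=$JAVA_HOME/bin/java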
logstash startup options
logstash [OPTIONS]
-n, --node.name
-f, --config CONFIG_PATH
Can point to a directory; all files under it are read and concatenated into one big config (see the directory sketch after this list)
Files are read in alphabetical order, so the input/filter/output sections are combined in that order
-e, --config.string CONFIG_STRING
-w, --filterworkers COUNT (older option name; newer versions use --pipeline.workers)
--watchdog-timeout SECONDS
-l, --log FILE
-p, --pluginpath PATH
-w, --pipeline.workers COUNT
-b, --pipeline.batch.size SIZE
-u, --pipeline.batch.delay DELAY_IN_MS
-t, --config.test_and_exit
-r, --config.reload.automatic
--config.reload.interval RELOAD_INTERVAL
--path.data PATH
--path.settings  directory containing logstash.yml
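A sketch of the split-config layout implied when -f points at a directory; the directory and file names are illustrative assumptions, only the alphabetical ordering matters:
# Hypothetical directory passed with -f, e.g.  logstash -f /etc/logstash/conf.d/
# 01-input.conf  :  input  { beats { port => 5044 } }
# 20-filter.conf :  filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } }
# 90-output.conf :  output { elasticsearch { hosts => ["es1"] } }
# Logstash reads the files alphabetically and runs them as one concatenated pipeline.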
nohup /apps/logstash/bin/logstash -f /apps/logstash/config/filebeat.conf --config.reload.automatic &> /apps/logstash/logstash.log &
Running multiple instances
Each instance needs its own path.data
/data/scripts/logstash-oslog.sh
#!/bin/bash
#
nohup /apps/logstash/bin/logstash -f /apps/logstash/oslog.conf --path.data /data/logstash/data/oslog --config.reload.automatic &> /data/scripts/logstash-oslog.log &
/data/scripts/logstash-www.sh
#!/bin/bash
#
nohup /apps/logstash/bin/logstash -f /apps/logstash/www.conf --path.data /data/logstash/data/www --config.reload.automatic &> /data/scripts/logstash-www.log &
input
beats
input {
beats {
port => 5044
}
}
file
Reads an event stream from the specified files.
Uses FileWatch (a Ruby gem) to watch the files for changes.
.sincedb_xxxxxxxx: records the inode, major number, minor number, and pos (read position) of each watched file.
file {
path => [ "/data/mysql/3306/3306-error.err" ]
type => "mysql.err"
start_position => "beginning"
}
close_older
Default: 1 hour
If a watched file gets no updates within this time, its file handle is closed.
discover_interval
Default: 15
How often to check the configured path(s) for new files
The effective interval is this value multiplied by stat_interval
exclude
Files excluded from watching
ignore_older
Files whose last modification time is older than this are ignored
path
array
["xxx", "yyy"]
/var/log/**/*.log recursively matches every .log file under /var/log/
sincedb_path
sincedb_write_interval
Default: 15 seconds
How often the .sincedb file is written
stat_interval
Default: 1 second
How often watched files are checked for changes
start_position
Value can be any of: beginning, end
Default value is "end"
If a file is already tracked in .sincedb, reading resumes from the recorded pos
Point sincedb_path to /dev/null to start from the beginning every time
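A fuller file input sketch combining the options above; paths and values are illustrative assumptions, and exact value formats (plain seconds vs. duration strings) vary by plugin version:
input {
  file {
    path => [ "/var/log/nginx/*.log" ]              # hypothetical log path
    exclude => [ "*.gz" ]
    type => "nginx.access"
    start_position => "beginning"
    stat_interval => 1                              # check files for changes every 1s
    discover_interval => 15                         # look for new files every 15 x stat_interval
    close_older => 3600                             # close handles of files idle for 1h
    ignore_older => 86400                           # skip files not modified in the last day
    sincedb_path => "/data/logstash/sincedb/nginx"
    sincedb_write_interval => 15
  }
}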
filter
grok: regex capture
input {
stdin{ }
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
stdout { }
}
72.30.142.174 - - [08/Jul/2010:05:24:10 +0800] "GET /robots.txt HTTP/1.0" 200 27 "null" "Mozilla/5.0 (compatible; Yahoo! slurp; http://help.yahoo.com/help/us/ysearch/slurp)"
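With the COMBINEDAPACHELOG pattern, the rubydebug output for the line above carries named fields roughly like this (trimmed sketch; the field names come from the stock pattern):
{
  "clientip"    => "72.30.142.174",
  "ident"       => "-",
  "auth"        => "-",
  "timestamp"   => "08/Jul/2010:05:24:10 +0800",
  "verb"        => "GET",
  "request"     => "/robots.txt",
  "httpversion" => "1.0",
  "response"    => "200",
  "bytes"       => "27",
  "referrer"    => "\"null\"",
  "agent"       => "\"Mozilla/5.0 (compatible; Yahoo! slurp; ...)\""
}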
mutate: field modification
Rename a field
Applied with high priority; other field operations should refer to the new field name
mutate {
rename => { "old_field_name" => "new_field_name" }
}
Replace a field's value
mutate {
replace => { "field_name" => "new value" }
}
Regex substitution
mutate {
gsub => [
"field_name", "pattern", "replacement",
"another_field", "pattern", "replacement"
]
}
Split into an array
mutate {
split => { "field_name" => "separator" }
}
Remove fields
mutate {
remove_field => [ "field1", "field2", … ]
}
Add fields
mutate {
add_field => {
"k1" => "v1"
"accessURL" => "%{[message][6]}"
"clientIP" => "%{[message][23]}"
}
}
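A combined mutate sketch using hypothetical field names, mirroring the operations above:
filter {
  mutate {
    rename       => { "host" => "hostname" }
    replace      => { "type" => "nginx.access" }
    gsub         => [ "message", "\t", " " ]      # replace tabs with spaces
    split        => { "request" => "?" }          # "request" becomes [path, querystring]
    remove_field => [ "ident", "auth" ]
    add_field    => { "env" => "prod" }
  }
}
Note that inside a single mutate block the operations run in the plugin's fixed internal order, not in the order written.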
geoip: IP geolocation
geoip { source => "cip" }
geoip {
source => "cip"
fields => [ "timezone", "location" ]  # keep only the geoip fields you need
}
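On a successful lookup the event gains a geoip object; a rough sketch of typical sub-fields (the exact set depends on the GeoIP database and plugin version):
# "geoip" => {
#   "country_name" => "...",
#   "region_name"  => "...",
#   "city_name"    => "...",
#   "timezone"     => "...",
#   "location"     => { "lat" => ..., "lon" => ... }
# }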
output
es
elasticsearch {
hosts => ["es1", "es2","es3"]
}
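A slightly fuller elasticsearch output sketch; per-day index naming is a common convention (the index prefix and ports are assumptions):
output {
  elasticsearch {
    hosts => ["es1:9200", "es2:9200", "es3:9200"]
    index => "app-log-%{+YYYY.MM.dd}"
  }
}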
DingTalk
output {
http {
url => "https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxx"
http_method => "post"
content_type => "application/json; charset=utf-8"
format => "message"
message => '{"msgtype":"text","text":{"content":"app logstash"}}'
}
}
To include event fields in the pushed message:
message => '{"msgtype":"text","text":{"content":"app logstash %{[host][ip][0]} \n %{[message]}"}}'
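To keep the robot from being flooded, the http output is usually wrapped in a conditional; a sketch assuming the event carries a loglevel field:
output {
  if [loglevel] == "ERROR" {
    http {
      url => "https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxx"
      http_method => "post"
      content_type => "application/json; charset=utf-8"
      format => "message"
      message => '{"msgtype":"text","text":{"content":"app logstash %{[host][ip][0]} \n %{[message]}"}}'
    }
  }
}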