简单测试启动

指定文件

test.conf

  1. input {
  2. stdin{ }
  3. }
  4. output {
  5. stdout { codec=>"rubydebug" }
  6. }

logstash -f test.conf

指定启动参数

logstash -e “input { stdin{ } } output { stdout { } }”

指定 jdk

bin/logstash 会加载 logstash.lib.sh
image.png

logstash.lib.sh 中指定 JAVACMD

logstash 启动参数

logstash [OPTIONS]
-n, —node.name

-f, —config CONFIG_PATH
可指定一个目录, 读取目录下所有文件拼成一个大文件
文件按字母顺序读取, input filter output 按顺序

-e, —config.string CONFIG_STRING
-w, —filterworkers COUNT
—watchdog-timeout SECONDS
-l, —log FILE
-p, —pluginpath PATH
-w, —pipeline.workers COUNT
-b, —pipeline.batch.size SIZE
-u, —pipeline.batch.delay DELAY_IN_MS
-t, —config.test_and_exit
-r, —config.reload.automatic
—config.reload.interval RELOAD_INTERVAL
—path.data PATH
—path.settings 包含 logstash.yml 的目录

nohup /apps/logstash/bin/logstash -f /apps/logstash/config/filebeat.conf —config.reload.automatic &> /apps/logstash/logstash.log &

启动多个进程


每个进程需要单独的 path.data

/data/scripts/logstash-oslog.sh
#!/bin/bash
#
nohup /apps/logstash/bin/logstash -f /apps/logstash/oslog.conf —path.data /data/logstash/data/oslog —config.reload.automatic &> /data/scripts/logstash-oslog.log &

/data/scripts/logstash-www.sh
#!/bin/bash
#
nohup /apps/logstash/bin/logstash -f /apps/logstash/www.conf —path.data /data/logstash/data/www —config.reload.automatic &> /data/scripts/logstash-www.log &

input

beats

  1. input {
  2. beats {
  3. port => 5044
  4. }
  5. }

file

从指定的文件中读取事件流;
使用 FileWatch(Ruby Gem库)监听文件的变化。
.sincedb_xxxxxxxx:记录了每个被监听的文件的inode, major number, minor nubmer, pos;

file {
path => [ “/data/mysql/3306/3306-error.err” ]
type => “mysql.err”
start_position => “beginning”
}

close_older
默认 1h
已经监听的文件,在此时间内没有更新,就关闭它的文件句柄

discover_interval
默认 15s
每隔多久去检测指定 path 下是否有新文件
乘上 stat_interval

exclude
排除不被监听的文件

ignore_older

path
array
[“xxx”,”yyy”]
/var/log/*/.log 递归搜索 /var/log/ 下面所有 .log

stat_interval
Default value is “1 second”

sincedb_path
/plugins/inputs/file/.sincedb_xxxx一大坨

sincedb_write_interval
默认 15s
每隔多久写入 .sincedb 文件

stat_interval
默认 1s
每隔多久去检测一次文件是否改变

start_position
Value can be any of: beginning, end
Default value is “end”
已监听的文件存在 .sincedb,会从记录过的 pos 开始
可将 sincedb_path 指向 /dev/null ,每次重头开始

filter

grok 正则捕获

  1. input {
  2. stdin{ }
  3. }
  4. filter {
  5. grok {
  6. match => { "message" => "%{COMBINEDAPACHELOG}" }
  7. }
  8. }
  9. output {
  10. stdout { }
  11. }

72.30.142.174 - - [08/Jul/2010:05:24:10 +0800] “GET /robots.txt HTTP/1.0” 200 27 “null” “Mozilla/5.0 (compatible; Yahoo! slurp; http://help.yahoo.com/help/us/ysearch/slurp)”

image.png

mutate 数据修改

字段重命名

优先级高,其他字段操作以新字段名为准
mutate {
rename => {“老字段名”” => “新字段名”}
}

替换

mutate {
replace => {“字段” => “新字段”}
}

正则替换

mutate {
gsub => [
“字段”,”原”,”目标”,
“字段”,”原”,”目标”,
]
}


分割为数组

mutate {
split => {“字段” => “分割符”}
}

删除字段

mutate {
remove_field => [“字段1”,”字段2”,…]
}

添加字段

mutate {
add_field => {
“k1” => “v1”
“accessRUL” => “%{[message][6]}”
“clientIP” => “%{[message][23]}”
}
}


geoip 定位

geoip { source => “cip” }

image.png

geoip {
source => “cip”
fields => [“timezone”,”location”] # 只显示 geoip信息 只需要的字段
}
image.png

output

es

  1. elasticsearch {
  2. hosts => ["es1", "es2","es3"]
  3. }

钉钉

output {
http {
url => “https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxx
http_method => “post”
content_type => “application/json; charset=utf-8”
format => “message”
message => ‘{“msgtype”:”text”,”text”:{“content”:”app logstash”}}’
}
}

推送的 message 要使用字段
message => ‘{“msgtype”:”text”,”text”:{“content”:”app logstash %{[host][ip][0]} \n %{[message]}”}}’