Sending Filebeat logs to Kafka
[root@server12 ~]# vim /usr/local/filebeat-6.6.0/filebeat.yml
filebeat.inputs:
- type: log
  tail_files: true
  backoff: "1s"
  paths:
  - /usr/local/nginx/logs/access.json.log
  fields:
    type: access
  fields_under_root: true
output:
  kafka:
    hosts: ["192.168.10.12:9092"]
    topic: jaking
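Before restarting Filebeat, the updated configuration and the connection to the Kafka output can be sanity-checked with Filebeat's built-in test subcommands (same install path as above):
/usr/local/filebeat-6.6.0/filebeat test config -c /usr/local/filebeat-6.6.0/filebeat.yml
/usr/local/filebeat-6.6.0/filebeat test output -c /usr/local/filebeat-6.6.0/filebeat.yml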
[root@server12 ~]# ps aux | grep filebeat
root 18346 0.0 0.6 432748 17780 pts/0 Sl Mar14 0:07 /usr/local/filebeat-6.6.0/filebeat -e -c /usr/local/filebeat-6.6.0/filebeat.yml
root 21945 0.0 0.0 112652 956 pts/0 R+ 21:48 0:00 grep --color=auto filebeat
[root@server12 ~]# kill -9 18346
[root@server12 ~]# nohup /usr/local/filebeat-6.6.0/filebeat -e -c /usr/local/filebeat-6.6.0/filebeat.yml &>/tmp/filebeat.log &
[3] 21946
[1] Killed nohup /usr/local/filebeat-6.6.0/filebeat -e -c /usr/local/filebeat-6.6.0/filebeat.yml >> /tmp/filebeat.log
[root@server12 ~]# ps aux | grep filebeat
root 21946 0.2 0.5 289884 16420 pts/0 Sl 21:49 0:00 /usr/local/filebeat-6.6.0/filebeat -e -c /usr/local/filebeat-6.6.0/filebeat.yml
root 21954 0.0 0.0 112652 960 pts/0 R+ 21:49 0:00 grep --color=auto filebeat
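As an alternative to running Filebeat under nohup, it could be managed by systemd; a minimal unit file sketch (the unit name and file location are assumptions, not part of this setup) would look roughly like:
[Unit]
Description=Filebeat
After=network.target

[Service]
ExecStart=/usr/local/filebeat-6.6.0/filebeat -c /usr/local/filebeat-6.6.0/filebeat.yml
Restart=always

[Install]
WantedBy=multi-user.target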
[root@server12 ~]# tail -f /tmp/filebeat.log
2022-03-15T21:49:18.451+0800 INFO instance/beat.go:403 filebeat start running.
2022-03-15T21:49:18.452+0800 INFO registrar/registrar.go:134 Loading registrar data from /usr/local/filebeat-6.6.0/data/registry
2022-03-15T21:49:18.452+0800 INFO registrar/registrar.go:141 States Loaded from registrar: 3
2022-03-15T21:49:18.452+0800 WARN beater/filebeat.go:367 Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
2022-03-15T21:49:18.452+0800 INFO crawler/crawler.go:72 Loading Inputs: 1
2022-03-15T21:49:18.452+0800 INFO [monitoring] log/log.go:117 Starting metrics logging every 30s
2022-03-15T21:49:18.453+0800 INFO log/input.go:138 Configured paths: [/usr/local/nginx/logs/access.json.log]
2022-03-15T21:49:18.453+0800 INFO input/input.go:114 Starting input of type: log; ID: 4627602243620244963
2022-03-15T21:49:18.453+0800 INFO crawler/crawler.go:106 Loading and starting Inputs completed. Enabled inputs: 1
2022-03-15T21:49:48.465+0800 INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":20,"time":{"ms":25}},"total":{"ticks":50,"time":{"ms":64},"value":50},"user":{"ticks":30,"time":{"ms":39}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":6},"info":{"ephemeral_id":"77177f91-b682-45a0-b3af-1cc92f70bd10","uptime":{"ms":30044}},"memstats":{"gc_next":4194304,"memory_alloc":2482056,"memory_total":4513448,"rss":17088512}},"filebeat":{"events":{"added":1,"done":1},"harvester":{"open_files":0,"running":0}},"libbeat":{"config":{"module":{"running":0}},"output":{"type":"kafka"},"pipeline":{"clients":1,"events":{"active":0,"filtered":1,"total":1}}},"registrar":{"states":{"current":3,"update":1},"writes":{"success":1,"total":1}},"system":{"cpu":{"cores":1},"load":{"1":0.29,"15":0.23,"5":0.28,"norm":{"1":0.29,"15":0.23,"5":0.28}}}}}}
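With Filebeat running, events written to access.json.log should now land in the jaking topic; before configuring Logstash they can be checked directly with the Kafka console consumer (same broker address as above):
/usr/local/kafka_2.11/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.12:9092 --topic jaking --from-beginning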
Reading from Kafka with Logstash
[root@server12 ~]# vim /usr/local/logstash-6.6.0/config/logstash.conf
input {
  kafka {
    bootstrap_servers => "192.168.10.12:9092"
    topics => ["jaking"]
    group_id => "jaking"
    codec => "json"
  }
}
filter {
  if [type] == "access" {
    json {
      source => "message"
      remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
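Before starting Logstash, the pipeline file can be syntax-checked without actually running it:
/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/config/logstash.conf --config.test_and_exit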
[root@server12 ~]# /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/config/logstash.conf
Viewing Kafka consumer group information
1. List consumer groups: ./kafka-consumer-groups.sh --bootstrap-server 192.168.10.12:9092 --list
[root@server12 ~]# /usr/local/kafka_2.11/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.12:9092 --list
jaking
2. Describe the consumer group: ./kafka-consumer-groups.sh --bootstrap-server 192.168.10.12:9092 --group jaking --describe
[root@server12 ~]# /usr/local/kafka_2.11/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.12:9092 --group jaking --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
jaking 0 0 0 0 logstash-0-e0dee025-0864-4a15-b2ab-089438645da9 /192.168.10.12 logstash-0
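The topic itself can be described in a similar way to confirm its partition count and replication; note that older kafka-topics.sh releases take --zookeeper rather than --bootstrap-server (the ZooKeeper address below assumes the same single-node host):
/usr/local/kafka_2.11/bin/kafka-topics.sh --zookeeper 192.168.10.12:2181 --describe --topic jaking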
[root@server12 ~]# curl 127.0.0.1
<!DOCTYPE html>
Welcome to nginx!
If you see this page, the nginx web server is successfully installed and
working. Further configuration is required.
For online documentation and support please refer to nginx.org.
Commercial support is available at nginx.com.
Thank you for using nginx.
[root@server11 ~]# curl 192.168.10.12
<!DOCTYPE html>
Welcome to nginx!
If you see this page, the nginx web server is successfully installed and
working. Further configuration is required.
For online documentation and support please refer to nginx.org.
Commercial support is available at nginx.com.
Thank you for using nginx.
Viewing the Logstash output
[2022-03-15T10:21:00,727][INFO ][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=jaking] Node 0 was unable to process the fetch request with (sessionId=756771730, epoch=365): INVALID_FETCH_SESSION_EPOCH.
{
          "type" => "access",
       "referer" => "-",
      "bodysize" => 612,
      "clientip" => "127.0.0.1",
           "url" => "/index.html",
        "status" => 200,
          "host" => {
        "name" => "server12"
    },
            "ua" => "curl/7.29.0",
    "handletime" => 0.0,
    "@timestamp" => 2022-03-15T14:27:11.000Z
}
{
          "type" => "access",
       "referer" => "-",
      "bodysize" => 612,
      "clientip" => "192.168.10.11",
           "url" => "/index.html",
        "status" => 200,
          "host" => {
        "name" => "server12"
    },
            "ua" => "curl/7.29.0",
    "handletime" => 0.0,
    "@timestamp" => 2022-03-15T14:28:38.000Z
}
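The fields in these events come from the JSON-formatted nginx access log that Filebeat tails; a log_format roughly like the following (field names inferred from the output above, so this is a sketch rather than the exact definition used) would produce them:
log_format json '{"@timestamp":"$time_iso8601","clientip":"$remote_addr","url":"$uri","status":$status,"bodysize":$body_bytes_sent,"referer":"$http_referer","ua":"$http_user_agent","handletime":$request_time}';
access_log /usr/local/nginx/logs/access.json.log json;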