Logstash工作原理

Logstash事件处理有三个阶段:inputs → filters → outputs。是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。
image.png

1.安装logstash

  1. [root@master ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.2.0.tar.gz
  2. [root@master ~]# tar zxvf logstash-7.2.0.tar.gz
  3. [root@master ~]# mv logstash-7.2.0 /usr/local/logstash
  4. [root@master ~]# vim /usr/local/logstash/config/test.conf
  5. input {
  6. beats {
  7. port => 5044
  8. }
  9. }
  10. ###########################################################################################################################
  11. filter {
  12. if [fields][appname] =~ ".*access-log" {
  13. grok {
  14. match => { "message" => ["(%{TIMESTAMP_ISO8601:Request_time}|-)\;(%{IPORHOST:Client_ip}|-)\;(%{DATA:User_name}|-)\;(%{WORD:Request_id}|-)\;(%{PATH:Request_uri}|-)\;(%{INT:Response_time}|-)\;(%{INT:Status_code}|-)\;(%{NOTSPACE:Status_message}|-)"] }
  15. }
  16. geoip {
  17. source => "Client_ip"
  18. }
  19. mutate {
  20. convert => [ "[geoip][coordinates]", "float"]
  21. }
  22. }
  23. if [fields][appname] =~ ".*info-log" {
  24. grok {
  25. match => { "message" => ["(%{TIMESTAMP_ISO8601:Request_time}) (%{WORD:INFO}) (%{GREEDYDATA:Message})"] }
  26. }
  27. if [message] == ";" {
  28. drop {}
  29. }
  30. }
  31. if [fields][appname] =~ ".*error-log" {
  32. grok {
  33. match => { "message" => ["(%{TIMESTAMP_ISO8601:Request_time}) (%{WORD:INFO}) (%{INT:Code}) --- \[(%{GREEDYDATA:Noname})\] (%{GREEDYDATA:Method}) \:(%{GREEDYDATA:Message})"] }
  34. }
  35. }
  36. if [fields][appname] =~ ".*bmw-info-log" {
  37. grok {
  38. match => { "message" => ["(\[%{DATA:Request_time}\](%{GREEDYDATA:Message}))"] }
  39. }
  40. }
  41. # if [fields][appname] =~ ".*billing-log" {
  42. # grok {
  43. # match => { "message" => ["(%{TIMESTAMP_ISO8601:Request_time} \[ %{DATA:Schedul} \] - \[ %{DATA:Level} \] \[ %{DATA:Localtion} \] - %{GREEDYDATA:Message})"] }
  44. # }
  45. # }
  46. date {
  47. match => [ "Request_time" , "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
  48. }
  49. }
  50. ##########################################################################################################################
  51. output {
  52. if [fields][appname] == "online-bmw-info-log"{
  53. elasticsearch {
  54. hosts => ["ESIP:9200"]
  55. user => "elastic"
  56. password => "situ1234"
  57. index => "bmw-info-log-%{+YYYY.MM.dd}"
  58. }
  59. }
  60. if [fields][appname] == "staging-bmw-access-log" {
  61. elasticsearch {
  62. hosts => ["ESIP:9200"]
  63. user => "elastic"
  64. password => "situ1234"
  65. index => "bmw-access-log-%{+YYYY.MM.dd}"
  66. }
  67. }
  68. if [fields][appname] == "staging-bmw-beta-access-log" {
  69. elasticsearch {
  70. hosts => ["ESIP:9200"]
  71. user => "elastic"
  72. password => "situ1234"
  73. index => "bmw-beta-access-log-%{+YYYY.MM.dd}"
  74. }
  75. }
  76. }
  77. #########################################################################################################################

2.准备启动服务

  1. # 以下为测试方法
  2. [root@master ~]# bin/logstash -e 'input { stdin { } } output { stdout {} }'
  3. # 手动输入hello world它也会输出hello world

启动logstash

  1. root@master ~]# nohup bin/logstash -f /usr/local/logstash/config/test.conf &
  2. [root@master ~]# netstat -ntplu
  3. tcp6 0 0 :::5044 :::* LISTEN 14069/java
  4. tcp6 0 0 127.0.0.1:9600 :::* LISTEN 14069/java
  5. # 通过下面操作可以查看logstash和elasticsearch是否建立连接,刷新会看到数据变化。
  6. [root@master ~]# curl http://192.168.3.128:9200/_cat/indices
  7. yellow open filebeat-2019.07.16 PCHNvs83TvOuSd1iDU4S-A 5 1 4131 0 841.7kb 841.7kb
  8. yellow open .kibana qXyuI2ubT3-bBwyknxoyhA 1 1 2 0 21kb 21kb