The log collection pipeline at 锥智 works as follows:

Application log files —> Filebeat —> Kafka —> Logstash —> Elasticsearch

Filebeat configuration

    ###################### Filebeat Configuration Example #########################

    #=========================== Filebeat inputs =============================
    filebeat.inputs:

    - type: log
      enabled: true
      paths:
        - /data/app/*/logs/mqtt.log
        - /data/app/*/logs/exception.log
        - /data/app/*/logs/messages.log
        - /data/app/*/logs/requestcontext.log
        - /data/app/*/logs/responsecontext.log
        - /data/app/*/logs/transaction_tracking.log
        - /data/app/*/logs/transaction_tracking_step.log

    - type: log
      enabled: true
      exclude_files: []
      paths:
        - /data/app/*/logs/run.log

      ### Multiline options
      # Multiline settings so that Java error output (stack traces) is merged into one event
      multiline.pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'
      multiline.negate: true
      multiline.match: after

    #============================= Filebeat modules ===============================
    filebeat.config.modules:
      # Glob pattern for configuration loading
      path: ${path.config}/modules.d/*.yml
      # Set to true to enable config reloading
      reload.enabled: false

    #==================== Elasticsearch template setting ==========================
    setup.template.settings:
      index.number_of_shards: 3

    #================================ General =====================================
    #============================== Dashboards =====================================
    #============================== Kibana =====================================
    setup.kibana:

    #============================== Kafka output ===================================
    output.kafka:
      # hosts: ['10.111.212.165:9092', '10.111.212.166:9092', '10.111.212.167:9092', '10.111.212.168:9092']
      hosts: ["10.11.7.25:9092"]
      topic: file_logging
      required_acks: 1

The configuration file is fairly easy to follow:

  1. Two groups of inputs are defined: the application log files, and run.log, which needs multiline handling so that Java stack traces stay together in one event (see the sketch below).
  2. Everything is shipped to Kafka with required_acks set to 1, i.e. Filebeat waits for the partition leader to acknowledge each batch (use -1 if you need all in-sync replicas to confirm).
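
To make the multiline rule concrete, here is a minimal Python sketch (the run.log lines are made up) showing which lines match multiline.pattern. With multiline.negate: true and multiline.match: after, every non-matching line is appended to the previous event, so an entire stack trace ships as a single message:

    import re

    # The multiline.pattern from the Filebeat config above: a new event starts
    # only on a line that begins with a date such as "2019-06-03" or "03-Jun-2019".
    pattern = re.compile(r'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})')

    # Hypothetical run.log lines: one log record followed by a stack trace.
    lines = [
        "2019-06-03 10:15:30 ERROR order failed",
        "java.lang.NullPointerException: boom",
        "    at com.example.OrderService.submit(OrderService.java:42)",
    ]

    for line in lines:
        # True  -> this line starts a new event
        # False -> the line is glued onto the previous event (negate: true, match: after)
        print(bool(pattern.match(line)), repr(line))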

Logstash configuration

    input {
      kafka {
        bootstrap_servers => "10.11.7.25:9092"
        topics => ["file_logging"]
        consumer_threads => 8
        codec => json {
          charset => "UTF-8"
        }
        client_id => "logstash-logging"
        type => "logging"
      }
      kafka {
        bootstrap_servers => "10.11.7.25:9092"
        topics => ["middleware_monitor"]
        consumer_threads => 4
        client_id => "logstash-monitor"
        codec => json {
          charset => "UTF-8"
        }
        type => "monitor"
      }
      kafka {
        bootstrap_servers => "10.11.7.25:9092"
        topics => ["server_monitor_topic"]
        consumer_threads => 4
        codec => json
        client_id => "logstash-services"
        type => "services"
      }
    }

    filter {
      if [type] == "logging" {
        mutate {
          split => { "source" => "/" }
        }
        mutate {
          add_field => {
            "service"  => "%{[source][3]}"
            "is_nginx" => "%{[source][4]}"
            "level"    => "%{[source][5]}"
          }
        }
        mutate {
          remove_field => ["beat", "source", "offset", "prospector", "input", "@metadata"]
        }
      }
    }

    output {
      if [type] == "monitor" {
        elasticsearch {
          hosts => ['10.11.7.24:9200']
          index => "monitor-%{+YYYY.MM.dd}"
          timeout => 300
        }
      } else if [type] == "services" {
        elasticsearch {
          hosts => ['10.11.7.24:9200']
          index => "server-monitor-%{+YYYY.MM.dd}"
          timeout => 300
        }
      }
      if [type] == "logging" {
        if [level] == "exception.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "exception-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "responsecontext.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "responsecontext-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "requestcontext.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "requestcontext-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "run.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "run-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "transaction_tracking_step.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "transaction-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "transaction_tracking.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "transaction-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "messages.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "messages-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "access.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "nginx-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "error.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "nginx-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
        if [level] == "mqtt.log" {
          elasticsearch {
            hosts => ['10.11.7.24:9200']
            index => "mqtt-logging-%{+YYYY.MM.dd}"
            timeout => 300
          }
        }
      }
    }

Input configuration

  • bootstrap_servers => "10.11.7.25:9092": the Kafka broker to consume from
  • topics => ["file_logging"]: the topic(s) to subscribe to
  • consumer_threads => 8: number of consumer threads
  • codec => json { charset => "UTF-8" }: decode each message as UTF-8 JSON
  • client_id => "logstash-logging": the consumer client id
  • type => "logging": tags this group of events so the filter and output can route them
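
As a quick sanity check of the input side, the sketch below (assuming the kafka-python package and network access to the broker) consumes a few messages from the file_logging topic and prints the fields that Filebeat adds, such as source and message:

    import json
    from kafka import KafkaConsumer  # pip install kafka-python

    consumer = KafkaConsumer(
        "file_logging",
        bootstrap_servers=["10.11.7.25:9092"],
        auto_offset_reset="latest",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )

    # Print a handful of events and stop; each value is the JSON document
    # that Filebeat published and that Logstash will later consume.
    for i, record in enumerate(consumer):
        event = record.value
        print(event.get("source"), "->", str(event.get("message", ""))[:80])
        if i >= 4:
            break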

Filter configuration

  • mutate splits the source path into segments, adds the service, is_nginx and level fields from those segments, and removes fields that are no longer needed
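
To see why the filter reads segments 3, 4 and 5, here is a worked example in Python with a hypothetical service name; the leading "/" produces an empty first segment, which is why the interesting pieces start at index 3:

    # Hypothetical value of the Filebeat "source" field for one collected file.
    source = "/data/app/order-service/logs/exception.log"

    parts = source.split("/")
    # parts == ['', 'data', 'app', 'order-service', 'logs', 'exception.log']

    service  = parts[3]  # "service"  field -> "order-service"
    is_nginx = parts[4]  # "is_nginx" field -> "logs" here (presumably "nginx" for nginx log paths, hence the name)
    level    = parts[5]  # "level"    field -> "exception.log", used by the output for routing
    print(service, is_nginx, level)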

Output configuration

  • Events are routed to different daily indices: by type, and for the logging stream additionally by level (i.e. the source file name)
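
Once events are flowing, the daily indices can be checked directly against the cluster; a minimal sketch, assuming the requests package:

    import requests

    # List the daily logging indices that the Logstash outputs should have created.
    resp = requests.get("http://10.11.7.24:9200/_cat/indices/*-logging-*?v&s=index")
    print(resp.text)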