一、软件下载

软件下载集结:https://elasticsearch.cn/download,注意软件版本需要一致,我这里用的是 v7.1.1,另外如果你想下载 Head 插件,可以搜下谷歌插件,更方便。

二、采集架构

废话先不多说,看下日志采集架构:
image.png

Filebeat:收集服务的日志信息
Logstash:将日志信息格式化加工处理,发送到 ES 或执行其它操作
Elasticsearch:日志信息存储
Kibana:结合 ES 进行搜索,还提供了图表功能,可以添加 Dashboard

这个简单的 demo 在我本地 Mac 运行,只做演示。

2.1 重点提示

  • Logstash 中 grok 的配置,是按我的日志进行配置的,实际上需要根据具体的日志格式进行不同的处理,否则 grok 会解析失败,可能会看到这个错误提示:_grokparsefailure,日志就原样输出了,并没有按照你定义的格式输出,配置过程中需要注意。

三、Filebeat

为什么不直接使用 Logstash 采集日志?Filebeat 更加的轻量级,保证了文件至少有一次输出,会记录读取文件的偏移量。

官方参考文档,非常详细。

3.1 配置

Filebeat 对日志按行进行读取,在 Filebeat 软件目录中官方提供了一个 filebeat.reference.yml参考配置文件,提供了对常用软件的配置参考,例如:Kafka、ELK等。

Filebeat 比较简单不需要做太多的操作,只要把数据收集到 Logstash 中就行,唯一要注意的可能是 Java 日志会打印堆栈信息,而 Filebeat 是按行读取的,这里我们需要注意下。

对 filebeat.yml 进行配置:

  1. filebeat.inputs:
  2. - type: log
  3. paths:
  4. - /mnt/logstash/2019-12-10/app-info.log
  5. fields:
  6. type: info
  7. clientIp: 192.168.3.77
  8. serverName: app
  9. - type: log
  10. paths:
  11. - /mnt/logstash/2019-12-10/app-warn.log
  12. # 还可以配置其它的路径日志
  13. #- /mnt/logs/other.log
  14. multiline:
  15. # 正则
  16. pattern: ^\d{4}
  17. negate: true
  18. match: after
  19. fields:
  20. type: warn
  21. clientIp: 192.168.3.77
  22. serverName: app
  23. - type: log
  24. paths:
  25. - /mnt/logstash/2019-12-10/app-error.log
  26. multiline:
  27. pattern: ^\d{4}
  28. negate: true
  29. match: after
  30. fields:
  31. type: error
  32. clientIp: 192.168.3.77
  33. serverName: app
  34. output.logstash:
  35. hosts: ["localhost:5043"]

这个配置可以用来参考:

  1. filebeat.inputs:
  2. - type: log
  3. # 开关
  4. enabled: true
  5. paths:
  6. # 日志文件路径,可以用用通配符,可以配置多个路径,下面两个等同于:/mnt/*/log-info.log
  7. - /mnt/log/log-info.log
  8. - /mnt/logs/log-info.log
  9. # 自定义属性,用于 Logstash 中
  10. # 扫描间隔,单位为秒,设置太小会引起filebeat频繁扫描文件,导致cpu占用百分比过高
  11. # scan_frequency: 50
  12. # 不同日志处理方式不同
  13. - type: log
  14. enabled: true
  15. paths:
  16. - /mnt/log/log-error.log
  17. # 日志多行处理,例如异常堆栈信息,支持正则表达式
  18. multiline:
  19. # 匹配前缀为数字开头,如果不是日期,该行日志接到上一行后尾
  20. pattern: ^\d{4}
  21. negate: true
  22. match: after
  23. output.logstash:
  24. # 输出到logstash的安装位置,以及监听的端口
  25. hosts: ["127.0.0.1:5043"]

3.2 启动

执行命令:

  1. # 指定配置文件启动
  2. ./filebeat -e -c filebeat.yml

四、Logstash

这里是重点,主要是利用 Logstash 提供的插件对数据进行处理,再输出到 ES 中,最好能统一应用的输出日志,例如这里的格式是在 logback 中定义好,用 || 来分隔:

  1. %d{yyyy-MM-dd HH:mm:ss.SSS} || %5p || ${PID:- } || [%t] || %-40.40logger{39} : || %m%n%wex

输出的日志格式:

  1. 2019-12-10 22:22:30.007 || ERROR || 86354 || [pool-1-thread-1] || c.tensquare.manager.ManagerApplication : || 错误日志:
  2. java.lang.ArithmeticException: / by zero
  3. at com.tensquare.manager.ManagerApplication.m2(ManagerApplication.java:44)
  4. at com.tensquare.manager.ManagerApplication.m1(ManagerApplication.java:40)
  5. at com.tensquare.manager.ManagerApplication.log(ManagerApplication.java:31)
  6. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  7. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  8. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  9. at java.lang.reflect.Method.invoke(Method.java:498)
  10. at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
  11. at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
  12. at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
  13. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  14. at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
  15. at java.util.concurrent.FutureTask.run(FutureTask.java)
  16. at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  17. at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  18. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  19. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  20. 2019-12-10 22:22:30.008 || WARN || 86354 || [pool-1-thread-1] || c.tensquare.manager.ManagerApplication : || warn 日志,一些重要的操作
  21. 2019-12-10 22:22:30.005 || INFO || 86354 || [pool-1-thread-1] || c.tensquare.manager.ManagerApplication : || info 日志,记录一些普通的信息

4.1 配置

Logstash 的特点就是提供丰富的插件,例如:Grok 等,配置 logstash.conf 文件:

  1. input { # 指定输入源-beats
  2. beats {
  3. host => "localhost"
  4. port => "5043"
  5. }
  6. }
  7. filter {
  8. mutate {
  9. # 根据 || 分隔
  10. split => { "message" => "||"}
  11. # 添加字段
  12. add_field => { "create_time" => "%{[message][0]}" }
  13. add_field => { "level" => "%{[message][2]}" }
  14. add_field => { "class" => "%{[message][8]}" }
  15. add_field => { "msg" => "%{[message][10]}" }
  16. # 去空格
  17. strip => ["create_time"]
  18. strip => ["level"]
  19. strip => ["class"]
  20. strip => ["msg"]
  21. # 删除不需要的字段等等
  22. remove_field => [ "message","@timestamp","host","version" ]
  23. }
  24. }
  25. output {
  26. # 输出到控制台调试,可以注释掉
  27. stdout {
  28. codec => rubydebug
  29. }
  30. elasticsearch {
  31. hosts => [ "localhost:9200" ]
  32. # 在es中存储的索引格式,按照“服务名-日期”进行索引
  33. index => "%{[fields][serverName]}-%{+YYYY.MM.dd}"
  34. }
  35. }

这个配置可以用来参考:

  1. # 指定输入源-beats
  2. input {
  3. beats {
  4. host => "localhost"
  5. port => "5043"
  6. }
  7. }
  8. # 日志格式化,过滤处理
  9. filter {
  10. # 如果是error类型的日志该怎么处理,在filebeat 的fields中定义
  11. if [fields][log_type] == 'error' {
  12. # 使用 grok 插件进行一整条日志信息格式成key-value信息
  13. grok {
  14. # 这里采用的是grok预制的一些正则,":"后面的logdate是我们自定义的key
  15. match => {
  16. "message" => "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{JAVACLASS:class} %{NUMBER:thread} %{JAVALOGMESSAGE:logmsg}"
  17. }
  18. }
  19. # 将 kibana 的查询时间改成日志的打印时间,方便之后查询,
  20. # 如果不改的话,kibana会有自己的时间,导致查询不方便
  21. date {
  22. match => ["logdate", "yyyy-MM-dd HH:mm:ss Z", "ISO8601"]
  23. target => "@timestamp"
  24. }
  25. }
  26. # 如果是info类型该怎么格式,这里是重复的,如果有日志格式不一样比如nginx的日志类型,可以在这里自己定义
  27. if [fields][log_type] == 'info' {
  28. grok {
  29. match => {
  30. "message" => "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{JAVACLASS:class} %{NUMBER:thread} %{JAVALOGMESSAGE:logmsg}"
  31. }
  32. }
  33. date {
  34. match => ["logdate", "yyyy-MM-dd HH:mm:ss Z", "ISO8601"]
  35. target => "@timestamp"
  36. }
  37. }
  38. }
  39. # 输出设置
  40. output {
  41. # 输出到控制台
  42. stdout {
  43. codec => rubydebug
  44. }
  45. # 输出到elasticsearch,提供给kibana进行搜索
  46. elasticsearch {
  47. hosts => [ "localhost:9200" ]
  48. # 在es中存储的索引格式,按照“服务名-日期”进行索引
  49. index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"
  50. }
  51. }

4.2 启动

执行命令:

  1. # 指定配置文件启动
  2. ./bin/logstash -f logstash.conf

五、Elasticsearch

5.1 配置

使用默认即可

5.2 启动

执行命令:

  1. ./bin/elasticsearch

六、Kibana

6.1 配置

如果想换成中文,在 config/kibana.yml 配置文件的最后一行换成中文即可,如下:

  1. i18n.locale: "zh-CN"

6.2 启动

执行命令:

  1. ./bin/kibana

七、查看数据

image.png

八、参考资料