一、软件下载
软件下载集结:https://elasticsearch.cn/download,注意软件版本需要一致,我这里用的是 v7.1.1,另外如果你想下载 Head 插件,可以搜下谷歌插件,更方便。
二、采集架构
废话先不多说,看下日志采集架构:
Filebeat:收集服务的日志信息
Logstash:将日志信息格式化加工处理,发送到 ES 或执行其它操作
Elasticsearch:日志信息存储
Kibana:结合 ES 进行搜索,还提供了图表功能,可以添加 Dashboard
这个简单的 demo 在我本地 Mac 运行,只做演示。
2.1 重点提示
- Logstash 中 grok 的配置,是按我的日志进行配置的,实际上需要根据具体的日志格式进行不同的处理,否则 grok 会解析失败,可能会看到这个错误提示:_grokparsefailure,日志就原样输出了,并没有按照你定义的格式输出,配置过程中需要注意。
三、Filebeat
为什么不直接使用 Logstash 采集日志?Filebeat 更加的轻量级,保证了文件至少有一次输出,会记录读取文件的偏移量。
官方参考文档,非常详细。
3.1 配置
Filebeat 对日志按行进行读取,在 Filebeat 软件目录中官方提供了一个 filebeat.reference.yml参考配置文件,提供了对常用软件的配置参考,例如:Kafka、ELK等。
Filebeat 比较简单不需要做太多的操作,只要把数据收集到 Logstash 中就行,唯一要注意的可能是 Java 日志会打印堆栈信息,而 Filebeat 是按行读取的,这里我们需要注意下。
对 filebeat.yml 进行配置:
filebeat.inputs:
- type: log
paths:
- /mnt/logstash/2019-12-10/app-info.log
fields:
type: info
clientIp: 192.168.3.77
serverName: app
- type: log
paths:
- /mnt/logstash/2019-12-10/app-warn.log
# 还可以配置其它的路径日志
#- /mnt/logs/other.log
multiline:
# 正则
pattern: ^\d{4}
negate: true
match: after
fields:
type: warn
clientIp: 192.168.3.77
serverName: app
- type: log
paths:
- /mnt/logstash/2019-12-10/app-error.log
multiline:
pattern: ^\d{4}
negate: true
match: after
fields:
type: error
clientIp: 192.168.3.77
serverName: app
output.logstash:
hosts: ["localhost:5043"]
这个配置可以用来参考:
filebeat.inputs:
- type: log
# 开关
enabled: true
paths:
# 日志文件路径,可以用用通配符,可以配置多个路径,下面两个等同于:/mnt/*/log-info.log
- /mnt/log/log-info.log
- /mnt/logs/log-info.log
# 自定义属性,用于 Logstash 中
# 扫描间隔,单位为秒,设置太小会引起filebeat频繁扫描文件,导致cpu占用百分比过高
# scan_frequency: 50
# 不同日志处理方式不同
- type: log
enabled: true
paths:
- /mnt/log/log-error.log
# 日志多行处理,例如异常堆栈信息,支持正则表达式
multiline:
# 匹配前缀为数字开头,如果不是日期,该行日志接到上一行后尾
pattern: ^\d{4}
negate: true
match: after
output.logstash:
# 输出到logstash的安装位置,以及监听的端口
hosts: ["127.0.0.1:5043"]
3.2 启动
执行命令:
# 指定配置文件启动
./filebeat -e -c filebeat.yml
四、Logstash
这里是重点,主要是利用 Logstash 提供的插件对数据进行处理,再输出到 ES 中,最好能统一应用的输出日志,例如这里的格式是在 logback 中定义好,用 || 来分隔:
%d{yyyy-MM-dd HH:mm:ss.SSS} || %5p || ${PID:- } || [%t] || %-40.40logger{39} : || %m%n%wex
输出的日志格式:
2019-12-10 22:22:30.007 || ERROR || 86354 || [pool-1-thread-1] || c.tensquare.manager.ManagerApplication : || 错误日志:
java.lang.ArithmeticException: / by zero
at com.tensquare.manager.ManagerApplication.m2(ManagerApplication.java:44)
at com.tensquare.manager.ManagerApplication.m1(ManagerApplication.java:40)
at com.tensquare.manager.ManagerApplication.log(ManagerApplication.java:31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2019-12-10 22:22:30.008 || WARN || 86354 || [pool-1-thread-1] || c.tensquare.manager.ManagerApplication : || warn 日志,一些重要的操作
2019-12-10 22:22:30.005 || INFO || 86354 || [pool-1-thread-1] || c.tensquare.manager.ManagerApplication : || info 日志,记录一些普通的信息
4.1 配置
Logstash 的特点就是提供丰富的插件,例如:Grok 等,配置 logstash.conf 文件:
input { # 指定输入源-beats
beats {
host => "localhost"
port => "5043"
}
}
filter {
mutate {
# 根据 || 分隔
split => { "message" => "||"}
# 添加字段
add_field => { "create_time" => "%{[message][0]}" }
add_field => { "level" => "%{[message][2]}" }
add_field => { "class" => "%{[message][8]}" }
add_field => { "msg" => "%{[message][10]}" }
# 去空格
strip => ["create_time"]
strip => ["level"]
strip => ["class"]
strip => ["msg"]
# 删除不需要的字段等等
remove_field => [ "message","@timestamp","host","version" ]
}
}
output {
# 输出到控制台调试,可以注释掉
stdout {
codec => rubydebug
}
elasticsearch {
hosts => [ "localhost:9200" ]
# 在es中存储的索引格式,按照“服务名-日期”进行索引
index => "%{[fields][serverName]}-%{+YYYY.MM.dd}"
}
}
这个配置可以用来参考:
# 指定输入源-beats
input {
beats {
host => "localhost"
port => "5043"
}
}
# 日志格式化,过滤处理
filter {
# 如果是error类型的日志该怎么处理,在filebeat 的fields中定义
if [fields][log_type] == 'error' {
# 使用 grok 插件进行一整条日志信息格式成key-value信息
grok {
# 这里采用的是grok预制的一些正则,":"后面的logdate是我们自定义的key
match => {
"message" => "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{JAVACLASS:class} %{NUMBER:thread} %{JAVALOGMESSAGE:logmsg}"
}
}
# 将 kibana 的查询时间改成日志的打印时间,方便之后查询,
# 如果不改的话,kibana会有自己的时间,导致查询不方便
date {
match => ["logdate", "yyyy-MM-dd HH:mm:ss Z", "ISO8601"]
target => "@timestamp"
}
}
# 如果是info类型该怎么格式,这里是重复的,如果有日志格式不一样比如nginx的日志类型,可以在这里自己定义
if [fields][log_type] == 'info' {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{JAVACLASS:class} %{NUMBER:thread} %{JAVALOGMESSAGE:logmsg}"
}
}
date {
match => ["logdate", "yyyy-MM-dd HH:mm:ss Z", "ISO8601"]
target => "@timestamp"
}
}
}
# 输出设置
output {
# 输出到控制台
stdout {
codec => rubydebug
}
# 输出到elasticsearch,提供给kibana进行搜索
elasticsearch {
hosts => [ "localhost:9200" ]
# 在es中存储的索引格式,按照“服务名-日期”进行索引
index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"
}
}
4.2 启动
执行命令:
# 指定配置文件启动
./bin/logstash -f logstash.conf
五、Elasticsearch
5.1 配置
使用默认即可
5.2 启动
执行命令:
./bin/elasticsearch
六、Kibana
6.1 配置
如果想换成中文,在 config/kibana.yml 配置文件的最后一行换成中文即可,如下:
i18n.locale: "zh-CN"
6.2 启动
执行命令:
./bin/kibana
七、查看数据
- 查看所有索引:http://localhost:9200/_cat/indices
- 在 Kibana 中删除索引:DELETE /app-2019.12.10
- 查看索引中的文档,在浏览器输入:http://localhost:9200/app-2019-12-10/_search?pretty 数据如下: