Logstash从数据库读取到ES里面
input {jdbc {#设置连接字符串jdbc_connection_string =>"jdbc:mysql://101.133.224.127:3306/guli_edu?useUnicode=true&characterEncoding=utf8"#用户名和密码jdbc_user => "root"jdbc_password =>"11111111"#数据库的驱动地址jdbc_driver_library =>"/usr/local/logstash-6.8.4/sync-db/mysql-connector-java-5.1.46.jar"#驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"#开启分页jdbc_paging_enabled => "true"#分页每页数量jdbc_page_size =>"1000"#执行sql文件路径statement_filepath =>"/usr/local/logstash-6.8.4/sync-db/guli.sql"#设置定时任务间隔, 含义:分,时,天,月,年,全部为* 默认含义为每分钟跑一次schedule => "* * * * *"#索引类型type => "_doc"#是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path的文件use_column_value => true#记录上次追踪的结果值 文件路径last_run_metadata_path => ""#如果 use_column_value 为 ture ,配置本参数追踪的column名 可以是自增id或者时间tracking_column =>"gmt_modified"#tracking_column 对应的字段类型tracking_column_type => "timestamp"#是否清除 last_run_metadata_path 的记录,true表示每次都从头开始查找clean_run => false#数据库字段名称大写转小写lowercase_column_names => false}}output {elasticsearch {#es地址hosts =>[""]#同步的索引名index =>"xiaohu"#设置_docID和数据相同document_id => "%{id}"}#日志输出stdout {codec => json_lines}}
FileBeat 读取配置文件到 Kafka
###################### Filebeat Configuration Example #########################
filebeat.prospectors:
- input_type: log
paths:
## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
- /usr/local/web/api/edu/logs/app-service-edu.log
#定义写入 ES 时的 _type 值
document_type: "service-edu-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: service-edu
logtopic: app-log-service-edu ## 按服务划分用作kafka topic
evn: dev
- input_type: log
paths:
-/usr/local/web/api/edu/logs/error-service-edu.log
document_type: "service-edu-error-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: service-edu
logtopic: error-log-service-edu ## 按服务划分用作kafka topic
evn: dev
output.kafka:
enabled: true
hosts: ["127.0.0.1:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic app-log-service-edu --partitions 1 --replication-factor 1
LogStash读取Kafka的数据到ES
## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
input {
kafka {
## app-log-服务名称
topics_pattern => "service-edu-.*"
bootstrap_servers => "127.0.0.1:9092" #kafka地址
codec => json #输出格式json
consumer_threads => 1 ## 增加consumer的并行消费线程数 优化之一 Logstash开启多少个线程消费
decorate_events => true
#auto_offset_rest => "latest"
group_id => "app-log-service-edu-group" #kafka的分组
}
kafka {
## error-log-服务名称
topics_pattern => "error-service-edu-.*"
bootstrap_servers => "127.0.0.1:9092"
codec => json
consumer_threads => 1
decorate_events => true
#auto_offset_rest => "latest"
group_id => "error-log-service-edu-group"
}
}
filter {
## 时区转换 es的时间问题所以我们得时间转换不然日志输出会有问题,我们比es的时间慢半个小时 东八区
ruby {
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
#filebeat 中定义的 [fields][logtopic] 字段属性
if "app-log" in [fields][logtopic]{
grok {
## 表达式,这里对应的是Springboot输出的日志格式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in [fields][logtopic]{
grok {
## 表达式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
}
## 测试输出到控制台:
output {
stdout { codec => rubydebug }
}
## elasticsearch:
output {
if "app-log" in [fields][logtopic]{
## es插件
elasticsearch {
# es服务地址
hosts => ["127.0.0.1:9200"]
# 用户名密码
# user => "elastic"
# password => "123456"
## 索引名,+ 号开头的,就会自动认为后面是时间格式:
## javalog-app-service-2019.01.23
index => "app-log-service-edu-%{[fields][logbiz]}-%{index_time}"
# 是否嗅探集群ip:一般设置true;http://192.168.8.170:9200/_nodes/http?pretty
# 通过嗅探机制进行es集群负载均衡发日志消息
sniffing => true
# logstash默认自带一个mapping模板,进行模板覆盖
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
elasticsearch {
hosts => ["127.0.0.1:9200"]
user => "elastic"
password => "123456"
index => "error-log-service-edu-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
}
}
