I. Introduction
Logstash is an open-source data collection engine with pipelining capabilities. It is a collection and processing framework for logs, and not only logs: it can dynamically gather data from scattered sources, normalize the format, and ship the result to a destination of your choice.
II. Steps
1. Download Logstash (here I put it under /es):
[root@localhost es]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.2.tar.gz
(Note: if you see "-bash: wget: command not found", install wget first: yum -y install wget)
2. Extract it: [root@localhost es]# tar -zxvf logstash-7.3.2.tar.gz
3. Adjust the JVM heap in config/jvm.options (the default is 1 GB; if the machine has plenty of memory you can skip this step):
[root@localhost config]# vi jvm.options
-Xms256m
-Xmx256m
4. Verify that Logstash runs. Enter the /es/logstash-7.3.2/bin directory and execute:
[root@localhost bin]# ./logstash -e 'input { stdin { } } output { stdout {} }'
(Note: if you see "Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME.", install a JDK first.)
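If the pipeline starts, type a line such as hello and press Enter; Logstash should echo a structured event back. The default stdout codec prints something roughly like the following (your host and timestamp will differ):
hello
{
       "message" => "hello",
      "@version" => "1",
    "@timestamp" => 2019-09-20T06:00:00.000Z,
          "host" => "localhost.localdomain"
}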
5. Install the jdbc and elasticsearch plugins. Press Ctrl+C to stop the test pipeline, then run the following commands:
a. Install the jdbc input plugin (this command can take quite a while):
[root@localhost logstash-7.3.2]# bin/logstash-plugin install logstash-input-jdbc
b. Install the elasticsearch output plugin:
[root@localhost logstash-7.3.2]# bin/logstash-plugin install logstash-output-elasticsearch
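You can confirm both plugins are installed by listing them (the version numbers shown here are only illustrative and will vary):
[root@localhost logstash-7.3.2]# bin/logstash-plugin list --verbose | grep -E 'jdbc|elasticsearch'
logstash-input-jdbc (4.3.13)
logstash-output-elasticsearch (10.1.0)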
6. Obtain the mysql-connector-java driver jar and place it under /es/logstash-7.3.2/ (the location must match the jdbc_driver_library path in the config below).
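If you do not already have the driver jar, one way to fetch it is from Maven Central (URL assumed; any 5.1.x connector should work with the driver class used below):
[root@localhost logstash-7.3.2]# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar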
7. In the config directory, create the sync configuration file logstash.conf:
input {
  jdbc {
    type => "product"
    # MySQL JDBC connection settings
    jdbc_connection_string => "jdbc:mysql://49.232.91.87:3306/ec-goods?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "oracle!123"
    # Path to the MySQL JDBC driver jar; this path must be correct,
    # otherwise you get "com.mysql.jdbc.Driver could not be loaded"
    jdbc_driver_library => "/es/logstash-7.3.2/mysql-connector-java-5.1.46.jar"
    # The name of the driver class for MySQL
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Enable paged queries
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"
    # Sync SQL statement:
    # alias the snake_case columns so they match the camelCase fields of the entity classes
    statement => "select id,name,description,price,stock,level1_id as level1Id,level2_id as level2Id,level3_id as level3Id,main_img as mainImg,sub_imgs as subImgs,status,create_time as createTime,update_time as updateTime from product where update_time >= :sql_last_value order by update_time asc"
    # Cron-style schedule, e.g. run the sync once a minute
    # (minute hour day-of-month month day-of-week)
    schedule => "* * * * *"
    # Whether to force all column names to lowercase
    lowercase_column_names => false
    # Whether to record the last run; if true, the value of the tracked column from the
    # last run is saved to the file given by last_run_metadata_path
    record_last_run => true
    # Whether to track a specific column. If record_last_run is true, set this to true to
    # track a column of your choice; otherwise the timestamp of the last run is tracked.
    use_column_value => true
    # If use_column_value is true, this is the database column to track; it must be
    # monotonically increasing. If the column is aliased, use the alias here.
    tracking_column => "updateTime"
    # Type of the tracked column
    tracking_column_type => "timestamp"
    # File in which the last run state is stored
    last_run_metadata_path => "record_last_run_product"
    # Whether to clear the last_run_metadata_path record; if true, every run
    # re-queries all database records from scratch
    clean_run => false
  }
  jdbc {
    type => "category"
    jdbc_connection_string => "jdbc:mysql://49.232.91.87:3306/ec-goods?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "oracle!123"
    jdbc_driver_library => "/es/logstash-7.3.2/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"
    statement => "select id,name,parent_id as parentId,level,icon,status,create_time as createTime,update_time as updateTime from category where update_time >= :sql_last_value order by update_time asc"
    schedule => "* * * * *"
    lowercase_column_names => false
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "record_last_run_category"
    clean_run => false
  }
}
output {
  if [type] == "product" {
    elasticsearch {
      hosts => ["192.168.180.110:9200"]
      # Index name, user-defined; roughly analogous to a database
      index => "product"
      # The source table has an id column; use it as the document id in the index
      document_id => "%{id}"
    }
  }
  if [type] == "category" {
    elasticsearch {
      hosts => ["192.168.180.110:9200"]
      # Index name, user-defined; roughly analogous to a database
      index => "category"
      # The source table has an id column; use it as the document id in the index
      document_id => "%{id}"
    }
  }
  # Debug output; comment this out for production runs
  stdout {
    codec => json_lines
  }
}
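For reference, the last_run_metadata_path file is a small YAML file holding the last tracked value, which Logstash substitutes for :sql_last_value on the next run. Since the path above is relative, it is presumably created in the directory Logstash is started from, and its contents look roughly like this (the exact timestamp format may differ by plugin version):
[root@localhost logstash-7.3.2]# cat record_last_run_product
--- 2019-09-20 06:00:00.000000000 +00:00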
8. Start Logstash from the /es/logstash-7.3.2 directory:
[root@localhost logstash-7.3.2]# ./bin/logstash -f ./config/logstash.conf
(Note: if startup fails with "Expected one of #, input, filter, output at line 1, column 1 (byte 1) after", the config file was saved as UTF-8 with a BOM; re-save it as UTF-8 without a BOM.)
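To check for a BOM and strip it in place, something like the following should work (this assumes GNU sed; back up the file first):
[root@localhost config]# file logstash.conf
logstash.conf: UTF-8 Unicode (with BOM) text
[root@localhost config]# sed -i '1s/^\xEF\xBB\xBF//' logstash.conf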
9. To have the synced MySQL data analyzed with the IK Chinese analyzer, simply create the indices in Kibana before the first sync and set the analyzer of the Chinese text fields to ik_max_word (see section III below).
Test that the IK analyzer is available:
GET _analyze
{
  "analyzer": "ik_max_word",
  "text": "中华人民共和国"
}
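If IK is installed, the response contains a list of tokens; for 中华人民共和国, ik_max_word typically produces segments along these lines (response abridged):
{
  "tokens" : [
    { "token" : "中华人民共和国", "start_offset" : 0, "end_offset" : 7, "type" : "CN_WORD", "position" : 0 },
    { "token" : "中华人民", ... },
    { "token" : "中华", ... },
    { "token" : "华人", ... },
    { "token" : "人民共和国", ... },
    { "token" : "人民", ... },
    { "token" : "共和国", ... },
    { "token" : "共和", ... },
    { "token" : "国", ... }
  ]
}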
III. Testing
# Set the Chinese text fields of each index to the IK analyzer
# View an index's mapping (field structure)
GET product/_mapping
GET category/_mapping
# Create the indices
PUT product
PUT category
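Each successful PUT returns an acknowledgment similar to:
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "index" : "product"
}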
# Set the index mapping (field structure)
POST product/_mapping
{
  "properties" : {
    "@timestamp" : {
      "type" : "date"
    },
    "@version" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "createTime" : {
      "type" : "date"
    },
    "description" : {
      "analyzer" : "ik_max_word",
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "id" : {
      "type" : "long"
    },
    "level1Id" : {
      "type" : "long"
    },
    "level2Id" : {
      "type" : "long"
    },
    "level3Id" : {
      "type" : "long"
    },
    "mainImg" : {
      "analyzer" : "ik_max_word",
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "name" : {
      "analyzer" : "ik_max_word",
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "price" : {
      "type" : "float"
    },
    "status" : {
      "type" : "long"
    },
    "stock" : {
      "type" : "long"
    },
    "type" : {
      "analyzer" : "ik_max_word",
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "updateTime" : {
      "type" : "date"
    }
  }
}
POST category/_mapping
{
  "properties" : {
    "@timestamp" : {
      "type" : "date"
    },
    "@version" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "createTime" : {
      "type" : "date"
    },
    "icon" : {
      "analyzer" : "ik_max_word",
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "id" : {
      "type" : "long"
    },
    "level" : {
      "type" : "long"
    },
    "name" : {
      "analyzer" : "ik_max_word",
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "parentId" : {
      "type" : "long"
    },
    "status" : {
      "type" : "long"
    },
    "type" : {
      "analyzer" : "ik_max_word",
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "updateTime" : {
      "type" : "date"
    }
  }
}
# To re-sync everything from scratch, delete the last_run_metadata_path files (record_last_run_product and record_last_run_category) and restart Logstash.
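Once Logstash has synced, a quick search in Kibana verifies both the data and the IK analysis (the search term here is only an example; use any word that appears in your product names):
GET product/_search
{
  "query": {
    "match": {
      "name": "手机"
    }
  }
}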