1. Install Logstash
- Download the archive from the official website
- Extract it
tar -zxvf logstash-7.15.0-linux-x86_64.tar.gz
- Start Logstash
bin/logstash -e 'input { stdin {} } output { stdout {} }'
The -e option lets you pass the pipeline configuration directly on the command line instead of a file.
- Type hello world; Logstash should echo it back as an event (a rough sketch of the output follows).
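For reference only: with the default stdout codec the echoed event looks roughly like the following; the exact host and timestamp values will differ on your machine.
{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => 2021-06-27T08:00:00.000Z,
          "host" => "localhost"
}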
2. MySQL Data Source
CREATE TABLE `goods_store` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT 'primary key id',
  `store_name` varchar(256) COLLATE utf8mb4_bin DEFAULT NULL COMMENT 'store name',
  `store_introduction` varchar(512) COLLATE utf8mb4_bin DEFAULT NULL COMMENT 'store introduction',
  `store_brand` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL COMMENT 'store brand',
  `open_date` date DEFAULT NULL COMMENT 'opening date',
  `store_photo` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL COMMENT 'store photo',
  `store_tags` varchar(256) COLLATE utf8mb4_bin DEFAULT NULL COMMENT 'store tags',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time, refreshed on every change so incremental sync picks the row up',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags) VALUES (1, '小米销售店', '小米产品销售店', '小米', '2021-06-20', '1', '手机,电子');
INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags) VALUES (2, '华为销售店', '华为产品销售店', '华为', '2021-06-10', '1', '手机,电子');
INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags) VALUES (3, '苹果销售店', '苹果产品销售店', '苹果', '2021-06-27', '1', '手机,电子');
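To load the table and sample rows from the shell, a minimal sketch (assuming the statements above are saved as goods_store.sql, a file name chosen here for illustration, and that the target database is test, matching the JDBC connection string used later):
mysql -h 192.168.101.4 -u root -p test < goods_store.sql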
3. Pipeline Configuration
Create the pipeline configuration file with vim my-pipeline.conf and paste in the pipeline below. Note that the MySQL Connector/J jar is not shipped with Logstash; it has to be copied manually to the path referenced by jdbc_driver_library.
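A sketch of that copy step, assuming the jar has already been downloaded to the current directory:
cp mysql-connector-java-8.0.26.jar /software/elasticsearch/logstash-7.15.0/config/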
input {
  jdbc {
    jdbc_driver_library => "/software/elasticsearch/logstash-7.15.0/config/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.101.4:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
    jdbc_user => "root"
    jdbc_password => "123456"
    # cron-style expression; poll the database every minute
    schedule => "* * * * *"
    statement => "SELECT * FROM goods_store WHERE update_time >= :sql_last_value"
    # track the value of an incrementing column instead of the last run time
    use_column_value => true
    # type of the tracking column: numeric or timestamp
    tracking_column_type => "timestamp"
    # name of the tracking column; here the timestamp column update_time
    tracking_column => "update_time"
    # sync point file: stores the last synced value, is read again on restart, and can be edited manually
    last_run_metadata_path => "syncpoint_table"
  }
}
filter {
  # format the open_date column as yyyy-MM-dd
  ruby {
    code => "event.set('openDate', event.get('open_date').time.localtime.strftime('%Y-%m-%d'))"
  }
  # copy the snake_case columns into camelCase fields
  ruby {
    code => "event.set('storeName', event.get('store_name'));
             event.set('storeIntroduction', event.get('store_introduction'));
             event.set('storeBrand', event.get('store_brand'));
             event.set('storePhoto', event.get('store_photo'));
             event.set('storeTags', event.get('store_tags'));"
  }
  # drop the fields added by Logstash and the original snake_case columns
  mutate {
    remove_field => ["tags","@version","@timestamp","open_date","store_name","store_introduction","store_brand","store_photo","store_tags"]
  }
}
output {
  elasticsearch {
    # Elasticsearch address and port
    hosts => ["192.168.113.140:9200"]
    # index name; customize as needed
    index => "store_index"
    # use the id column from the database as the document _id
    document_id => "%{id}"
    # document_type is deprecated for ES 7.x and can usually be omitted
    document_type => "store"
  }
  stdout {
    # also print each event as a JSON line for debugging
    codec => json_lines
  }
}
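Before wiring the pipeline into a startup script, you can have Logstash validate the configuration syntax and exit. A sketch, assuming my-pipeline.conf was saved into the config directory referenced above:
bin/logstash -f config/my-pipeline.conf --config.test_and_exit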
4. Logstash Startup Script
Create the script start-logstash.sh:
#!/bin/bash
cd /software/elasticsearch/logstash-7.15.0/bin
nohup ./logstash -f ../config/my-pipeline.conf > /software/elasticsearch/logs/start-logstash.log 2>&1 &
Make it executable:
chmod +x start-logstash.sh
- Run the script
./start-logstash.sh
- Check the log file (/software/elasticsearch/logs/start-logstash.log) for errors, then verify the documents in the ES index
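A quick verification query, assuming the Elasticsearch host from the output section above:
curl -X GET 'http://192.168.113.140:9200/store_index/_search?pretty'
Each row of goods_store should come back as a document whose _id matches the MySQL id column and whose fields use the camelCase names produced by the filter.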