1.安装Logstash
- 官网下载压缩包
- 解压
tar -zxvf logstash-7.15.0-linux-x86_64.tar.gz
- 启动logstash(在解压后的 logstash-7.15.0 目录下执行)
bin/logstash \
  -e 'input { stdin {} } output { stdout {} }'
选项 -e 表示直接在命令行中指定管道配置,而不读取配置文件。
- 启动完成后在终端输入 hello world,stdout 输出插件会把它作为一条事件打印出来
2.MYSQL数据源
-- Source table for the Logstash JDBC incremental sync.
-- update_time is the pipeline's tracking column, so it must be refreshed on
-- every UPDATE (ON UPDATE CURRENT_TIMESTAMP). With only DEFAULT CURRENT_TIMESTAMP
-- an edited row keeps its insert time and the change is never picked up.
-- goods_store_update_time_idx serves the "WHERE update_time >= :sql_last_value" poll,
-- which otherwise scans the whole table every run.
CREATE TABLE `goods_store` (
    `id`                 int           NOT NULL AUTO_INCREMENT COMMENT '主键id',
    `store_name`         varchar(256)  COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺名称',
    `store_introduction` varchar(512)  COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺简介',
    `store_brand`        varchar(64)   COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺品牌',
    `open_date`          date          DEFAULT NULL COMMENT '开店时间',
    `store_photo`        varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺图片',
    -- comma-separated tag list, e.g. '手机,电子'
    `store_tags`         varchar(256)  COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺标签',
    `update_time`        timestamp     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    KEY `goods_store_update_time_idx` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

-- Sample rows; update_time is filled in by its column default.
INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags)
VALUES (1, '小米销售店', '小米产品销售店', '小米', '2021-06-20', '1', '手机,电子');
INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags)
VALUES (2, '华为销售店', '华为产品销售店', '华为', '2021-06-10', '1', '手机,电子');
INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags)
VALUES (3, '苹果销售店', '苹果产品销售店', '苹果', '2021-06-27', '1', '手机,电子');
3.Pipeline配置
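在 logstash-7.15.0/config 目录下创建 pipeline 配置文件 my-pipeline.conf(例如 vim config/my-pipeline.conf,启动脚本会从该位置读取)。MySQL 驱动 mysql-connector-java 的 jar 包需要手动复制到 jdbc_driver_library 指定的路径(本例为 config 目录)。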
input {
  jdbc {
    # MySQL Connector/J jar; it has to be copied to this path manually.
    jdbc_driver_library => "/software/elasticsearch/logstash-7.15.0/config/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.101.4:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
    jdbc_user => "root"
    # NOTE(review): plaintext credentials; consider "${JDBC_PASSWORD}" (environment
    # variable or Logstash keystore) instead of committing the password.
    jdbc_password => "123456"
    # Cron expression: poll the table once a minute.
    schedule => "* * * * *"
    # Incremental query.
    # - ORDER BY update_time: with use_column_value the plugin saves the tracking
    #   column of the last row it processes as :sql_last_value. Without ordering,
    #   that row may not be the newest one, the sync point lags, and the same rows
    #   get re-read and re-indexed on every poll.
    # - ">=" (not ">") re-reads rows that share the last saved timestamp instead of
    #   risking skipping them; re-indexing is idempotent because document_id = id.
    statement => "SELECT id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags, update_time FROM goods_store WHERE update_time >= :sql_last_value ORDER BY update_time ASC"
    # Track the value of a column instead of the last run time.
    use_column_value => true
    # Type of the tracking column: "numeric" or "timestamp".
    tracking_column_type => "timestamp"
    # update_time is a MySQL TIMESTAMP column.
    tracking_column => "update_time"
    # Sync-point file: stores the last tracked value, is read again on restart and
    # may be edited by hand. NOTE(review): this is a relative path, presumably
    # resolved against Logstash's working directory (bin/ when launched by
    # start-logstash.sh); an absolute path would be less surprising.
    last_run_metadata_path => "syncpoint_table"
  }
}

filter {
  # Format open_date as "yyyy-MM-dd". open_date is nullable; calling .time on nil
  # would raise inside the ruby filter and log an exception for every such event.
  ruby {
    code => "d = event.get('open_date'); event.set('openDate', d.time.localtime.strftime('%Y-%m-%d')) unless d.nil?"
  }
  # snake_case -> camelCase. rename moves the value and drops the source field,
  # so no snake_case duplicate is left next to its camelCase copy.
  mutate {
    rename => {
      "store_name" => "storeName"
      "store_introduction" => "storeIntroduction"
      "store_brand" => "storeBrand"
      "store_photo" => "storePhoto"
      "store_tags" => "storeTags"
    }
  }
  # Drop fields Logstash adds that are not wanted in the index, plus the raw
  # open_date (replaced by openDate above).
  mutate {
    remove_field => ["tags", "@version", "@timestamp", "open_date"]
  }
}

output {
  elasticsearch {
    # Elasticsearch host and port.
    hosts => ["192.168.113.140:9200"]
    # Index name (can be customised).
    index => "store_index"
    # Use the table's primary key as the document _id so an updated row overwrites
    # its existing document instead of creating a duplicate.
    document_id => "%{id}"
    # document_type is intentionally not set: it is deprecated in Logstash 7.x
    # because Elasticsearch 7 removed mapping types.
  }
  stdout {
    # Print each event as one JSON line.
    codec => json_lines
  }
}
4.Logstash启动脚本
创建脚本
#!/bin/bash
# start-logstash.sh: run the my-pipeline.conf pipeline in the background and
# send all of its output to start-logstash.log.

# Stop here if the Logstash directory is missing rather than running ./logstash
# from whatever directory the script happened to be started in.
cd /software/elasticsearch/logstash-7.15.0/bin || exit 1

# The redirection below fails (and Logstash never starts) if the log directory
# does not exist, and under nohup that failure is easy to miss.
mkdir -p /software/elasticsearch/logs

nohup ./logstash -f ../config/my-pipeline.conf > /software/elasticsearch/logs/start-logstash.log 2>&1 &
授权
chmod +x start-logstash.sh(或 chmod 755 start-logstash.sh;不建议使用 chmod 777,它会让任何用户都能修改并执行该脚本)
- 启动脚本
./start-logstash.sh
- 查看ES索引文档,例如:curl "http://192.168.113.140:9200/store_index/_search?pretty"

