1.安装Logstash

  1. 官网下载压缩包
  2. 解压:`tar -zxvf logstash-7.15.0-linux-x86_64.tar.gz`
  3. 进入解压后的目录,启动 Logstash:`bin/logstash -e 'input { stdin {} } output { stdout {} }'`
    选项 `-e` 表示直接在命令行中指定配置,而不是读取配置文件。
  4. 在控制台输入 `hello world` 并回车,Logstash 会把它作为一条事件输出到标准输出,如下图:

![hello world 输出示例](874963-20180811153425324-1597854249.png)

2.MySQL数据源

  -- Source table for the Logstash JDBC incremental sync (section 3).
  -- The sync query selects rows with update_time >= the last recorded sync point,
  -- so update_time must change on every UPDATE, not only when the row is inserted.
  CREATE TABLE `goods_store` (
      `id` int NOT NULL AUTO_INCREMENT COMMENT '主键id',
      `store_name` varchar(256) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺名称',
      `store_introduction` varchar(512) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺简介',
      `store_brand` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺品牌',
      `open_date` date DEFAULT NULL COMMENT '开店时间',
      `store_photo` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺图片',
      `store_tags` varchar(256) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '店铺标签',
      -- ON UPDATE bumps the timestamp whenever the row is modified, so the next
      -- scheduled sync picks up edits as well as new rows.
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      -- Serves the range predicate `update_time >= :sql_last_value` run by the sync job.
      KEY `goods_store_update_time_idx` (`update_time`)
  ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

  -- Sample rows; update_time is filled in by its column default.
  INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags) VALUES (1, '小米销售店', '小米产品销售店', '小米', '2021-06-20', '1', '手机,电子');
  INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags) VALUES (2, '华为销售店', '华为产品销售店', '华为', '2021-06-10', '1', '手机,电子');
  INSERT INTO goods_store (id, store_name, store_introduction, store_brand, open_date, store_photo, store_tags) VALUES (3, '苹果销售店', '苹果产品销售店', '苹果', '2021-06-27', '1', '手机,电子');

3.Pipeline配置

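创建 pipeline 配置文件:`vim config/my-pipeline.conf`(下文的启动脚本从 `bin` 目录读取 `../config/my-pipeline.conf`)。MySQL 驱动包 mysql-connector-java 的 jar 需要手动复制到 `jdbc_driver_library` 指定的路径。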

  input {
    jdbc {
      # MySQL Connector/J jar; it has to be copied to this path manually.
      jdbc_driver_library => "/software/elasticsearch/logstash-7.15.0/config/mysql-connector-java-8.0.26.jar"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://192.168.101.4:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
      jdbc_user => "root"
      # NOTE(review): plain-text credentials; consider the Logstash keystore or ${VAR} substitution.
      jdbc_password => "123456"
      # Cron expression: run the query every minute.
      schedule => "* * * * *"
      # With use_column_value, :sql_last_value is taken from the last row returned, so the
      # rows must be ordered by the tracking column for the saved sync point to be the maximum.
      # `>=` re-reads rows on the boundary; that is harmless because document_id makes writes idempotent.
      statement => "SELECT * FROM goods_store WHERE update_time >= :sql_last_value ORDER BY update_time ASC"
      # Track a column value instead of the time of the last run.
      use_column_value => true
      # Type of the tracking column: "numeric" or "timestamp".
      tracking_column_type => "timestamp"
      # Tracking column; update_time is a timestamp column.
      tracking_column => "update_time"
      # Sync-point file: records the last tracked value, is read again on restart, and may be edited by hand.
      # NOTE(review): relative path, presumably resolved against Logstash's working directory
      # (bin/ when launched by start-logstash.sh) — consider an absolute path.
      last_run_metadata_path => "syncpoint_table"
    }
  }
  filter {
    # Format the DATE column open_date as a yyyy-MM-dd string in openDate.
    # open_date is nullable; skip NULLs instead of raising, because the resulting
    # _rubyexception tag would be dropped by remove_field below and hide the failure.
    ruby {
      code => "
        open_date = event.get('open_date')
        event.set('openDate', open_date.time.localtime.strftime('%Y-%m-%d')) unless open_date.nil?
      "
    }
    # Convert snake_case columns to camelCase; rename also drops the snake_case original,
    # so no duplicate fields end up in the document.
    mutate {
      rename => {
        "store_name" => "storeName"
        "store_introduction" => "storeIntroduction"
        "store_brand" => "storeBrand"
        "store_photo" => "storePhoto"
        "store_tags" => "storeTags"
      }
    }
    # Remove Logstash-generated fields and the raw open_date, which should not be indexed.
    mutate {
      remove_field => ["tags", "@version", "@timestamp", "open_date"]
    }
  }
  output {
    elasticsearch {
      # Elasticsearch host and port.
      hosts => ["192.168.113.140:9200"]
      # Index name (customizable).
      index => "store_index"
      # Use the table's primary key as the document id so re-synced rows overwrite
      # the existing document instead of creating duplicates.
      document_id => "%{id}"
      # document_type is intentionally omitted: mapping types are deprecated in
      # Elasticsearch 7 and removed in 8; the plugin falls back to the default type.
    }
    stdout {
      # Echo each event as one JSON line (ends up in the start script's log file).
      codec => json_lines
    }
  }

4.Logstash启动脚本

  1. 创建脚本start-logstash.sh

    #!/bin/bash
    # Start Logstash in the background with the MySQL -> Elasticsearch pipeline.
    # LOGSTASH_HOME and LOG_DIR may be overridden from the environment; the defaults
    # are the previously hard-coded paths.
    LOGSTASH_HOME="${LOGSTASH_HOME:-/software/elasticsearch/logstash-7.15.0}"
    LOG_DIR="${LOG_DIR:-/software/elasticsearch/logs}"

    # Abort if the install directory is wrong instead of running ./logstash from elsewhere.
    cd "$LOGSTASH_HOME/bin" || exit 1
    # The shell cannot redirect into a missing directory, so make sure it exists.
    mkdir -p "$LOG_DIR" || exit 1
    nohup ./logstash -f ../config/my-pipeline.conf > "$LOG_DIR/start-logstash.log" 2>&1 &
  2. 授予执行权限:`chmod +x start-logstash.sh`(也可以用 `chmod 777 start-logstash.sh`,但这会让所有用户都能修改该脚本,不推荐)

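  3. 执行脚本启动 Logstash:`./start-logstash.sh`
  4. 在 ES 中查看 `store_index` 索引的文档,例如:`curl 'http://192.168.113.140:9200/store_index/_search?pretty'`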

![store_index 索引文档](image.png)
资料:https://www.cnblogs.com/toov5/p/11355596.html