基本原理

image.png

pipline

  • 包括inout、filter、output三个处理步骤,它是通过队列进行管理

    示例

  • 日志打印输出

    1. input{
    2. file{
    3. path => "E:/software/data/test.log"
    4. start_position => beginning
    5. }
    6. }
    7. filter{
    8. }
    9. output{
    10. stdout()
    11. }
  • mysql表数据同步


    • ``` 执行命令: ./logstash-7.9.3/bin/logstash -f ./logstash-7.9.3/config/mysql.conf mysql.conf: input { jdbc {

      jdbc相关配置

      jdbc_driver_library => “/home/admin/little-project-elasticsearch/mysql-connector-java-8.0.21.jar” jdbc_driver_class => “com.mysql.cj.jdbc.Driver” jdbc_connection_string => “jdbc:mysql://localhost:3306/elasticsearch?useUnicode=true&serverTimezone=GMT&characterEncoding=utf8&useOldAliasMetadataBehavior=true &allowMultiQueries=true” jdbc_user => “root” jdbc_password => “root”

      要执行的 sql

      statement => “SELECT * FROM t_goods_store”

      使用递增列的值

      use_column_value => true

      递增字段的类型,numeric表示数值类型,timestamp 表示时间戳类型

      tracking_column_type => “numeric”

      递增字段的名称

      tracking_column => “id”

      同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改

      last_run_metadata_path => “syncpoint_table” } }

filter {

  1. # 转换成日期格式
  2. ruby {
  3. code => "event.set('openDate', event.get('open_date').time.localtime.strftime('%Y-%m-%d'))"
  4. }
  5. # 下划线转驼峰
  6. ruby {
  7. code => "event.set('storeName', event.get('store_name'));
  8. event.set('storeIntroduction', event.get('store_introduction'));
  9. event.set('storeBrand', event.get('store_brand'));
  10. event.set('storePhoto', event.get('store_photo'));
  11. event.set('storeTags', event.get('store_tags'));"
  12. }
  13. # 移除logstash生成的不需要的字段
  14. mutate {
  15. remove_field => ["tags","@version","@timestamp","open_date","store_name","store_introduction","store_brand","store_photo","store_tags"]
  16. }

}

output { elasticsearch {

  1. # ES的IP地址及端口
  2. hosts => ["localhost:9200"]
  3. # 索引名称 可自定义
  4. index => "store_index"
  5. # 需要关联的数据库中有一个id字段,对应es中的id
  6. document_id => "%{id}"
  7. }
  8. stdout {
  9. # JSON格式输出,日志打印
  10. codec => json_lines
  11. }

} ```

logstash event

  • 它是数据信息在Logstash中流转的形式
  • 数据源的数据通过input进入到Logstash以后就会被转化为Logstash Event,这个Event会经过filter过滤最终event会通过output输出到目标数据源,也就是Elasticsearch中

image.png