Logstash 概述

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
image.png

Pipeline 概念

包含了 input (数据采集)、 filter(数据转换)、output(数据发送) 三个阶段的处理流程,目前支持 200+ 插件,支持队列管理。
数据在 Logstash 内部流转时的具体表现形式 被称之为 Logstash Event。数据在 input 阶段被转换为 Event,在 output 被转化成目标格式数据。Event 其实是一个 Java Object,在配置文件中,对 Event 进行增删改查。
image.png
Input 插件

一个 Pipeline 可以由多个 input 插件

  • Stdin 、 File
  • Beats 、 Log4J 、Elasticsearch、JDBC、Kafka、Rabbitmq、Redis
  • JMX、HTTP、Websocket、UDP、TCP
  • Google Cloud Storage / S3
  • Github / Twitter

Filter 插件

内置的 Filter Plugins(https://www.elastic.co/guide/en/logstash/7.1/filter-plugins.html

  • Mutate:操作 Event 的字段
  • Metrics:Aggregate metrics
  • Ruby:执行 Ruby 代码

Output 插件

常见的 Output Plugins:https://www.elastic.co/guide/en/logstash/7.1/output-plugins.html

  • Elasticsearch
  • Email 、 Pageduty
  • Influxdb 、 Kafka 、Mongodb、Opentsdb、Zabbix
  • Http、TCP、Websocket

Codec 插件

内置的 Codec Plugins:https://www.elastic.co/guide/en/logstash/7.1/codec-plugins.html

  • Line、Multiline
  • JSON 、Avro、Cef(ArcSight Common Event Format)
  • Dots 、Rubydebu

    Logstash 配置文件结构

    ```shell input {

}

filter {

}

output {

}

  1. <a name="ILXx3"></a>
  2. ## Logstash Queue
  3. In Memory Queue:进程奔溃,机器宕机,都会引发数据的丢失。<br />Persistent Queue:机器宕机数据也不会丢失,数据保证会被消费,可以代替 Kafka 等消息队列缓冲区的作用。
  4. <a name="cukX7"></a>
  5. # 同步数据库数据到 Elasticsearch
  6. > 需求:将数据库中的数据同步到 Elasticsearch
  7. - 需要把新增数据库信息同步到 Elasticsearch
  8. - 数据库信息 Update 后,需要能被更新到 Elasticsearch
  9. - 支持增量更新
  10. - 数据库信息被删除后,不能被 Elasticsearch 搜索到
  11. <a name="qYjUF"></a>
  12. ## JDBC Input Plugin
  13. ```shell
  14. input {
  15. jdbc {
  16. # mysql 数据库链接,db_fengche为数据库名 ?tinyInt1isBit=false 禁止整型转换为bool
  17. jdbc_connection_string => "jdbc:mysql://192.168.9.201/db_fengche?tinyInt1isBit=false"
  18. # 用户名和密码
  19. jdbc_user => "db_fengche"
  20. jdbc_password => "HbDZF63D5865BjjE"
  21. # 驱动
  22. jdbc_driver_library => "/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.28.jar"
  23. # 驱动类名
  24. jdbc_driver_class => "com.mysql.jdbc.Driver"
  25. jdbc_paging_enabled => "true"
  26. jdbc_page_size => "50000"
  27. # 启动追踪,如果为true,则需要指定 tracking_colum
  28. use_column_value => true
  29. # 指定追踪的字段
  30. tracking_column => "last_update"
  31. # 追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是 numeric
  32. tracking_column_type => "numeric"
  33. # 记录最后一次运行的结果
  34. record_last_run => true
  35. # 上次运行结果的保存位置
  36. last_run_metadata_path => "/logstash/config/goods/jdbc-position.txt"
  37. # 执行的sql 文件路径+名称
  38. statement_filepath => "/logstash/config/goods/jdbc.sql"
  39. # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
  40. schedule => "* * * * *"
  41. }
  42. }
  43. output {
  44. elasticsearch {
  45. # ES的IP地址及端口
  46. hosts => ["192.168.9.220:9200"]
  47. # 索引名称
  48. index => "goods"
  49. document_type => "_doc"
  50. # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
  51. document_id => "%{goods_id}"
  52. }
  53. stdout {
  54. # JSON格式输出
  55. codec => json_lines
  56. }
  57. }
  1. SELECT * FROM `fc_goods` where last_update > :sql_last_value

开启同步监控

  1. /logstash/bin/logstash -f /logstash/config/goods/jdbc.conf