10.1 安装 Logstash

  1. 下载 Logstash

https://www.elastic.co/cn/downloads/logstash,下载 windows 版本
image.png

  1. 解压 zip 包,复制 elasticsearch 下的 config\certs 文件复制到 Logstash 的 config\certs
  2. 进入 Logstash 的 config 文件夹,新建 sqlserver.conf 文件,这里以连接 sql server 为例
    1. input {
    2. jdbc {
    3. # jdbc 驱动包位置
    4. jdbc_driver_library => "D:\logstash-8.2.0\mssql-jdbc-9.4.1.jre8.jar"
    5. # 要使用驱动包的类
    6. jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    7. # sqlserver 连接信息
    8. jdbc_connection_string => "jdbc:sqlserver://172.18.1.33;databaseName=DISEASE_SURVEILLANCE"
    9. # 数据库用户名
    10. jdbc_user => "sa"
    11. # 数据库密码
    12. jdbc_password => "123456"
    13. # 定时任务,多久执行一次,默认一分钟执行一次,要想没有延迟,可以使用 * * * * * *
    14. schedule => "* * * * *"
    15. use_column_value => true
    16. tracking_column => "id"
    17. # 你要执行的语句
    18. statement => "SELECT * FROM T_CDC_GIS_POI_INFO WHERE id > :sql_last_value"
    19. }
    20. }
    21. filter {
    22. mutate {
    23. remove_field => ["@version", "@timestamp"]
    24. }
    25. }
    26. output {
    27. elasticsearch {
    28. hosts => ["https://172.18.1.31:1001"]
    29. # es 用户名
    30. user => "elastic"
    31. # es 密码
    32. password => "5_QD9o4D-VZn0dUJDO=c"
    33. # es 的 ca 副本文件路径
    34. cacert => "D:\logstash-8.2.0\config\certs\http_ca.crt"
    35. # 启动 ssl 连接
    36. ssl => true
    37. # 索引
    38. index => "poi"
    39. # _id
    40. document_id => "%{id}"
    41. }
    42. }
    schedule => “ *” 表示每隔一分钟重复执行一次数据读取的操作,它支持 crontab 的语法,所以我们可以根据需要灵活设置读取数据的间隔。内置变量 sql_last_value 会在本地保存一个值,它记录了上次读取的最后一条记录中的一个值,如果 use_column_value 被设置为 true 且 tracking_column 被设置为 “id”,则 sql_last_value 保存的就是 id 列的最后一个值(在关系型数据库中,id 列是比较常见的设计)。
    上面配置的含义为:
  • sql_last_value 变量总是记录上次读取的最后一条记录中的 id。
  • 每隔一分钟执行一次数据读取操作。
  • 每次只读取上次读取后新增的数据。

    10.2 启动 Logstash

    进入 Logstash 的 bin 目录,新建 startup.cmd 文件,输入如下内容后启动即可
    1. ./logstash.bat -f D:\logstash-8.2.0\config\sqlserver.conf
    进入 kibana,Stack Management——索引管理,发现索引已经成功创建
    image.png