10.1 安装 Logstash
- 下载 Logstash
https://www.elastic.co/cn/downloads/logstash,下载 windows 版本
- 解压 zip 包,复制 elasticsearch 下的 config\certs 文件复制到 Logstash 的 config\certs
- 进入 Logstash 的 config 文件夹,新建 sqlserver.conf 文件,这里以连接 sql server 为例
schedule => “ *” 表示每隔一分钟重复执行一次数据读取的操作,它支持 crontab 的语法,所以我们可以根据需要灵活设置读取数据的间隔。内置变量 sql_last_value 会在本地保存一个值,它记录了上次读取的最后一条记录中的一个值,如果 use_column_value 被设置为 true 且 tracking_column 被设置为 “id”,则 sql_last_value 保存的就是 id 列的最后一个值(在关系型数据库中,id 列是比较常见的设计)。input {jdbc {# jdbc 驱动包位置jdbc_driver_library => "D:\logstash-8.2.0\mssql-jdbc-9.4.1.jre8.jar"# 要使用驱动包的类jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"# sqlserver 连接信息jdbc_connection_string => "jdbc:sqlserver://172.18.1.33;databaseName=DISEASE_SURVEILLANCE"# 数据库用户名jdbc_user => "sa"# 数据库密码jdbc_password => "123456"# 定时任务,多久执行一次,默认一分钟执行一次,要想没有延迟,可以使用 * * * * * *schedule => "* * * * *"use_column_value => truetracking_column => "id"# 你要执行的语句statement => "SELECT * FROM T_CDC_GIS_POI_INFO WHERE id > :sql_last_value"}}filter {mutate {remove_field => ["@version", "@timestamp"]}}output {elasticsearch {hosts => ["https://172.18.1.31:1001"]# es 用户名user => "elastic"# es 密码password => "5_QD9o4D-VZn0dUJDO=c"# es 的 ca 副本文件路径cacert => "D:\logstash-8.2.0\config\certs\http_ca.crt"# 启动 ssl 连接ssl => true# 索引index => "poi"# _iddocument_id => "%{id}"}}
上面配置的含义为:

